adnanhemani commented on code in PR #4707:
URL: https://github.com/apache/polaris/pull/4707#discussion_r3432376930
##########
polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java:
##########
@@ -209,6 +209,82 @@ public static void enforceFeatureEnabledOrThrow(
.defaultValue(List.<String>of())
.buildFeatureConfiguration();
+ //
---------------------------------------------------------------------------
+ // GCS principal attribution via Workload Identity Federation
Review Comment:
Personally, I don't think we need this comment, given that the description
fields are already quite detailed.
##########
polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java:
##########
@@ -209,6 +209,82 @@ public static void enforceFeatureEnabledOrThrow(
.defaultValue(List.<String>of())
.buildFeatureConfiguration();
+ //
---------------------------------------------------------------------------
+ // GCS principal attribution via Workload Identity Federation
+ //
+ // GCP downscoped credentials have no session-tag mechanism (unlike AWS
STS), and custom audit
+ // headers only reach GCS audit logs if the client forwards them. To
attribute GCS data access
+ // to the Polaris principal for ANY client, credential vending can chain
+ // catalog-signed JWT -> STS token exchange -> per-catalog service-account
impersonation, so the
+ // principal appears in serviceAccountDelegationInfo of every GCS Data
Access audit log entry.
+ //
+ // Attribution must be explicitly enabled via
GCS_PRINCIPAL_ATTRIBUTION_ENABLED. When enabled,
+ // WIF_AUDIENCE, TOKEN_ISSUER, and SIGNING_KEY_FILE are all required;
Polaris will throw at the
+ // first credential-vending attempt if any are missing. Additionally
requires a gcpServiceAccount
+ // on the per-catalog StorageConfiguration.
+ //
---------------------------------------------------------------------------
+
+ public static final FeatureConfiguration<Boolean>
GCS_PRINCIPAL_ATTRIBUTION_ENABLED =
+ PolarisConfiguration.<Boolean>builder()
+ .key("GCS_PRINCIPAL_ATTRIBUTION_ENABLED")
+ .description(
+ "Enables GCS principal attribution via Workload Identity
Federation.\n"
+ + "When true, credential vending chains a catalog-signed JWT
through an STS token\n"
+ + "exchange and service-account impersonation so the Polaris
principal appears in GCS\n"
+ + "Data Access audit logs
(serviceAccountDelegationInfo.principalSubject).\n"
+ + "Requires GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE,
GCS_PRINCIPAL_ATTRIBUTION_TOKEN_ISSUER,\n"
+ + "and GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE to also be
set;\n"
+ + "a missing required value is a fatal configuration
error.\n"
+ + "Also requires a gcpServiceAccount on the catalog
StorageConfiguration.\n"
+ + "Default: false (attribution disabled).")
+ .defaultValue(false)
+ .buildFeatureConfiguration();
+
+ public static final FeatureConfiguration<String>
GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE =
+ PolarisConfiguration.<String>builder()
+ .key("GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE")
+ .description(
+ "Full resource name of the Workload Identity Pool provider used
for GCS principal\n"
+ + "attribution, e.g.\n"
+ +
"//iam.googleapis.com/projects/<num>/locations/global/workloadIdentityPools/<pool>/providers/<provider>.\n"
+ + "Used as both the attribution JWT 'aud' claim and the STS
token-exchange audience.\n"
+ + "Empty (default) disables principal attribution.")
Review Comment:
This description says "Empty (default) disables principal attribution." That
was accurate before `GCS_PRINCIPAL_ATTRIBUTION_ENABLED` was added, but is now
stale. With the explicit enable flag, an empty `WIF_AUDIENCE` while
`ENABLED=true` does not gracefully disable attribution — it triggers a fatal
validation error. `TOKEN_ISSUER` (line ~262) and `SIGNING_KEY_FILE` (line ~272)
carry the same stale description. All three should say something like:
"Required when `GCS_PRINCIPAL_ATTRIBUTION_ENABLED=true`; ignored otherwise."
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchanger.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage.gcp;
+
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.JWTCreator;
+import com.auth0.jwt.algorithms.Algorithm;
+import com.google.auth.http.HttpTransportFactory;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.IdentityPoolCredentials;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyFactory;
+import java.security.interfaces.RSAPrivateKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Produces a GCP federated {@link GoogleCredentials} whose identity carries
{@code
+ * <realm>/<principal>}, so that GCS Data Access audit logs attribute access
to the requesting
+ * Polaris principal. This is the GCP counterpart of AWS STS session tags.
+ *
+ * <p>The federated credential is an {@link IdentityPoolCredentials} backed by
a programmatic
+ * subject-token supplier: on each token refresh google-auth invokes the
supplier, which mints a
+ * short-lived RS256 JWT ({@code sub = <realm>/<principal>}, {@code realm}
claim), and exchanges it
+ * at the Workload Identity Pool provider's STS endpoint. The provider maps
{@code google.subject =
+ * assertion.sub} and {@code attribute.realm = assertion.realm}; per-realm
{@code attribute.realm}
+ * IAM bindings then enforce that a realm-A identity can only impersonate
realm-A's service account.
+ * The returned credential is intended to be used as the source for tenant
service-account
+ * impersonation (see {@link GcpCredentialsStorageIntegration}).
+ *
+ * <p>Network note: this performs an STS token exchange against {@code
sts.googleapis.com} in
+ * addition to the existing {@code iamcredentials.googleapis.com} and {@code
storage.googleapis.com}
+ * traffic.
+ */
+public class GcpFederatedCredentialsExchanger {
+
+ static final String STS_TOKEN_URL = "https://sts.googleapis.com/v1/token";
+ static final String SUBJECT_TOKEN_TYPE =
"urn:ietf:params:oauth:token-type:jwt";
+ static final String CLOUD_PLATFORM_SCOPE =
"https://www.googleapis.com/auth/cloud-platform";
+
+ /** Attribution JWTs are single-purpose and short-lived. */
+ static final Duration JWT_LIFETIME = Duration.ofMinutes(5);
+
+ /**
+ * JVM-wide cache of parsed signing keys, keyed by file path. The key file
is a stable pod-mounted
+ * secret; parsing it (disk read + {@link KeyFactory}) once per path
amortizes across vends rather
+ * than re-reading on every credential-cache miss. Key rotation is delivered
by a process restart
+ * (the secret is mounted at startup), which clears this cache.
+ */
+ private static final ConcurrentHashMap<Path, RSAPrivateKey>
SIGNING_KEY_CACHE =
+ new ConcurrentHashMap<>();
+
+ private final String issuer;
+ private final String wifAudience;
+ private final Path signingKeyPath;
+ private final String signingKeyId;
+ private final HttpTransportFactory transportFactory;
+
+ public GcpFederatedCredentialsExchanger(
+ String issuer,
+ String wifAudience,
+ Path signingKeyPath,
+ String signingKeyId,
+ HttpTransportFactory transportFactory) {
+ this.issuer = issuer;
+ this.wifAudience = wifAudience;
+ this.signingKeyPath = signingKeyPath;
+ this.signingKeyId = signingKeyId;
+ this.transportFactory = transportFactory;
+ }
+
+ /**
+ * Builds a federated credential whose subject is {@code
<realm>/<principal>}.
+ *
+ * @param subject the attribution subject, {@code <realm>/<principal>} (see
{@link
+ * GcpAttributionSubjectBuilder})
+ * @param realm the realm identifier, emitted as the {@code realm} claim for
{@code
+ * attribute.realm} mapping
+ * @return federated credentials suitable as the source for tenant-SA
impersonation
+ */
+ public GoogleCredentials federatedCredentials(String subject, String realm) {
+ return IdentityPoolCredentials.newBuilder()
+ .setHttpTransportFactory(transportFactory)
+ .setAudience(wifAudience)
+ .setSubjectTokenType(SUBJECT_TOKEN_TYPE)
+ .setTokenUrl(STS_TOKEN_URL)
+ .setScopes(List.of(CLOUD_PLATFORM_SCOPE))
+ .setSubjectTokenSupplier(context -> mintAttributionJwt(subject, realm))
+ .build();
+ }
+
+ @VisibleForTesting
+ String mintAttributionJwt(String subject, String realm) throws IOException {
+ Instant now = Instant.now();
+ JWTCreator.Builder builder =
+ JWT.create()
+ .withIssuer(issuer)
+ .withSubject(subject)
+ .withAudience(wifAudience)
+ .withClaim("realm", realm)
+ .withIssuedAt(Date.from(now))
+ .withExpiresAt(Date.from(now.plus(JWT_LIFETIME)))
+ .withJWTId(UUID.randomUUID().toString());
+ // Set the kid header so the WIF provider can pick the right public key
from its JWKS during
+ // rotation (when the JWKS holds both the old and new keys). Omitted only
for a single-key JWKS.
+ if (signingKeyId != null && !signingKeyId.isEmpty()) {
+ builder.withKeyId(signingKeyId);
+ }
+ return builder.sign(Algorithm.RSA256(null, loadSigningKey()));
Review Comment:
May I suggest a quick comment here to reduce confusion around the `null`
param?
```java
// null public key: sign-only; verification is the WIF provider's responsibility
```
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java:
##########
@@ -123,13 +135,75 @@ public GcpCredentialsStorageIntegration(
GcpStorageConfigurationInfo storageConfig,
RealmConfig realmConfig,
GcpCredentialOps credentialOps) {
+ this(
+ sourceCredentials,
+ transportFactory,
+ cache,
+ storageConfig,
+ realmConfig,
+ credentialOps,
+ resolveAttributionParams(realmConfig));
+ }
+
+ /**
+ * Full constructor accepting pre-resolved attribution params. Downstream
integrations can pass
+ * their own {@link GcpAttributionParams} without depending on {@link
+ * org.apache.polaris.core.config.FeatureConfiguration}.
+ */
+ public GcpCredentialsStorageIntegration(
+ GoogleCredentials sourceCredentials,
+ HttpTransportFactory transportFactory,
+ org.apache.polaris.core.storage.cache.StorageCredentialCache cache,
+ GcpStorageConfigurationInfo storageConfig,
+ RealmConfig realmConfig,
+ GcpCredentialOps credentialOps,
+ Optional<GcpAttributionParams> attributionParams) {
super(cache, realmConfig, storageConfig);
// Needed for when environment variable GOOGLE_APPLICATION_CREDENTIALS
points to google service
// account key json
this.sourceCredentials =
sourceCredentials.createScoped("https://www.googleapis.com/auth/cloud-platform");
this.transportFactory = transportFactory;
this.credentialOps = credentialOps;
+ this.attributionParams = attributionParams;
+ }
+
+ /**
+ * Resolves {@link GcpAttributionParams} from realm config. Returns empty
when attribution is
+ * disabled; throws {@link IllegalStateException} when enabled but
misconfigured.
+ */
+ public static Optional<GcpAttributionParams>
resolveAttributionParams(RealmConfig realmConfig) {
Review Comment:
Not sure why this is a `static` function?
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpAttributionSubjectBuilder.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage.gcp;
+
+/**
+ * Builds the {@code sub} claim for GCS principal-attribution JWTs as {@code
<realm>/<principal>},
+ * within GCP's 127-character {@code google.subject} limit.
+ *
+ * <p>The character budget mirrors the AWS session-name builder: one character
is reserved for the
+ * separator, then each field receives an equal share of the remainder, and
budget unused by a short
+ * field flows to the other. ISO control characters and the {@code /}
separator are stripped from
+ * each field so the subject stays unambiguously parseable, and the {@code
unknown} placeholder
+ * substitutes null/empty fields so the subject shape stays stable.
+ */
+public final class GcpAttributionSubjectBuilder {
+
+ /** GCP limit for the {@code google.subject} attribute of a federated
identity. */
+ public static final int MAX_SUBJECT_LENGTH = 127;
+
+ static final String SEPARATOR = "/";
+
+ static final String VALUE_UNKNOWN = "unknown";
+
+ private GcpAttributionSubjectBuilder() {}
+
+ /**
+ * Builds the attribution subject {@code <realm>/<principal>}, guaranteed to
be at most {@value
+ * #MAX_SUBJECT_LENGTH} characters.
+ *
+ * @param realm the realm identifier (gets first-half budget priority)
+ * @param principalName the Polaris principal name
+ * @return the subject string
+ */
+ public static String buildSubject(String realm, String principalName) {
+ String cleanRealm = sanitize(realm);
+ String cleanPrincipal = sanitize(principalName);
+
+ int budget = MAX_SUBJECT_LENGTH - SEPARATOR.length();
+ int remaining = budget;
+
+ int realmAlloc = remaining / 2;
+ int realmUsed = Math.min(cleanRealm.length(), realmAlloc);
+ remaining -= realmUsed;
+
+ int principalUsed = Math.min(cleanPrincipal.length(), remaining);
+ remaining -= principalUsed;
+
+ // Carry-forward: if the principal left budget unused, the realm may take
more than its
+ // initial half-share.
+ int realmFinal = Math.min(cleanRealm.length(), realmUsed + remaining);
Review Comment:
nit: I think the math goes both ways (either the principal or the realm can
take over the other's unused budget). Might want to change the comment to
reflect that.
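
To illustrate, here is a minimal standalone sketch of the allocation as I read
it. The final `substring` assembly is my assumption (that part of the method is
outside this hunk), `SubjectBudgetSketch` is a hypothetical name, and
sanitization is omitted.

```java
// Hypothetical standalone sketch; not the PR's class.
final class SubjectBudgetSketch {
  static final int MAX_SUBJECT_LENGTH = 127;
  static final String SEPARATOR = "/";

  static String buildSubject(String realm, String principal) {
    int remaining = MAX_SUBJECT_LENGTH - SEPARATOR.length();

    // The realm first takes at most half of the budget.
    int realmUsed = Math.min(realm.length(), remaining / 2);
    remaining -= realmUsed;

    // The principal takes whatever is left, which already includes any
    // budget a short realm did not use.
    int principalUsed = Math.min(principal.length(), remaining);
    remaining -= principalUsed;

    // Budget a short principal did not use flows back to the realm.
    int realmFinal = Math.min(realm.length(), realmUsed + remaining);

    // Assumption: the subject is assembled by truncating each field.
    return realm.substring(0, realmFinal) + SEPARATOR + principal.substring(0, principalUsed);
  }
}
```

With a 5-character realm and a 200-character principal, the principal gets 121
characters; with the lengths swapped, the realm gets 121. The unused budget
flows in either direction, which is what the comment should say.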
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java:
##########
@@ -176,17 +250,30 @@ private GcpStorageCredentialCacheKey buildCacheKey(
@NonNull Set<String> writeLocations,
@NonNull Optional<String> refreshEndpoint,
@NonNull CredentialVendingContext context) {
+ // Principal attribution makes the vended token per-principal, so the
principal must
+ // participate in cache identity; otherwise it is left empty to preserve
cross-principal cache
+ // reuse. Attribution requires a service account to impersonate and a
principal to attribute.
+ String principalName = "";
+ Optional<GcpAttributionParams> resolvedAttributionParams =
Optional.empty();
+ if (attributionParams.isPresent() &&
storageConfig().getGcpServiceAccount() != null) {
+ principalName = context.principalName().orElse("");
+ if (!principalName.isEmpty()) {
Review Comment:
I don't think it's possible to have an empty principalName here. I agree
with the defensive coding style, but maybe we should throw an exception here
(or at least log a WARN line) rather than acting as if nothing is wrong.
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchanger.java:
##########
@@ -0,0 +1,171 @@
+package org.apache.polaris.core.storage.gcp;
+
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.JWTCreator;
+import com.auth0.jwt.algorithms.Algorithm;
+import com.google.auth.http.HttpTransportFactory;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.IdentityPoolCredentials;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyFactory;
+import java.security.interfaces.RSAPrivateKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Produces a GCP federated {@link GoogleCredentials} whose identity carries
{@code
+ * <realm>/<principal>}, so that GCS Data Access audit logs attribute access
to the requesting
+ * Polaris principal. This is the GCP counterpart of AWS STS session tags.
+ *
+ * <p>The federated credential is an {@link IdentityPoolCredentials} backed by
a programmatic
+ * subject-token supplier: on each token refresh google-auth invokes the
supplier, which mints a
+ * short-lived RS256 JWT ({@code sub = <realm>/<principal>}, {@code realm}
claim), and exchanges it
+ * at the Workload Identity Pool provider's STS endpoint. The provider maps
{@code google.subject =
+ * assertion.sub} and {@code attribute.realm = assertion.realm}; per-realm
{@code attribute.realm}
+ * IAM bindings then enforce that a realm-A identity can only impersonate
realm-A's service account.
+ * The returned credential is intended to be used as the source for tenant
service-account
+ * impersonation (see {@link GcpCredentialsStorageIntegration}).
+ *
+ * <p>Network note: this performs an STS token exchange against {@code
sts.googleapis.com} in
+ * addition to the existing {@code iamcredentials.googleapis.com} and {@code
storage.googleapis.com}
+ * traffic.
+ */
+public class GcpFederatedCredentialsExchanger {
+
+ static final String STS_TOKEN_URL = "https://sts.googleapis.com/v1/token";
+ static final String SUBJECT_TOKEN_TYPE =
"urn:ietf:params:oauth:token-type:jwt";
+ static final String CLOUD_PLATFORM_SCOPE =
"https://www.googleapis.com/auth/cloud-platform";
+
+ /** Attribution JWTs are single-purpose and short-lived. */
+ static final Duration JWT_LIFETIME = Duration.ofMinutes(5);
+
+ /**
+ * JVM-wide cache of parsed signing keys, keyed by file path. The key file
is a stable pod-mounted
+ * secret; parsing it (disk read + {@link KeyFactory}) once per path
amortizes across vends rather
+ * than re-reading on every credential-cache miss. Key rotation is delivered
by a process restart
+ * (the secret is mounted at startup), which clears this cache.
+ */
+ private static final ConcurrentHashMap<Path, RSAPrivateKey>
SIGNING_KEY_CACHE =
+ new ConcurrentHashMap<>();
+
+ private final String issuer;
+ private final String wifAudience;
+ private final Path signingKeyPath;
+ private final String signingKeyId;
+ private final HttpTransportFactory transportFactory;
+
+ public GcpFederatedCredentialsExchanger(
+ String issuer,
+ String wifAudience,
+ Path signingKeyPath,
+ String signingKeyId,
+ HttpTransportFactory transportFactory) {
+ this.issuer = issuer;
+ this.wifAudience = wifAudience;
+ this.signingKeyPath = signingKeyPath;
+ this.signingKeyId = signingKeyId;
+ this.transportFactory = transportFactory;
+ }
+
+ /**
+ * Builds a federated credential whose subject is {@code
<realm>/<principal>}.
+ *
+ * @param subject the attribution subject, {@code <realm>/<principal>} (see
{@link
+ * GcpAttributionSubjectBuilder})
+ * @param realm the realm identifier, emitted as the {@code realm} claim for
{@code
+ * attribute.realm} mapping
+ * @return federated credentials suitable as the source for tenant-SA
impersonation
+ */
+ public GoogleCredentials federatedCredentials(String subject, String realm) {
+ return IdentityPoolCredentials.newBuilder()
+ .setHttpTransportFactory(transportFactory)
+ .setAudience(wifAudience)
+ .setSubjectTokenType(SUBJECT_TOKEN_TYPE)
+ .setTokenUrl(STS_TOKEN_URL)
+ .setScopes(List.of(CLOUD_PLATFORM_SCOPE))
+ .setSubjectTokenSupplier(context -> mintAttributionJwt(subject, realm))
+ .build();
+ }
+
+ @VisibleForTesting
+ String mintAttributionJwt(String subject, String realm) throws IOException {
+ Instant now = Instant.now();
+ JWTCreator.Builder builder =
+ JWT.create()
+ .withIssuer(issuer)
+ .withSubject(subject)
+ .withAudience(wifAudience)
+ .withClaim("realm", realm)
+ .withIssuedAt(Date.from(now))
+ .withExpiresAt(Date.from(now.plus(JWT_LIFETIME)))
+ .withJWTId(UUID.randomUUID().toString());
+ // Set the kid header so the WIF provider can pick the right public key
from its JWKS during
+ // rotation (when the JWKS holds both the old and new keys). Omitted only
for a single-key JWKS.
+ if (signingKeyId != null && !signingKeyId.isEmpty()) {
+ builder.withKeyId(signingKeyId);
+ }
+ return builder.sign(Algorithm.RSA256(null, loadSigningKey()));
+ }
+
+ private RSAPrivateKey loadSigningKey() throws IOException {
+ try {
+ return SIGNING_KEY_CACHE.computeIfAbsent(
+ signingKeyPath,
+ path -> {
+ try {
+ return readPkcs8PrivateKey(path);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
+ }
+ }
+
+ /** Reads an RSA private key from a PKCS#8 PEM file. */
+ @VisibleForTesting
+ static RSAPrivateKey readPkcs8PrivateKey(Path pemPath) throws IOException {
+ String pem = Files.readString(pemPath);
+ String base64 =
+ pem.replaceAll("-----BEGIN [A-Z ]+-----", "")
Review Comment:
The regex strips any PEM header (`BEGIN [A-Z ]+`), so it silently accepts
both `BEGIN PRIVATE KEY` (PKCS#8, correct) and `BEGIN RSA PRIVATE KEY` (PKCS#1,
wrong). If an operator provides a PKCS#1 key (the default `openssl genrsa` output before OpenSSL 3.0), the header
is stripped cleanly but `PKCS8EncodedKeySpec` then fails on the raw DER with a
generic `InvalidKeySpecException` that gives no hint about the format mismatch.
Worth adding an explicit check on the header string and throwing a descriptive
error if it isn't `BEGIN PRIVATE KEY`.
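
A rough sketch of the kind of check I have in mind, assuming the key is parsed
from the PEM text. `Pkcs8PemSketch` and `parsePkcs8Pem` are hypothetical names,
and the error wording is only a suggestion.

```java
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.interfaces.RSAPrivateKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.Base64;

// Hypothetical standalone sketch; not the PR's class.
final class Pkcs8PemSketch {
  static final String PKCS8_HEADER = "-----BEGIN PRIVATE KEY-----";
  static final String PKCS8_FOOTER = "-----END PRIVATE KEY-----";

  static RSAPrivateKey parsePkcs8Pem(String pem) throws IOException {
    String trimmed = pem.trim();
    // Reject anything that is not an unencrypted PKCS#8 key before decoding,
    // so the operator sees the format mismatch instead of a generic
    // InvalidKeySpecException.
    if (!trimmed.startsWith(PKCS8_HEADER)) {
      String firstLine = trimmed.lines().findFirst().orElse("<empty>");
      throw new IOException(
          "Expected a PKCS#8 PEM key ('" + PKCS8_HEADER + "') but found '" + firstLine
              + "'. A PKCS#1 key ('BEGIN RSA PRIVATE KEY') can be converted with: "
              + "openssl pkcs8 -topk8 -nocrypt -in key.pem -out key-pkcs8.pem");
    }
    String base64 =
        trimmed.replace(PKCS8_HEADER, "").replace(PKCS8_FOOTER, "").replaceAll("\\s", "");
    try {
      byte[] der = Base64.getDecoder().decode(base64);
      return (RSAPrivateKey)
          KeyFactory.getInstance("RSA").generatePrivate(new PKCS8EncodedKeySpec(der));
    } catch (GeneralSecurityException | IllegalArgumentException e) {
      throw new IOException("Failed to parse PKCS#8 RSA private key", e);
    }
  }
}
```

Checking the header up front keeps the happy path unchanged and turns the
PKCS#1 case into an actionable message.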
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java:
##########
@@ -246,6 +333,40 @@ static StorageAccessConfig
compute(GcpStorageCredentialCacheKey key) {
return accessConfig.build();
}
+ /**
+ * Returns the credential to be used as the source for downscoping.
+ *
+ * <p>When GCS principal attribution is configured and a principal is
present (so the cache key
+ * carries pre-computed {@link GcpAttributionParams}), the impersonation
source is a federated
+ * identity whose subject is {@code <realm>/<principal>}, which surfaces the
principal in {@code
+ * serviceAccountDelegationInfo} of GCS Data Access audit logs. Otherwise
this is the standard
+ * path: impersonate the configured service account from the ambient source
credentials, or use
+ * those credentials directly.
+ */
+ private static GoogleCredentials baseCredentialsForVending(
Review Comment:
`baseCredentialsForVending` and `getBaseCredentials` (line ~374) are
confusingly similar names that sit ~30 lines apart. A reader trying to
understand the non-attribution path has to pause and confirm which one applies.
`baseCredentialsForVending` could be named `resolveSourceCredentials` or
`attributedSourceCredentials` to make the dispatch purpose immediately clear.
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchanger.java:
##########
@@ -0,0 +1,171 @@
+package org.apache.polaris.core.storage.gcp;
+
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.JWTCreator;
+import com.auth0.jwt.algorithms.Algorithm;
+import com.google.auth.http.HttpTransportFactory;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.IdentityPoolCredentials;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyFactory;
+import java.security.interfaces.RSAPrivateKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Produces a GCP federated {@link GoogleCredentials} whose identity carries
{@code
+ * <realm>/<principal>}, so that GCS Data Access audit logs attribute access
to the requesting
+ * Polaris principal. This is the GCP counterpart of AWS STS session tags.
+ *
+ * <p>The federated credential is an {@link IdentityPoolCredentials} backed by
a programmatic
+ * subject-token supplier: on each token refresh google-auth invokes the
supplier, which mints a
+ * short-lived RS256 JWT ({@code sub = <realm>/<principal>}, {@code realm}
claim), and exchanges it
+ * at the Workload Identity Pool provider's STS endpoint. The provider maps
{@code google.subject =
+ * assertion.sub} and {@code attribute.realm = assertion.realm}; per-realm
{@code attribute.realm}
+ * IAM bindings then enforce that a realm-A identity can only impersonate
realm-A's service account.
+ * The returned credential is intended to be used as the source for tenant
service-account
+ * impersonation (see {@link GcpCredentialsStorageIntegration}).
+ *
+ * <p>Network note: this performs an STS token exchange against {@code
sts.googleapis.com} in
+ * addition to the existing {@code iamcredentials.googleapis.com} and {@code
storage.googleapis.com}
+ * traffic.
+ */
+public class GcpFederatedCredentialsExchanger {
+
+ static final String STS_TOKEN_URL = "https://sts.googleapis.com/v1/token";
+ static final String SUBJECT_TOKEN_TYPE =
"urn:ietf:params:oauth:token-type:jwt";
+ static final String CLOUD_PLATFORM_SCOPE =
"https://www.googleapis.com/auth/cloud-platform";
+
+ /** Attribution JWTs are single-purpose and short-lived. */
+ static final Duration JWT_LIFETIME = Duration.ofMinutes(5);
+
+ /**
+ * JVM-wide cache of parsed signing keys, keyed by file path. The key file
is a stable pod-mounted
+ * secret; parsing it (disk read + {@link KeyFactory}) once per path
amortizes across vends rather
+ * than re-reading on every credential-cache miss. Key rotation is delivered
by a process restart
+ * (the secret is mounted at startup), which clears this cache.
+ */
+ private static final ConcurrentHashMap<Path, RSAPrivateKey>
SIGNING_KEY_CACHE =
+ new ConcurrentHashMap<>();
Review Comment:
`SIGNING_KEY_CACHE` is an unbounded, never-evicted JVM-wide map. The PR
description documents that key rotation requires a process restart (correct for
a pod-mounted secret), but the map has no size bound. In a deployment where
many realms each have distinct signing-key paths, all keys accumulate for the
process lifetime. Low risk for typical deployments, but worth documenting the
assumption ("one or very few distinct key paths per process") directly on the
field, or possibly using a Guava cache.
In practice, though, I think it would be very hard to accumulate enough of
these to matter (I believe an `RSAPrivateKey` is a few KB at most), so maybe we
just leave a comment here saying that the cache is effectively bounded by the
number of realms we support.
##########
site/content/in-dev/unreleased/configuration/config-sections/flags-polaris_features.md:
##########
@@ -335,6 +335,51 @@ The maximum weight for the entity cache. This is a
heuristic value without any p
---
+##### `polaris.features."GCS_PRINCIPAL_ATTRIBUTION_ENABLED"`
+
+Enables GCS principal attribution via Workload Identity Federation. When true,
credential vending chains a catalog-signed JWT through an STS token exchange
and service-account impersonation so the Polaris principal appears in GCS Data
Access audit logs (serviceAccountDelegationInfo.principalSubject). Requires
GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE, GCS_PRINCIPAL_ATTRIBUTION_TOKEN_ISSUER,
and GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE to also be set; a missing
required value is a fatal configuration error. Also requires a
gcpServiceAccount on the catalog StorageConfiguration. Default: false
(attribution disabled).
+
+- **Type:** `Boolean`
+- **Default:** `false`
+
+---
+
+##### `polaris.features."GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE"`
+
+Filesystem path to the PKCS#8 PEM RSA private key used to sign GCS attribution
JWTs (RS256). The corresponding public key must be published in the Workload
Identity Pool provider's uploaded JWKS. Empty (default) disables principal
attribution.
Review Comment:
Same stale description issue as in `FeatureConfiguration.java`: three of the
flag docs (`SIGNING_KEY_FILE` here, `TOKEN_ISSUER` at line ~367, and
`WIF_AUDIENCE` at line ~376) all say "Empty (default) disables principal
attribution." Now that `GCS_PRINCIPAL_ATTRIBUTION_ENABLED` is the master
switch, an empty value with `ENABLED=true` is a fatal misconfiguration, not a
graceful disable. These descriptions should say they are required when
`GCS_PRINCIPAL_ATTRIBUTION_ENABLED=true`.
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java:
##########
@@ -123,13 +135,75 @@ public GcpCredentialsStorageIntegration(
GcpStorageConfigurationInfo storageConfig,
RealmConfig realmConfig,
GcpCredentialOps credentialOps) {
+ this(
+ sourceCredentials,
+ transportFactory,
+ cache,
+ storageConfig,
+ realmConfig,
+ credentialOps,
+ resolveAttributionParams(realmConfig));
+ }
+
+ /**
+ * Full constructor accepting pre-resolved attribution params. Downstream
integrations can pass
+ * their own {@link GcpAttributionParams} without depending on {@link
+ * org.apache.polaris.core.config.FeatureConfiguration}.
+ */
+ public GcpCredentialsStorageIntegration(
+ GoogleCredentials sourceCredentials,
+ HttpTransportFactory transportFactory,
+ org.apache.polaris.core.storage.cache.StorageCredentialCache cache,
Review Comment:
No FQNs please; import `StorageCredentialCache` instead.
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java:
##########
@@ -176,17 +250,30 @@ private GcpStorageCredentialCacheKey buildCacheKey(
@NonNull Set<String> writeLocations,
@NonNull Optional<String> refreshEndpoint,
@NonNull CredentialVendingContext context) {
+ // Principal attribution makes the vended token per-principal, so the principal must
+ // participate in cache identity; otherwise it is left empty to preserve cross-principal cache
+ // reuse. Attribution requires a service account to impersonate and a principal to attribute.
+ String principalName = "";
+ Optional<GcpAttributionParams> resolvedAttributionParams = Optional.empty();
+ if (attributionParams.isPresent() && storageConfig().getGcpServiceAccount() != null) {
Review Comment:
When attribution is enabled (`attributionParams.isPresent()`) but
`storageConfig().getGcpServiceAccount()` is null, the code silently falls back
to unattributed credentials with no log line. I'd recommend adding a WARN or
DEBUG log line here stating that we fell back to unattributed credentials even
though attribution was enabled.
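A minimal sketch of what I mean, not the actual Polaris code. `AttributionFallbackSketch` and `resolveForVending` are hypothetical names, the generic `T` stands in for `GcpAttributionParams`, and `System.Logger` is used only to keep the example self-contained; the real change would presumably use whatever logger the class already has.

```java
import java.lang.System.Logger;
import java.lang.System.Logger.Level;
import java.util.Optional;

// Hypothetical helper illustrating the suggested fallback logging.
public class AttributionFallbackSketch {
  private static final Logger LOGGER =
      System.getLogger(AttributionFallbackSketch.class.getName());

  /**
   * Returns the attribution params to use for vending, or empty when vending
   * must fall back to unattributed credentials. T stands in for
   * GcpAttributionParams (assumption for illustration).
   */
  public static <T> Optional<T> resolveForVending(
      Optional<T> attributionParams, String gcpServiceAccount) {
    if (attributionParams.isEmpty()) {
      // Attribution is not enabled: nothing to report.
      return Optional.empty();
    }
    if (gcpServiceAccount == null) {
      // Attribution is enabled but cannot be applied: make the fallback visible.
      LOGGER.log(
          Level.WARNING,
          "GCS principal attribution is enabled but the storage config has no "
              + "gcpServiceAccount; vending unattributed credentials");
      return Optional.empty();
    }
    return attributionParams;
  }
}
```

Whether this should be WARN or DEBUG depends on how often catalogs without a service account are expected in a realm that has attribution turned on.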
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpStorageCredentialCacheKey.java:
##########
@@ -55,25 +61,43 @@ public interface GcpStorageCredentialCacheKey extends StorageCredentialCacheKey
@Value.Parameter(order = 6)
Optional<String> refreshCredentialsEndpoint();
+ /**
+ * The requesting principal name, included in the cache key only when GCS principal attribution is
+ * enabled (otherwise empty). When attribution is active the vended credential carries this
+ * principal's identity, so it must participate in cache identity to avoid serving one principal's
+ * attributed credentials to another.
+ */
+ @Value.Parameter(order = 7)
+ String principalName();
Review Comment:
What about making this `Optional<String>` to match the convention in the rest
of the file, rather than using `""` to signal that there is no principal name?
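Roughly, the accessor would become `Optional<String> principalName();` and the call site would build the value along these lines. This is only a sketch under my own assumptions: `PrincipalCacheKeySketch` and `principalForCacheKey` are hypothetical names, and treating an empty string as "no principal" is my guess at the intended semantics.

```java
import java.util.Optional;

// Hypothetical helper: computes the principal component of the cache key.
public class PrincipalCacheKeySketch {

  /**
   * Returns the principal to include in the cache key. Empty when attribution
   * is inactive or no principal is available, so such keys stay shareable
   * across principals.
   */
  public static Optional<String> principalForCacheKey(
      boolean attributionActive, String principalName) {
    if (!attributionActive || principalName == null || principalName.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(principalName);
  }
}
```

With this shape, two keys built without attribution compare equal on the principal component, while attributed keys differ per principal.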
##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java:
##########
@@ -246,6 +333,40 @@ static StorageAccessConfig compute(GcpStorageCredentialCacheKey key) {
return accessConfig.build();
}
+ /**
+ * Returns the credential to be used as the source for downscoping.
+ *
+ * <p>When GCS principal attribution is configured and a principal is present (so the cache key
+ * carries pre-computed {@link GcpAttributionParams}), the impersonation source is a federated
+ * identity whose subject is {@code <realm>/<principal>}, which surfaces the principal in {@code
+ * serviceAccountDelegationInfo} of GCS Data Access audit logs. Otherwise this is the standard
+ * path: impersonate the configured service account from the ambient source credentials, or use
+ * those credentials directly.
+ */
+ private static GoogleCredentials baseCredentialsForVending(
+ GcpStorageCredentialCacheKey key,
+ GcpStorageConfigurationInfo storageConfig,
+ GoogleCredentials sourceCredentials,
+ GcpCredentialOps credentialOps) {
+ Optional<GcpAttributionParams> attributionParams = key.attributionParams();
+ if (attributionParams.isPresent()) {
+ GcpAttributionParams params = attributionParams.get();
+ String subject =
+ GcpAttributionSubjectBuilder.buildSubject(key.realmId(), key.principalName());
+ GcpFederatedCredentialsExchanger exchanger =
+ new GcpFederatedCredentialsExchanger(
+ params.tokenIssuer(),
+ params.wifAudience(),
+ Path.of(params.signingKeyFile()),
Review Comment:
`Path.of(params.signingKeyFile())` can throw `InvalidPathException`
(unchecked) if the configured string contains characters that are invalid on
the OS filesystem (e.g. null bytes). `resolveAttributionParams` validates that
the value is non-empty, but not that it is a syntactically valid path. The
exception would surface opaquely during a credential-cache miss rather than at
startup. Maybe worth moving the `Path.of(...)` call into
`resolveAttributionParams` (or wherever we do the initial param validation) so
the error is caught at construction time alongside the other config validation.
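Something like the following, as a sketch only. `SigningKeyPathValidation` and `parseSigningKeyFile` are hypothetical names, and the choice of `IllegalArgumentException` and the error messages are my assumptions; the real code should throw whatever the other config checks in `resolveAttributionParams` throw.

```java
import java.nio.file.InvalidPathException;
import java.nio.file.Path;

// Hypothetical helper: parses the configured signing key path eagerly.
public class SigningKeyPathValidation {

  /**
   * Converts the configured string into a Path at validation time, so a
   * malformed value fails together with the other config checks instead of
   * during a credential-cache miss.
   */
  public static Path parseSigningKeyFile(String configured) {
    if (configured == null || configured.isBlank()) {
      throw new IllegalArgumentException(
          "GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE must be set");
    }
    try {
      // Only checks path syntax; it does not touch the filesystem.
      return Path.of(configured);
    } catch (InvalidPathException e) {
      throw new IllegalArgumentException(
          "GCS_PRINCIPAL_ATTRIBUTION_SIGNING_KEY_FILE is not a valid path: "
              + e.getReason(),
          e);
    }
  }
}
```

`GcpAttributionParams` could then carry the parsed `Path` rather than the raw string. Note this only validates syntax; checking that the file exists and is readable would be a separate decision.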
##########
polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java:
##########
@@ -209,6 +209,64 @@ public static void enforceFeatureEnabledOrThrow(
.defaultValue(List.<String>of())
.buildFeatureConfiguration();
+ // ---------------------------------------------------------------------------
+ // GCS principal attribution via Workload Identity Federation
+ //
+ // GCP downscoped credentials have no session-tag mechanism (unlike AWS STS), and custom audit
+ // headers only reach GCS audit logs if the client forwards them. To attribute GCS data access
+ // to the Polaris principal for ANY client, credential vending can chain
+ // catalog-signed JWT -> STS token exchange -> per-catalog service-account impersonation, so the
+ // principal appears in serviceAccountDelegationInfo of every GCS Data Access audit log entry.
+ //
+ // Attribution activates automatically once the audience, issuer, and signing key file are all
+ // set (no on/off flag); it additionally requires a gcpServiceAccount on the storage config.
+ // ---------------------------------------------------------------------------
+
+ public static final FeatureConfiguration<String> GCS_PRINCIPAL_ATTRIBUTION_WIF_AUDIENCE =
Review Comment:
I agree with @obelix74 on keeping this as a realm-level param; I don't
currently see a reasonable use case for controlling WIF attribution at a finer
grain than the realm. However, I do find it odd that you're checking whether
all the arguments are present when a StorageIntegration is created rather than
at server start time.
Is there a way for a Polaris admin to change a realm's settings while the
server is running?