obelix74 commented on code in PR #4707: URL: https://github.com/apache/polaris/pull/4707#discussion_r3433034039
########## polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchanger.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.gcp; + +import com.auth0.jwt.JWT; +import com.auth0.jwt.JWTCreator; +import com.auth0.jwt.algorithms.Algorithm; +import com.google.auth.http.HttpTransportFactory; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.IdentityPoolCredentials; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyFactory; +import java.security.interfaces.RSAPrivateKey; +import java.security.spec.PKCS8EncodedKeySpec; +import java.time.Duration; +import java.time.Instant; +import java.util.Base64; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Produces a GCP federated {@link GoogleCredentials} whose identity carries {@code + * <realm>/<principal>}, so that GCS Data Access audit logs attribute access to the requesting + * Polaris principal. This is the GCP counterpart of AWS STS session tags. + * + * <p>The federated credential is an {@link IdentityPoolCredentials} backed by a programmatic + * subject-token supplier: on each token refresh google-auth invokes the supplier, which mints a + * short-lived RS256 JWT ({@code sub = <realm>/<principal>}, {@code realm} claim), and exchanges it + * at the Workload Identity Pool provider's STS endpoint. The provider maps {@code google.subject = + * assertion.sub} and {@code attribute.realm = assertion.realm}; per-realm {@code attribute.realm} + * IAM bindings then enforce that a realm-A identity can only impersonate realm-A's service account. + * The returned credential is intended to be used as the source for tenant service-account + * impersonation (see {@link GcpCredentialsStorageIntegration}). + * + * <p>Network note: this performs an STS token exchange against {@code sts.googleapis.com} in + * addition to the existing {@code iamcredentials.googleapis.com} and {@code storage.googleapis.com} + * traffic. + */ +public class GcpFederatedCredentialsExchanger { + + static final String STS_TOKEN_URL = "https://sts.googleapis.com/v1/token"; + static final String SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"; + static final String CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; + + /** Attribution JWTs are single-purpose and short-lived. */ + static final Duration JWT_LIFETIME = Duration.ofMinutes(5); + + /** + * JVM-wide cache of parsed signing keys, keyed by file path. The key file is a stable pod-mounted + * secret; parsing it (disk read + {@link KeyFactory}) once per path amortizes across vends rather + * than re-reading on every credential-cache miss. Key rotation is delivered by a process restart + * (the secret is mounted at startup), which clears this cache. + */ + private static final ConcurrentHashMap<Path, RSAPrivateKey> SIGNING_KEY_CACHE = + new ConcurrentHashMap<>(); + + private final String issuer; + private final String wifAudience; + private final Path signingKeyPath; + private final String signingKeyId; + private final HttpTransportFactory transportFactory; + + public GcpFederatedCredentialsExchanger( + String issuer, + String wifAudience, + Path signingKeyPath, + String signingKeyId, + HttpTransportFactory transportFactory) { + this.issuer = issuer; + this.wifAudience = wifAudience; + this.signingKeyPath = signingKeyPath; + this.signingKeyId = signingKeyId; + this.transportFactory = transportFactory; + } + + /** + * Builds a federated credential whose subject is {@code <realm>/<principal>}. + * + * @param subject the attribution subject, {@code <realm>/<principal>} (see {@link + * GcpAttributionSubjectBuilder}) + * @param realm the realm identifier, emitted as the {@code realm} claim for {@code + * attribute.realm} mapping + * @return federated credentials suitable as the source for tenant-SA impersonation + */ + public GoogleCredentials federatedCredentials(String subject, String realm) { + return IdentityPoolCredentials.newBuilder() + .setHttpTransportFactory(transportFactory) + .setAudience(wifAudience) + .setSubjectTokenType(SUBJECT_TOKEN_TYPE) + .setTokenUrl(STS_TOKEN_URL) + .setScopes(List.of(CLOUD_PLATFORM_SCOPE)) + .setSubjectTokenSupplier(context -> mintAttributionJwt(subject, realm)) + .build(); + } + + @VisibleForTesting + String mintAttributionJwt(String subject, String realm) throws IOException { + Instant now = Instant.now(); + JWTCreator.Builder builder = + JWT.create() + .withIssuer(issuer) + .withSubject(subject) + .withAudience(wifAudience) + .withClaim("realm", realm) + .withIssuedAt(Date.from(now)) + .withExpiresAt(Date.from(now.plus(JWT_LIFETIME))) + .withJWTId(UUID.randomUUID().toString()); + // Set the kid header so the WIF provider can pick the right public key from its JWKS during + // rotation (when the JWKS holds both the old and new keys). Omitted only for a single-key JWKS. + if (signingKeyId != null && !signingKeyId.isEmpty()) { + builder.withKeyId(signingKeyId); + } + return builder.sign(Algorithm.RSA256(null, loadSigningKey())); Review Comment: Added exactly that comment: `// null public key: sign-only; verification is the WIF provider's responsibility` ########## polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchanger.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.core.storage.gcp; + +import com.auth0.jwt.JWT; +import com.auth0.jwt.JWTCreator; +import com.auth0.jwt.algorithms.Algorithm; +import com.google.auth.http.HttpTransportFactory; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.IdentityPoolCredentials; +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyFactory; +import java.security.interfaces.RSAPrivateKey; +import java.security.spec.PKCS8EncodedKeySpec; +import java.time.Duration; +import java.time.Instant; +import java.util.Base64; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Produces a GCP federated {@link GoogleCredentials} whose identity carries {@code + * <realm>/<principal>}, so that GCS Data Access audit logs attribute access to the requesting + * Polaris principal. This is the GCP counterpart of AWS STS session tags. + * + * <p>The federated credential is an {@link IdentityPoolCredentials} backed by a programmatic + * subject-token supplier: on each token refresh google-auth invokes the supplier, which mints a + * short-lived RS256 JWT ({@code sub = <realm>/<principal>}, {@code realm} claim), and exchanges it + * at the Workload Identity Pool provider's STS endpoint. The provider maps {@code google.subject = + * assertion.sub} and {@code attribute.realm = assertion.realm}; per-realm {@code attribute.realm} + * IAM bindings then enforce that a realm-A identity can only impersonate realm-A's service account. + * The returned credential is intended to be used as the source for tenant service-account + * impersonation (see {@link GcpCredentialsStorageIntegration}). + * + * <p>Network note: this performs an STS token exchange against {@code sts.googleapis.com} in + * addition to the existing {@code iamcredentials.googleapis.com} and {@code storage.googleapis.com} + * traffic. + */ +public class GcpFederatedCredentialsExchanger { + + static final String STS_TOKEN_URL = "https://sts.googleapis.com/v1/token"; + static final String SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt"; + static final String CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; + + /** Attribution JWTs are single-purpose and short-lived. */ + static final Duration JWT_LIFETIME = Duration.ofMinutes(5); + + /** + * JVM-wide cache of parsed signing keys, keyed by file path. The key file is a stable pod-mounted + * secret; parsing it (disk read + {@link KeyFactory}) once per path amortizes across vends rather + * than re-reading on every credential-cache miss. Key rotation is delivered by a process restart + * (the secret is mounted at startup), which clears this cache. + */ + private static final ConcurrentHashMap<Path, RSAPrivateKey> SIGNING_KEY_CACHE = + new ConcurrentHashMap<>(); + + private final String issuer; + private final String wifAudience; + private final Path signingKeyPath; + private final String signingKeyId; + private final HttpTransportFactory transportFactory; + + public GcpFederatedCredentialsExchanger( + String issuer, + String wifAudience, + Path signingKeyPath, + String signingKeyId, + HttpTransportFactory transportFactory) { + this.issuer = issuer; + this.wifAudience = wifAudience; + this.signingKeyPath = signingKeyPath; + this.signingKeyId = signingKeyId; + this.transportFactory = transportFactory; + } + + /** + * Builds a federated credential whose subject is {@code <realm>/<principal>}. + * + * @param subject the attribution subject, {@code <realm>/<principal>} (see {@link + * GcpAttributionSubjectBuilder}) + * @param realm the realm identifier, emitted as the {@code realm} claim for {@code + * attribute.realm} mapping + * @return federated credentials suitable as the source for tenant-SA impersonation + */ + public GoogleCredentials federatedCredentials(String subject, String realm) { + return IdentityPoolCredentials.newBuilder() + .setHttpTransportFactory(transportFactory) + .setAudience(wifAudience) + .setSubjectTokenType(SUBJECT_TOKEN_TYPE) + .setTokenUrl(STS_TOKEN_URL) + .setScopes(List.of(CLOUD_PLATFORM_SCOPE)) + .setSubjectTokenSupplier(context -> mintAttributionJwt(subject, realm)) + .build(); + } + + @VisibleForTesting + String mintAttributionJwt(String subject, String realm) throws IOException { + Instant now = Instant.now(); + JWTCreator.Builder builder = + JWT.create() + .withIssuer(issuer) + .withSubject(subject) + .withAudience(wifAudience) + .withClaim("realm", realm) + .withIssuedAt(Date.from(now)) + .withExpiresAt(Date.from(now.plus(JWT_LIFETIME))) + .withJWTId(UUID.randomUUID().toString()); + // Set the kid header so the WIF provider can pick the right public key from its JWKS during + // rotation (when the JWKS holds both the old and new keys). Omitted only for a single-key JWKS. + if (signingKeyId != null && !signingKeyId.isEmpty()) { + builder.withKeyId(signingKeyId); + } + return builder.sign(Algorithm.RSA256(null, loadSigningKey())); + } + + private RSAPrivateKey loadSigningKey() throws IOException { + try { + return SIGNING_KEY_CACHE.computeIfAbsent( + signingKeyPath, + path -> { + try { + return readPkcs8PrivateKey(path); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } catch (UncheckedIOException e) { + throw e.getCause(); + } + } + + /** Reads an RSA private key from a PKCS#8 PEM file. */ + @VisibleForTesting + static RSAPrivateKey readPkcs8PrivateKey(Path pemPath) throws IOException { + String pem = Files.readString(pemPath); + String base64 = + pem.replaceAll("-----BEGIN [A-Z ]+-----", "") Review Comment: Done. Added an explicit check before stripping the PEM headers: if the file contains `BEGIN RSA PRIVATE KEY` we now throw an `IOException` immediately with a message like "PKCS#1 key format (BEGIN RSA PRIVATE KEY) is not supported ... convert to PKCS#8 with: openssl pkcs8 -topk8 -nocrypt -in key.pem -out key-pkcs8.pem". Added a test (`readPkcs8PrivateKeyRejectsPkcs1Format`) to cover this path. ########## polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java: ########## @@ -123,13 +135,75 @@ public GcpCredentialsStorageIntegration( GcpStorageConfigurationInfo storageConfig, RealmConfig realmConfig, GcpCredentialOps credentialOps) { + this( + sourceCredentials, + transportFactory, + cache, + storageConfig, + realmConfig, + credentialOps, + resolveAttributionParams(realmConfig)); + } + + /** + * Full constructor accepting pre-resolved attribution params. Downstream integrations can pass + * their own {@link GcpAttributionParams} without depending on {@link + * org.apache.polaris.core.config.FeatureConfiguration}. + */ + public GcpCredentialsStorageIntegration( + GoogleCredentials sourceCredentials, + HttpTransportFactory transportFactory, + org.apache.polaris.core.storage.cache.StorageCredentialCache cache, + GcpStorageConfigurationInfo storageConfig, + RealmConfig realmConfig, + GcpCredentialOps credentialOps, + Optional<GcpAttributionParams> attributionParams) { super(cache, realmConfig, storageConfig); // Needed for when environment variable GOOGLE_APPLICATION_CREDENTIALS points to google service // account key json this.sourceCredentials = sourceCredentials.createScoped("https://www.googleapis.com/auth/cloud-platform"); this.transportFactory = transportFactory; this.credentialOps = credentialOps; + this.attributionParams = attributionParams; + } + + /** + * Resolves {@link GcpAttributionParams} from realm config. Returns empty when attribution is + * disabled; throws {@link IllegalStateException} when enabled but misconfigured. + */ + public static Optional<GcpAttributionParams> resolveAttributionParams(RealmConfig realmConfig) { Review Comment: It's static because it's called in constructor-chaining expressions (`this(..., resolveAttributionParams(realmConfig))`). In Java you cannot call instance methods in a `this(...)` argument list before the object is initialized, so it must be static. Added a Javadoc note to that effect. ########## polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java: ########## @@ -176,17 +250,30 @@ private GcpStorageCredentialCacheKey buildCacheKey( @NonNull Set<String> writeLocations, @NonNull Optional<String> refreshEndpoint, @NonNull CredentialVendingContext context) { + // Principal attribution makes the vended token per-principal, so the principal must + // participate in cache identity; otherwise it is left empty to preserve cross-principal cache + // reuse. Attribution requires a service account to impersonate and a principal to attribute. + String principalName = ""; + Optional<GcpAttributionParams> resolvedAttributionParams = Optional.empty(); + if (attributionParams.isPresent() && storageConfig().getGcpServiceAccount() != null) { Review Comment: Added a WARN log: "GCS principal attribution is enabled but no gcpServiceAccount is configured on the StorageConfiguration; falling back to non-attributed credentials". ########## polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java: ########## @@ -176,17 +250,30 @@ private GcpStorageCredentialCacheKey buildCacheKey( @NonNull Set<String> writeLocations, @NonNull Optional<String> refreshEndpoint, @NonNull CredentialVendingContext context) { + // Principal attribution makes the vended token per-principal, so the principal must + // participate in cache identity; otherwise it is left empty to preserve cross-principal cache + // reuse. Attribution requires a service account to impersonate and a principal to attribute. + String principalName = ""; + Optional<GcpAttributionParams> resolvedAttributionParams = Optional.empty(); + if (attributionParams.isPresent() && storageConfig().getGcpServiceAccount() != null) { + principalName = context.principalName().orElse(""); + if (!principalName.isEmpty()) { Review Comment: Agreed it shouldn't happen in practice. Added a WARN log here: "GCS principal attribution is enabled but no principal name is present in the credential vending context; falling back to non-attributed credentials". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
