obelix74 commented on code in PR #4707:
URL: https://github.com/apache/polaris/pull/4707#discussion_r3409579820


##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchanger.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage.gcp;
+
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.JWTCreator;
+import com.auth0.jwt.algorithms.Algorithm;
+import com.google.auth.http.HttpTransportFactory;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.IdentityPoolCredentials;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyFactory;
+import java.security.interfaces.RSAPrivateKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Produces a GCP federated {@link GoogleCredentials} whose identity carries {@code
+ * <realm>/<principal>}, so that GCS Data Access audit logs attribute access to the requesting
+ * Polaris principal. This is the GCP counterpart of AWS STS session tags.
+ *
+ * <p>The federated credential is an {@link IdentityPoolCredentials} backed by a programmatic
+ * subject-token supplier: on each token refresh google-auth invokes the supplier, which mints a
+ * short-lived RS256 JWT ({@code sub = <realm>/<principal>}, {@code realm} claim), and exchanges it
+ * at the Workload Identity Pool provider's STS endpoint. The provider maps {@code google.subject =
+ * assertion.sub} and {@code attribute.realm = assertion.realm}; per-realm {@code attribute.realm}
+ * IAM bindings then enforce that a realm-A identity can only impersonate realm-A's service account.
+ * The returned credential is intended to be used as the source for tenant service-account
+ * impersonation (see {@link GcpCredentialsStorageIntegration}).
+ *
+ * <p>Network note: this performs an STS token exchange against {@code sts.googleapis.com} in
+ * addition to the existing {@code iamcredentials.googleapis.com} and {@code storage.googleapis.com}
+ * traffic.
+ */
+public class GcpFederatedCredentialsExchanger {
+
+  static final String STS_TOKEN_URL = "https://sts.googleapis.com/v1/token";
+  static final String SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt";
+  static final String CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform";
+
+  /** Attribution JWTs are single-purpose and short-lived. */
+  static final Duration JWT_LIFETIME = Duration.ofMinutes(5);
+
+  /**
+   * JVM-wide cache of parsed signing keys, keyed by file path. The key file is a stable pod-mounted
+   * secret; parsing it (disk read + {@link KeyFactory}) once per path amortizes across vends rather
+   * than re-reading on every credential-cache miss. Key rotation is delivered by a process restart
+   * (the secret is mounted at startup), which clears this cache.
+   */
+  private static final ConcurrentHashMap<Path, RSAPrivateKey> SIGNING_KEY_CACHE =
+      new ConcurrentHashMap<>();
+
+  private final String issuer;
+  private final String wifAudience;
+  private final Path signingKeyPath;
+  private final String signingKeyId;
+  private final HttpTransportFactory transportFactory;
+
+  public GcpFederatedCredentialsExchanger(
+      String issuer,
+      String wifAudience,
+      Path signingKeyPath,
+      String signingKeyId,
+      HttpTransportFactory transportFactory) {
+    this.issuer = issuer;
+    this.wifAudience = wifAudience;
+    this.signingKeyPath = signingKeyPath;
+    this.signingKeyId = signingKeyId;
+    this.transportFactory = transportFactory;
+  }
+
+  /**
+   * Builds a federated credential whose subject is {@code <realm>/<principal>}.
+   *
+   * @param subject the attribution subject, {@code <realm>/<principal>} (see {@link
+   *     GcpAttributionSubjectBuilder})
+   * @param realm the realm identifier, emitted as the {@code realm} claim for {@code
+   *     attribute.realm} mapping
+   * @return federated credentials suitable as the source for tenant-SA impersonation
+   */
+  public GoogleCredentials federatedCredentials(String subject, String realm) {
+    return IdentityPoolCredentials.newBuilder()
+        .setHttpTransportFactory(transportFactory)
+        .setAudience(wifAudience)
+        .setSubjectTokenType(SUBJECT_TOKEN_TYPE)
+        .setTokenUrl(STS_TOKEN_URL)
+        .setScopes(List.of(CLOUD_PLATFORM_SCOPE))
+        .setSubjectTokenSupplier(context -> mintAttributionJwt(subject, realm))
+        .build();
+  }
+
+  @VisibleForTesting
+  String mintAttributionJwt(String subject, String realm) throws IOException {
+    Instant now = Instant.now();
+    JWTCreator.Builder builder =
+        JWT.create()
+            .withIssuer(issuer)
+            .withSubject(subject)
+            .withAudience(wifAudience)
+            .withClaim("realm", realm)
+            .withIssuedAt(Date.from(now))
+            .withExpiresAt(Date.from(now.plus(JWT_LIFETIME)))
+            .withJWTId(UUID.randomUUID().toString());
+    // Set the kid header so the WIF provider can pick the right public key from its JWKS during
+    // rotation (when the JWKS holds both the old and new keys). Omitted only for a single-key JWKS.
+    if (signingKeyId != null && !signingKeyId.isEmpty()) {
+      builder.withKeyId(signingKeyId);
+    }
+    return builder.sign(Algorithm.RSA256(null, loadSigningKey()));
+  }
+
+  private RSAPrivateKey loadSigningKey() throws IOException {
+    RSAPrivateKey cached = SIGNING_KEY_CACHE.get(signingKeyPath);
+    if (cached != null) {
+      return cached;
+    }
+    RSAPrivateKey key = readPkcs8PrivateKey(signingKeyPath);
+    SIGNING_KEY_CACHE.putIfAbsent(signingKeyPath, key);
+    return key;
+  }
+
+  /** Reads an RSA private key from a PKCS#8 PEM file. */
+  @VisibleForTesting
+  static RSAPrivateKey readPkcs8PrivateKey(Path pemPath) throws IOException {
+    String pem = Files.readString(pemPath);
+    String base64 =
+        pem.replaceAll("-----BEGIN [A-Z ]+-----", "")

Review Comment:
   `PemUtils` lives in `runtime/service` 
(`org.apache.polaris.service.auth.internal.broker`), which is a downstream 
module that depends on `polaris-core`. Using it from `polaris-core` would 
invert that dependency and create a cycle. Moving it to `polaris-core` could 
work in principle, but it is also package-private and has broader scope 
(key-pair generation, public-key reading, file writing) that does not belong in 
core storage. The `readPkcs8PrivateKey` here is a small, targeted helper with 
no deps beyond the JDK — I think keeping it local is the cleaner choice. Happy 
to file a follow-up to consolidate PEM utilities into a shared module if that 
is desirable.



##########
polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpFederatedCredentialsExchanger.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage.gcp;
+
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.JWTCreator;
+import com.auth0.jwt.algorithms.Algorithm;
+import com.google.auth.http.HttpTransportFactory;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.IdentityPoolCredentials;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyFactory;
+import java.security.interfaces.RSAPrivateKey;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Produces a GCP federated {@link GoogleCredentials} whose identity carries {@code
+ * <realm>/<principal>}, so that GCS Data Access audit logs attribute access to the requesting
+ * Polaris principal. This is the GCP counterpart of AWS STS session tags.
+ *
+ * <p>The federated credential is an {@link IdentityPoolCredentials} backed by a programmatic
+ * subject-token supplier: on each token refresh google-auth invokes the supplier, which mints a
+ * short-lived RS256 JWT ({@code sub = <realm>/<principal>}, {@code realm} claim), and exchanges it
+ * at the Workload Identity Pool provider's STS endpoint. The provider maps {@code google.subject =
+ * assertion.sub} and {@code attribute.realm = assertion.realm}; per-realm {@code attribute.realm}
+ * IAM bindings then enforce that a realm-A identity can only impersonate realm-A's service account.
+ * The returned credential is intended to be used as the source for tenant service-account
+ * impersonation (see {@link GcpCredentialsStorageIntegration}).
+ *
+ * <p>Network note: this performs an STS token exchange against {@code sts.googleapis.com} in
+ * addition to the existing {@code iamcredentials.googleapis.com} and {@code storage.googleapis.com}
+ * traffic.
+ */
+public class GcpFederatedCredentialsExchanger {
+
+  static final String STS_TOKEN_URL = "https://sts.googleapis.com/v1/token";
+  static final String SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt";
+  static final String CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform";
+
+  /** Attribution JWTs are single-purpose and short-lived. */
+  static final Duration JWT_LIFETIME = Duration.ofMinutes(5);
+
+  /**
+   * JVM-wide cache of parsed signing keys, keyed by file path. The key file is a stable pod-mounted
+   * secret; parsing it (disk read + {@link KeyFactory}) once per path amortizes across vends rather
+   * than re-reading on every credential-cache miss. Key rotation is delivered by a process restart
+   * (the secret is mounted at startup), which clears this cache.
+   */
+  private static final ConcurrentHashMap<Path, RSAPrivateKey> SIGNING_KEY_CACHE =
+      new ConcurrentHashMap<>();
+
+  private final String issuer;
+  private final String wifAudience;
+  private final Path signingKeyPath;
+  private final String signingKeyId;
+  private final HttpTransportFactory transportFactory;
+
+  public GcpFederatedCredentialsExchanger(
+      String issuer,
+      String wifAudience,
+      Path signingKeyPath,
+      String signingKeyId,
+      HttpTransportFactory transportFactory) {
+    this.issuer = issuer;
+    this.wifAudience = wifAudience;
+    this.signingKeyPath = signingKeyPath;
+    this.signingKeyId = signingKeyId;
+    this.transportFactory = transportFactory;
+  }
+
+  /**
+   * Builds a federated credential whose subject is {@code <realm>/<principal>}.
+   *
+   * @param subject the attribution subject, {@code <realm>/<principal>} (see {@link
+   *     GcpAttributionSubjectBuilder})
+   * @param realm the realm identifier, emitted as the {@code realm} claim for {@code
+   *     attribute.realm} mapping
+   * @return federated credentials suitable as the source for tenant-SA impersonation
+   */
+  public GoogleCredentials federatedCredentials(String subject, String realm) {
+    return IdentityPoolCredentials.newBuilder()
+        .setHttpTransportFactory(transportFactory)
+        .setAudience(wifAudience)
+        .setSubjectTokenType(SUBJECT_TOKEN_TYPE)
+        .setTokenUrl(STS_TOKEN_URL)
+        .setScopes(List.of(CLOUD_PLATFORM_SCOPE))
+        .setSubjectTokenSupplier(context -> mintAttributionJwt(subject, realm))
+        .build();
+  }
+
+  @VisibleForTesting
+  String mintAttributionJwt(String subject, String realm) throws IOException {
+    Instant now = Instant.now();
+    JWTCreator.Builder builder =
+        JWT.create()
+            .withIssuer(issuer)
+            .withSubject(subject)
+            .withAudience(wifAudience)
+            .withClaim("realm", realm)
+            .withIssuedAt(Date.from(now))
+            .withExpiresAt(Date.from(now.plus(JWT_LIFETIME)))
+            .withJWTId(UUID.randomUUID().toString());
+    // Set the kid header so the WIF provider can pick the right public key from its JWKS during
+    // rotation (when the JWKS holds both the old and new keys). Omitted only for a single-key JWKS.
+    if (signingKeyId != null && !signingKeyId.isEmpty()) {
+      builder.withKeyId(signingKeyId);
+    }
+    return builder.sign(Algorithm.RSA256(null, loadSigningKey()));
+  }
+
+  private RSAPrivateKey loadSigningKey() throws IOException {
+    RSAPrivateKey cached = SIGNING_KEY_CACHE.get(signingKeyPath);
+    if (cached != null) {
+      return cached;
+    }
+    RSAPrivateKey key = readPkcs8PrivateKey(signingKeyPath);
+    SIGNING_KEY_CACHE.putIfAbsent(signingKeyPath, key);

Review Comment:
   Good call — switched to `computeIfAbsent`, which atomically ensures only one 
thread does the file I/O per path. The checked `IOException` is wrapped in an 
`UncheckedIOException` inside the lambda and re-thrown at the call site.



##########
runtime/admin/distribution/LICENSE:
##########
@@ -399,6 +399,36 @@ License: Apache License 2.0 - https://github.com/hyperxpro/Brotli4j/blob/v1.16.0
 
 
--------------------------------------------------------------------------------
 
+This product bundles Auth0 Java JWT.
+
+* Maven group:artifact IDs: com.auth0:java-jwt

Review Comment:
   `java-jwt` was already a transitive dependency in the admin distribution 
before this PR — `runtime/service` has used it for internal JWT token signing 
since before this branch. Its absence from the LICENSE was a pre-existing 
omission rather than something introduced here. Since this PR adds 
`polaris-core` as a direct consumer it was a natural point to catch and correct 
that gap. The entry is needed: `java-jwt` is bundled in the admin distribution 
jar regardless of whether the dependency is direct or transitive.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to