Copilot commented on code in PR #9237:
URL: https://github.com/apache/gravitino/pull/9237#discussion_r2568190098


##########
common/src/main/java/org/apache/gravitino/credential/CredentialProviderDelegator.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.credential;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+/**
+ * An abstract base class for {@link CredentialProvider} implementations that 
delegate the actual
+ * credential generation to a {@link CredentialGenerator}. It handles the lazy 
and reflective
+ * loading of the generator to isolate heavy dependencies.
+ *
+ * @param <T> The type of credential generated by this provider.
+ */
+public abstract class CredentialProviderDelegator<T extends Credential>
+    implements CredentialProvider {
+
+  /** The properties used by the generator to generate the credential. */
+  protected Map<String, String> properties;
+
+  private volatile CredentialGenerator<T> generator;

Review Comment:
   The `generator` field should not be marked as `volatile` without proper 
synchronization. Since the generator is initialized once in the `initialize()` 
method and then used in `getCredential()`, and there's no double-checked 
locking pattern or other thread-safety mechanism, the `volatile` keyword alone 
is insufficient. Consider removing `volatile` since the initialization happens 
in a single method call, or implement proper lazy initialization with 
double-checked locking if that's the intent.
   ```suggestion
     private CredentialGenerator<T> generator;
   ```



##########
common/src/main/java/org/apache/gravitino/credential/CredentialProviderDelegator.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.credential;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+/**
+ * An abstract base class for {@link CredentialProvider} implementations that 
delegate the actual
+ * credential generation to a {@link CredentialGenerator}. It handles the lazy 
and reflective
+ * loading of the generator to isolate heavy dependencies.
+ *
+ * @param <T> The type of credential generated by this provider.
+ */
+public abstract class CredentialProviderDelegator<T extends Credential>
+    implements CredentialProvider {
+
+  /** The properties used by the generator to generate the credential. */
+  protected Map<String, String> properties;
+
+  private volatile CredentialGenerator<T> generator;
+
+  /**
+   * Initializes the provider by storing properties and loading the associated 
{@link
+   * CredentialGenerator}.
+   *
+   * @param properties A map of configuration properties for the provider.
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    this.properties = properties;
+    this.generator = loadGenerator();
+    generator.initialize(properties);
+  }
+
+  /**
+   * Delegates the credential generation to the loaded {@link 
CredentialGenerator}.
+   *
+   * @param context The context containing information required for credential 
retrieval.
+   * @return A {@link Credential} object.
+   * @throws RuntimeException if credential generation fails.
+   */
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    try {
+      return generator.generate(context);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to generate credential using " + getGeneratorClassName(), e);
+    }

Review Comment:
   The generator is loaded and initialized in the `initialize()` method, but 
the generator field is checked for null only in the `close()` method. If 
`getCredential()` is called before `initialize()`, it will result in a 
NullPointerException. Consider adding a null check in `getCredential()` with an 
appropriate error message, or document that `initialize()` must be called 
before `getCredential()`.



##########
common/src/main/java/org/apache/gravitino/credential/CredentialProviderDelegator.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.credential;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+/**
+ * An abstract base class for {@link CredentialProvider} implementations that 
delegate the actual
+ * credential generation to a {@link CredentialGenerator}. It handles the lazy 
and reflective
+ * loading of the generator to isolate heavy dependencies.
+ *
+ * @param <T> The type of credential generated by this provider.
+ */
+public abstract class CredentialProviderDelegator<T extends Credential>
+    implements CredentialProvider {
+
+  /** The properties used by the generator to generate the credential. */
+  protected Map<String, String> properties;
+
+  private volatile CredentialGenerator<T> generator;
+
+  /**
+   * Initializes the provider by storing properties and loading the associated 
{@link
+   * CredentialGenerator}.
+   *
+   * @param properties A map of configuration properties for the provider.
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    this.properties = properties;
+    this.generator = loadGenerator();
+    generator.initialize(properties);
+  }
+
+  /**
+   * Delegates the credential generation to the loaded {@link 
CredentialGenerator}.
+   *
+   * @param context The context containing information required for credential 
retrieval.
+   * @return A {@link Credential} object.
+   * @throws RuntimeException if credential generation fails.
+   */
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    try {
+      return generator.generate(context);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to generate credential using " + getGeneratorClassName(), e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (generator != null) {
+      generator.close();
+    }
+  }
+
+  /**
+   * Returns the fully qualified class name of the {@link CredentialGenerator} 
implementation. This
+   * generator will be loaded via reflection to perform the actual credential 
creation.
+   *
+   * @return The class name of the credential generator.
+   */
+  protected abstract String getGeneratorClassName();
+
+  /**
+   * Loads and instantiates the {@link CredentialGenerator} using reflection.
+   *
+   * <p>This implementation uses a no-argument constructor. The constructor 
can be non-public.
+   *
+   * @return An instance of the credential generator.
+   * @throws RuntimeException if the generator cannot be loaded or 
instantiated.
+   */
+  @SuppressWarnings("unchecked")
+  private CredentialGenerator<T> loadGenerator() {
+    try {
+      Class<?> generatorClass = Class.forName(getGeneratorClassName());
+      Constructor<?> constructor = generatorClass.getDeclaredConstructor();
+      constructor.setAccessible(true);
+      return (CredentialGenerator<T>) constructor.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to load or instantiate CredentialGenerator: " + 
getGeneratorClassName(), e);
+    }
+  }
+}

Review Comment:
   The new `CredentialProviderDelegator` class lacks unit tests. While the 
concrete implementations (GCSTokenProvider, S3TokenProvider, etc.) exercise the 
delegator functionality, the delegator itself should have unit tests covering 
edge cases such as:
   - Behavior when `getCredential()` is called before `initialize()`
   - Behavior when the generator class doesn't exist
   - Behavior when the generator constructor throws an exception
   - Proper resource cleanup in `close()`
   
   Consider adding tests for these scenarios to ensure robust error handling.



##########
bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenGenerator.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.s3.credential;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialGenerator;
+import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.S3TokenCredential;
+import org.apache.gravitino.credential.config.S3CredentialConfig;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
+import software.amazon.awssdk.policybuilder.iam.IamEffect;
+import software.amazon.awssdk.policybuilder.iam.IamPolicy;
+import software.amazon.awssdk.policybuilder.iam.IamResource;
+import software.amazon.awssdk.policybuilder.iam.IamStatement;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+/** Generate S3 token credentials according to the read and write paths. */
+public class S3TokenGenerator implements 
CredentialGenerator<S3TokenCredential> {
+
+  private StsClient stsClient;
+  private String roleArn;
+  private String externalID;
+  private int tokenExpireSecs;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    S3CredentialConfig s3CredentialConfig = new S3CredentialConfig(properties);
+    this.roleArn = s3CredentialConfig.s3RoleArn();
+    this.externalID = s3CredentialConfig.externalID();
+    this.tokenExpireSecs = s3CredentialConfig.tokenExpireInSecs();
+    this.stsClient = createStsClient(s3CredentialConfig);
+  }
+
+  @Override
+  public S3TokenCredential generate(CredentialContext context) {
+    if (!(context instanceof PathBasedCredentialContext)) {
+      return null;
+    }
+
+    PathBasedCredentialContext pathContext = (PathBasedCredentialContext) 
context;
+
+    Credentials s3Token =
+        createS3Token(
+            pathContext.getReadPaths(), pathContext.getWritePaths(), 
pathContext.getUserName());
+
+    return new S3TokenCredential(
+        s3Token.accessKeyId(),
+        s3Token.secretAccessKey(),
+        s3Token.sessionToken(),
+        s3Token.expiration().toEpochMilli());
+  }
+
+  private StsClient createStsClient(S3CredentialConfig s3CredentialConfig) {
+    AwsCredentialsProvider credentialsProvider =
+        StaticCredentialsProvider.create(
+            AwsBasicCredentials.create(
+                s3CredentialConfig.accessKeyID(), 
s3CredentialConfig.secretAccessKey()));
+    StsClientBuilder builder = 
StsClient.builder().credentialsProvider(credentialsProvider);
+
+    if (StringUtils.isNotBlank(s3CredentialConfig.region())) {
+      builder.region(Region.of(s3CredentialConfig.region()));
+    }
+    if (StringUtils.isNotBlank(s3CredentialConfig.stsEndpoint())) {
+      builder.endpointOverride(URI.create(s3CredentialConfig.stsEndpoint()));
+    }
+    return builder.build();
+  }
+
+  private Credentials createS3Token(
+      Set<String> readLocations, Set<String> writeLocations, String userName) {
+    IamPolicy policy = createPolicy(roleArn, readLocations, writeLocations);
+    AssumeRoleRequest.Builder builder =
+        AssumeRoleRequest.builder()
+            .roleArn(roleArn)
+            .roleSessionName("gravitino_" + userName)
+            .durationSeconds(tokenExpireSecs)
+            .policy(policy.toJson());
+
+    if (StringUtils.isNotBlank(externalID)) {
+      builder.externalId(externalID);
+    }
+    AssumeRoleResponse response = stsClient.assumeRole(builder.build());
+    return response.credentials();
+  }
+
+  private IamPolicy createPolicy(
+      String roleArn, Set<String> readLocations, Set<String> writeLocations) {
+    IamPolicy.Builder policyBuilder = IamPolicy.builder();
+    IamStatement.Builder allowGetObjectStatementBuilder =
+        IamStatement.builder()
+            .effect(IamEffect.ALLOW)
+            .addAction("s3:GetObject")
+            .addAction("s3:GetObjectVersion");
+    Map<String, IamStatement.Builder> bucketListStatementBuilder = new 
HashMap<>();
+    Map<String, IamStatement.Builder> bucketGetLocationStatementBuilder = new 
HashMap<>();
+
+    String arnPrefix = getArnPrefix(roleArn);
+    Stream.concat(readLocations.stream(), writeLocations.stream())
+        .distinct()
+        .forEach(
+            location -> {
+              URI uri = URI.create(location);
+              allowGetObjectStatementBuilder.addResource(
+                  IamResource.create(getS3UriWithArn(arnPrefix, uri)));
+              String bucketArn = arnPrefix + getBucketName(uri);
+              String rawPath = trimLeadingSlash(uri.getPath());
+              bucketListStatementBuilder
+                  .computeIfAbsent(
+                      bucketArn,
+                      key ->
+                          IamStatement.builder()
+                              .effect(IamEffect.ALLOW)
+                              .addAction("s3:ListBucket")
+                              .addResource(key))
+                  .addConditions(
+                      IamConditionOperator.STRING_LIKE,
+                      "s3:prefix",
+                      Arrays.asList(rawPath, addWildcardToPath(rawPath)));
+
+              bucketGetLocationStatementBuilder.computeIfAbsent(
+                  bucketArn,
+                  key ->
+                      IamStatement.builder()
+                          .effect(IamEffect.ALLOW)
+                          .addAction("s3:GetBucketLocation")
+                          .addResource(key));
+            });
+
+    if (!writeLocations.isEmpty()) {
+      IamStatement.Builder allowPutObjectStatementBuilder =
+          IamStatement.builder()
+              .effect(IamEffect.ALLOW)
+              .addAction("s3:PutObject")
+              .addAction("s3:DeleteObject");
+      writeLocations.forEach(
+          location -> {
+            URI uri = URI.create(location);
+            allowPutObjectStatementBuilder.addResource(
+                IamResource.create(getS3UriWithArn(arnPrefix, uri)));
+          });
+      policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
+    }
+
+    bucketListStatementBuilder
+        .values()
+        .forEach(builder -> policyBuilder.addStatement(builder.build()));
+    bucketGetLocationStatementBuilder
+        .values()
+        .forEach(builder -> policyBuilder.addStatement(builder.build()));

Review Comment:
   The method `addStatementsToPolicy` was simplified in S3TokenGenerator (lines 
178-183) compared to the original AwsIrsaCredentialProvider implementation. The 
original had a fallback mechanism that would add a statement with a specific 
action when the statement builders were empty. This simplification removes that 
fallback logic for S3TokenGenerator. Verify that this change is intentional and 
doesn't break any functionality where an empty bucket list should still result 
in a statement being added.
   ```suggestion
           .forEach(builder -> policyBuilder.addStatement(builder.build()));
       // Fallback: if no bucket-specific statements were added, allow listing 
and getting location for all buckets
       if (bucketListStatementBuilder.isEmpty() && 
bucketGetLocationStatementBuilder.isEmpty()) {
         policyBuilder.addStatement(
             IamStatement.builder()
                 .effect(IamEffect.ALLOW)
                 .addAction("s3:ListAllMyBuckets")
                 .addAction("s3:GetBucketLocation")
                 .addResource(IamResource.create("arn:aws:s3:::*"))
                 .build());
       }
   ```



##########
bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialGenerator.java:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.s3.credential;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.credential.AwsIrsaCredential;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialGenerator;
+import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.config.S3CredentialConfig;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import 
software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
+import software.amazon.awssdk.policybuilder.iam.IamEffect;
+import software.amazon.awssdk.policybuilder.iam.IamPolicy;
+import software.amazon.awssdk.policybuilder.iam.IamResource;
+import software.amazon.awssdk.policybuilder.iam.IamStatement;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import 
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityRequest;
+import 
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+/** Generate AWS IRSA credentials according to the read and write paths. */
+public class AwsIrsaCredentialGenerator implements 
CredentialGenerator<AwsIrsaCredential> {
+
+  private WebIdentityTokenFileCredentialsProvider baseCredentialsProvider;
+  private String roleArn;
+  private int tokenExpireSecs;
+  private String region;
+  private String stsEndpoint;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    // Use WebIdentityTokenFileCredentialsProvider for base IRSA configuration
+    this.baseCredentialsProvider = 
WebIdentityTokenFileCredentialsProvider.create();
+
+    S3CredentialConfig s3CredentialConfig = new S3CredentialConfig(properties);
+    this.roleArn = s3CredentialConfig.s3RoleArn();
+    this.tokenExpireSecs = s3CredentialConfig.tokenExpireInSecs();
+    this.region = s3CredentialConfig.region();
+    this.stsEndpoint = s3CredentialConfig.stsEndpoint();
+  }
+
+  @Override
+  public AwsIrsaCredential generate(CredentialContext context) {
+    if (!(context instanceof PathBasedCredentialContext)) {
+      // Fallback to original behavior for non-path-based contexts
+      AwsCredentials creds = baseCredentialsProvider.resolveCredentials();
+      if (creds instanceof AwsSessionCredentials) {
+        AwsSessionCredentials sessionCreds = (AwsSessionCredentials) creds;
+        long expiration =
+            sessionCreds.expirationTime().isPresent()
+                ? sessionCreds.expirationTime().get().toEpochMilli()
+                : 0L;
+        return new AwsIrsaCredential(
+            sessionCreds.accessKeyId(),
+            sessionCreds.secretAccessKey(),
+            sessionCreds.sessionToken(),
+            expiration);
+      } else {
+        throw new IllegalStateException(
+            "AWS IRSA credentials must be of type AwsSessionCredentials. "
+                + "Check your EKS/IRSA configuration. Got: "
+                + creds.getClass().getName());
+      }
+    }
+
+    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
+
+    Credentials s3Token =
+        createCredentialsWithSessionPolicy(
+            pathBasedCredentialContext.getReadPaths(),
+            pathBasedCredentialContext.getWritePaths(),
+            pathBasedCredentialContext.getUserName());
+    return new AwsIrsaCredential(
+        s3Token.accessKeyId(),
+        s3Token.secretAccessKey(),
+        s3Token.sessionToken(),
+        s3Token.expiration().toEpochMilli());
+  }
+
+  private Credentials createCredentialsWithSessionPolicy(
+      Set<String> readLocations, Set<String> writeLocations, String userName) {
+    validateInputParameters(readLocations, writeLocations, userName);
+
+    IamPolicy sessionPolicy = createSessionPolicy(readLocations, 
writeLocations, region);
+    String webIdentityTokenFile = getValidatedWebIdentityTokenFile();
+    String effectiveRoleArn = getValidatedRoleArn(roleArn);
+
+    try {
+      String tokenContent =
+          new String(Files.readAllBytes(Paths.get(webIdentityTokenFile)), 
StandardCharsets.UTF_8);
+      if (StringUtils.isBlank(tokenContent)) {
+        throw new IllegalStateException(
+            "Web identity token file is empty: " + webIdentityTokenFile);
+      }
+
+      return assumeRoleWithSessionPolicy(effectiveRoleArn, userName, 
tokenContent, sessionPolicy);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to create credentials with session policy for user: " + 
userName, e);
+    }
+  }
+
+  private IamPolicy createSessionPolicy(
+      Set<String> readLocations, Set<String> writeLocations, String region) {
+    IamPolicy.Builder policyBuilder = IamPolicy.builder();
+    String arnPrefix = getArnPrefix(region);
+
+    addReadPermissions(policyBuilder, readLocations, writeLocations, 
arnPrefix);
+    if (!writeLocations.isEmpty()) {
+      addWritePermissions(policyBuilder, writeLocations, arnPrefix);
+    }
+    addBucketPermissions(policyBuilder, readLocations, writeLocations, 
arnPrefix);
+
+    return policyBuilder.build();
+  }
+
+  private void addReadPermissions(
+      IamPolicy.Builder policyBuilder,
+      Set<String> readLocations,
+      Set<String> writeLocations,
+      String arnPrefix) {
+    IamStatement.Builder allowGetObjectStatementBuilder =
+        IamStatement.builder()
+            .effect(IamEffect.ALLOW)
+            .addAction("s3:GetObject")
+            .addAction("s3:GetObjectVersion");
+
+    Stream.concat(readLocations.stream(), writeLocations.stream())
+        .distinct()
+        .forEach(
+            location -> {
+              URI uri = URI.create(location);
+              allowGetObjectStatementBuilder.addResource(
+                  IamResource.create(getS3UriWithArn(arnPrefix, uri)));
+            });
+
+    policyBuilder.addStatement(allowGetObjectStatementBuilder.build());
+  }
+
+  private void addWritePermissions(
+      IamPolicy.Builder policyBuilder, Set<String> writeLocations, String 
arnPrefix) {
+    IamStatement.Builder allowPutObjectStatementBuilder =
+        IamStatement.builder()
+            .effect(IamEffect.ALLOW)
+            .addAction("s3:PutObject")
+            .addAction("s3:DeleteObject");
+
+    writeLocations.forEach(
+        location -> {
+          URI uri = URI.create(location);
+          allowPutObjectStatementBuilder.addResource(
+              IamResource.create(getS3UriWithArn(arnPrefix, uri)));
+        });
+
+    policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
+  }
+
+  private void addBucketPermissions(
+      IamPolicy.Builder policyBuilder,
+      Set<String> readLocations,
+      Set<String> writeLocations,
+      String arnPrefix) {
+    Map<String, IamStatement.Builder> bucketListStatementBuilder = new 
HashMap<>();
+    Map<String, IamStatement.Builder> bucketGetLocationStatementBuilder = new 
HashMap<>();
+
+    Stream.concat(readLocations.stream(), writeLocations.stream())
+        .distinct()
+        .forEach(
+            location -> {
+              URI uri = URI.create(location);
+              String bucketArn = arnPrefix + getBucketName(uri);
+              String rawPath = trimLeadingSlash(uri.getPath());
+
+              bucketListStatementBuilder
+                  .computeIfAbsent(
+                      bucketArn,
+                      key ->
+                          IamStatement.builder()
+                              .effect(IamEffect.ALLOW)
+                              .addAction("s3:ListBucket")
+                              .addResource(key))
+                  .addConditions(
+                      IamConditionOperator.STRING_LIKE,
+                      "s3:prefix",
+                      Arrays.asList(rawPath, addWildcardToPath(rawPath)));
+
+              bucketGetLocationStatementBuilder.computeIfAbsent(
+                  bucketArn,
+                  key ->
+                      IamStatement.builder()
+                          .effect(IamEffect.ALLOW)
+                          .addAction("s3:GetBucketLocation")
+                          .addResource(key));
+            });
+
+    addStatementsToPolicy(policyBuilder, bucketListStatementBuilder);
+    addStatementsToPolicy(policyBuilder, bucketGetLocationStatementBuilder);
+  }
+
+  private void addStatementsToPolicy(
+      IamPolicy.Builder policyBuilder, Map<String, IamStatement.Builder> 
statementBuilders) {
+    statementBuilders.values().forEach(builder -> 
policyBuilder.addStatement(builder.build()));

Review Comment:
   The method `addStatementsToPolicy` in AwsIrsaCredentialGenerator (lines 
232-234) was simplified to always add all statements from the builder map. The 
original implementation had a fallback parameter that would add a specific 
action statement when the map was empty. This simplification removes that 
fallback mechanism. Ensure this change doesn't break scenarios where empty 
bucket lists should still result in a fallback statement.
   ```suggestion
       // Fallback for ListBucket: allow listing any bucket if no specific 
buckets are present
       addStatementsToPolicy(
           policyBuilder,
           bucketListStatementBuilder,
           IamStatement.builder()
               .effect(IamEffect.ALLOW)
               .addAction("s3:ListBucket")
               .addResource(arnPrefix + "*")
               .build());
       // Fallback for GetBucketLocation: allow getting location of any bucket 
if none specified
       addStatementsToPolicy(
           policyBuilder,
           bucketGetLocationStatementBuilder,
           IamStatement.builder()
               .effect(IamEffect.ALLOW)
               .addAction("s3:GetBucketLocation")
               .addResource(arnPrefix + "*")
               .build());
     }
   
     private void addStatementsToPolicy(
         IamPolicy.Builder policyBuilder,
         Map<String, IamStatement.Builder> statementBuilders,
         IamStatement fallbackStatement) {
       if (statementBuilders.isEmpty()) {
         policyBuilder.addStatement(fallbackStatement);
       } else {
         statementBuilders.values().forEach(builder -> 
policyBuilder.addStatement(builder.build()));
       }
   ```



##########
common/src/main/java/org/apache/gravitino/credential/CredentialProviderDelegator.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.credential;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+/**
+ * An abstract base class for {@link CredentialProvider} implementations that 
delegate the actual
+ * credential generation to a {@link CredentialGenerator}. It handles the lazy 
and reflective
+ * loading of the generator to isolate heavy dependencies.
+ *
+ * @param <T> The type of credential generated by this provider.
+ */
+public abstract class CredentialProviderDelegator<T extends Credential>
+    implements CredentialProvider {
+
+  /** The properties used by the generator to generate the credential. */
+  protected Map<String, String> properties;
+
+  private volatile CredentialGenerator<T> generator;
+
+  /**
+   * Initializes the provider by storing properties and loading the associated 
{@link
+   * CredentialGenerator}.
+   *
+   * @param properties A map of configuration properties for the provider.
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    this.properties = properties;
+    this.generator = loadGenerator();
+    generator.initialize(properties);
+  }
+
+  /**
+   * Delegates the credential generation to the loaded {@link 
CredentialGenerator}.
+   *
+   * @param context The context containing information required for credential 
retrieval.
+   * @return A {@link Credential} object.
+   * @throws RuntimeException if credential generation fails.
+   */
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    try {
+      return generator.generate(context);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to generate credential using " + getGeneratorClassName(), e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (generator != null) {
+      generator.close();
+    }
+  }
+
+  /**
+   * Returns the fully qualified class name of the {@link CredentialGenerator} 
implementation. This
+   * generator will be loaded via reflection to perform the actual credential 
creation.
+   *
+   * @return The class name of the credential generator.
+   */
+  protected abstract String getGeneratorClassName();

Review Comment:
   [nitpick] The `getGeneratorClassName()` method should be marked as 
`protected` but returns `String`. The method visibility is correct, but 
consider adding `@Override` annotation if this is intended to be overridden, or 
make it `abstract` (which it already is). The issue is that the access modifier 
`protected` is appropriate, but this is an abstract method that subclasses must 
implement - this should be documented more clearly or the method signature 
should indicate it returns a non-null value using `@Nonnull` annotation.



##########
bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenGenerator.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.s3.credential;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialGenerator;
+import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.S3TokenCredential;
+import org.apache.gravitino.credential.config.S3CredentialConfig;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
+import software.amazon.awssdk.policybuilder.iam.IamEffect;
+import software.amazon.awssdk.policybuilder.iam.IamPolicy;
+import software.amazon.awssdk.policybuilder.iam.IamResource;
+import software.amazon.awssdk.policybuilder.iam.IamStatement;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+/** Generate S3 token credentials according to the read and write paths. */
+public class S3TokenGenerator implements 
CredentialGenerator<S3TokenCredential> {
+
+  private StsClient stsClient;
+  private String roleArn;
+  private String externalID;
+  private int tokenExpireSecs;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    S3CredentialConfig s3CredentialConfig = new S3CredentialConfig(properties);
+    this.roleArn = s3CredentialConfig.s3RoleArn();
+    this.externalID = s3CredentialConfig.externalID();
+    this.tokenExpireSecs = s3CredentialConfig.tokenExpireInSecs();
+    this.stsClient = createStsClient(s3CredentialConfig);
+  }
+
+  @Override
+  public S3TokenCredential generate(CredentialContext context) {
+    if (!(context instanceof PathBasedCredentialContext)) {
+      return null;
+    }
+
+    PathBasedCredentialContext pathContext = (PathBasedCredentialContext) 
context;
+
+    Credentials s3Token =
+        createS3Token(
+            pathContext.getReadPaths(), pathContext.getWritePaths(), 
pathContext.getUserName());
+
+    return new S3TokenCredential(
+        s3Token.accessKeyId(),
+        s3Token.secretAccessKey(),
+        s3Token.sessionToken(),
+        s3Token.expiration().toEpochMilli());
+  }
+
+  private StsClient createStsClient(S3CredentialConfig s3CredentialConfig) {
+    AwsCredentialsProvider credentialsProvider =
+        StaticCredentialsProvider.create(
+            AwsBasicCredentials.create(
+                s3CredentialConfig.accessKeyID(), 
s3CredentialConfig.secretAccessKey()));
+    StsClientBuilder builder = 
StsClient.builder().credentialsProvider(credentialsProvider);
+
+    if (StringUtils.isNotBlank(s3CredentialConfig.region())) {
+      builder.region(Region.of(s3CredentialConfig.region()));
+    }
+    if (StringUtils.isNotBlank(s3CredentialConfig.stsEndpoint())) {
+      builder.endpointOverride(URI.create(s3CredentialConfig.stsEndpoint()));
+    }
+    return builder.build();
+  }
+
+  private Credentials createS3Token(
+      Set<String> readLocations, Set<String> writeLocations, String userName) {
+    IamPolicy policy = createPolicy(roleArn, readLocations, writeLocations);
+    AssumeRoleRequest.Builder builder =
+        AssumeRoleRequest.builder()
+            .roleArn(roleArn)
+            .roleSessionName("gravitino_" + userName)
+            .durationSeconds(tokenExpireSecs)
+            .policy(policy.toJson());
+
+    if (StringUtils.isNotBlank(externalID)) {
+      builder.externalId(externalID);
+    }
+    AssumeRoleResponse response = stsClient.assumeRole(builder.build());
+    return response.credentials();
+  }
+
+  private IamPolicy createPolicy(
+      String roleArn, Set<String> readLocations, Set<String> writeLocations) {
+    IamPolicy.Builder policyBuilder = IamPolicy.builder();
+    IamStatement.Builder allowGetObjectStatementBuilder =
+        IamStatement.builder()
+            .effect(IamEffect.ALLOW)
+            .addAction("s3:GetObject")
+            .addAction("s3:GetObjectVersion");
+    Map<String, IamStatement.Builder> bucketListStatementBuilder = new 
HashMap<>();
+    Map<String, IamStatement.Builder> bucketGetLocationStatementBuilder = new 
HashMap<>();
+
+    String arnPrefix = getArnPrefix(roleArn);
+    Stream.concat(readLocations.stream(), writeLocations.stream())
+        .distinct()
+        .forEach(
+            location -> {
+              URI uri = URI.create(location);
+              allowGetObjectStatementBuilder.addResource(
+                  IamResource.create(getS3UriWithArn(arnPrefix, uri)));
+              String bucketArn = arnPrefix + getBucketName(uri);
+              String rawPath = trimLeadingSlash(uri.getPath());
+              bucketListStatementBuilder
+                  .computeIfAbsent(
+                      bucketArn,
+                      key ->
+                          IamStatement.builder()
+                              .effect(IamEffect.ALLOW)
+                              .addAction("s3:ListBucket")
+                              .addResource(key))
+                  .addConditions(
+                      IamConditionOperator.STRING_LIKE,
+                      "s3:prefix",
+                      Arrays.asList(rawPath, addWildcardToPath(rawPath)));
+
+              bucketGetLocationStatementBuilder.computeIfAbsent(
+                  bucketArn,
+                  key ->
+                      IamStatement.builder()
+                          .effect(IamEffect.ALLOW)
+                          .addAction("s3:GetBucketLocation")
+                          .addResource(key));
+            });
+
+    if (!writeLocations.isEmpty()) {
+      IamStatement.Builder allowPutObjectStatementBuilder =
+          IamStatement.builder()
+              .effect(IamEffect.ALLOW)
+              .addAction("s3:PutObject")
+              .addAction("s3:DeleteObject");
+      writeLocations.forEach(
+          location -> {
+            URI uri = URI.create(location);
+            allowPutObjectStatementBuilder.addResource(
+                IamResource.create(getS3UriWithArn(arnPrefix, uri)));
+          });
+      policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
+    }
+
+    bucketListStatementBuilder
+        .values()
+        .forEach(builder -> policyBuilder.addStatement(builder.build()));
+    bucketGetLocationStatementBuilder
+        .values()
+        .forEach(builder -> policyBuilder.addStatement(builder.build()));
+    policyBuilder.addStatement(allowGetObjectStatementBuilder.build());
+
+    return policyBuilder.build();
+  }
+
+  private String getS3UriWithArn(String arnPrefix, URI uri) {
+    return arnPrefix + addWildcardToPath(removeSchemaFromS3Uri(uri));
+  }
+
+  private String getArnPrefix(String roleArn) {
+    if (roleArn.contains("aws-cn")) {
+      return "arn:aws-cn:s3:::";
+    } else if (roleArn.contains("aws-us-gov")) {
+      return "arn:aws-us-gov:s3:::";
+    }
+    return "arn:aws:s3:::";
+  }
+
+  private static String addWildcardToPath(String path) {
+    return path.endsWith("/") ? path + "*" : path + "/*";
+  }

Review Comment:
   The method `concatPathWithSep` was removed from S3TokenProvider but a new 
method `addWildcardToPath` (line 202) was added to S3TokenGenerator. The old 
method had complex logic for concatenating paths with separators, but the new 
implementation simplifies this to just appending wildcards. Verify that all 
call sites that previously used `concatPathWithSep` now correctly use 
`addWildcardToPath` and that the behavior is equivalent.



##########
common/src/main/java/org/apache/gravitino/credential/CredentialGenerator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.credential;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/**
+ * An interface for generating credentials. Implementations of this interface 
will contain the
+ * actual logic for creating credentials and may have heavy dependencies. They 
are intended to be
+ * loaded via reflection by a {@link CredentialProvider} to avoid classpath 
issues during service
+ * loading.
+ *
+ * @param <T> The type of credential this generator produces.
+ */
+public interface CredentialGenerator<T extends Credential> extends Closeable {
+
+  /**
+   * Initializes the credential generator with catalog properties.
+   *
+   * @param properties catalog properties that can be used to configure the 
provider. The specific
+   *     properties required vary by implementation.
+   */
+  void initialize(Map<String, String> properties);
+
+  /**
+   * Generates a credential.
+   *
+   * @param context The context providing necessary information for credential 
retrieval.
+   * @return The generated credential.
+   * @throws Exception if an error occurs during credential generation.
+   */
+  T generate(CredentialContext context) throws Exception;

Review Comment:
   The `generate` method in CredentialGenerator declares `throws Exception`, 
which is very broad. Consider declaring more specific exceptions (e.g., 
`IOException`, `CredentialException`) to help callers handle errors 
appropriately. The broad `Exception` declaration makes it difficult for callers 
to distinguish between different failure modes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to