This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new a00fb04ffe [#9236] improvement(credentialProvider): isolate SDK 
dependencies in `CredentialProviders` using a Generator pattern (#9237)
a00fb04ffe is described below

commit a00fb04ffed532889247a04b5cec1dd7cb3e1630
Author: mchades <[email protected]>
AuthorDate: Fri Nov 28 15:17:12 2025 +0800

    [#9236] improvement(credentialProvider): isolate SDK dependencies in 
`CredentialProviders` using a Generator pattern (#9237)
    
    ### What changes were proposed in this pull request?
    
    isolate SDK dependencies in `CredentialProviders` using a Generator
    pattern
    
    ### Why are the changes needed?
    
    Fix: #9236
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    CI pass
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
    Co-authored-by: fanng <[email protected]>
---
 ...SSTokenProvider.java => OSSTokenGenerator.java} |  64 +---
 .../gravitino/oss/credential/OSSTokenProvider.java | 240 +------------
 ...ovider.java => AwsIrsaCredentialGenerator.java} | 145 ++------
 .../s3/credential/AwsIrsaCredentialProvider.java   | 375 +--------------------
 ...{S3TokenProvider.java => S3TokenGenerator.java} | 166 ++++-----
 .../gravitino/s3/credential/S3TokenProvider.java   | 229 +------------
 ...STokenProvider.java => ADLSTokenGenerator.java} |  57 ++--
 .../abs/credential/ADLSTokenProvider.java          | 113 +------
 ...CSTokenProvider.java => GCSTokenGenerator.java} | 110 +++---
 .../gravitino/gcs/credential/GCSTokenProvider.java | 254 +-------------
 .../gcs/credential/TestGCSTokenProvider.java       |   4 +-
 ...ntialProvider.java => CredentialGenerator.java} |  32 +-
 .../gravitino/credential/CredentialProvider.java   |  11 +-
 .../credential/CredentialProviderDelegator.java    | 105 ++++++
 14 files changed, 370 insertions(+), 1535 deletions(-)

diff --git 
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java
 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/credential/OSSTokenGenerator.java
similarity index 77%
copy from 
bundles/aliyun/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java
copy to 
bundles/aliyun/src/main/java/org/apache/gravitino/oss/credential/OSSTokenGenerator.java
index 79d7f51f78..9ad6b8bf84 100644
--- 
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java
+++ 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/credential/OSSTokenGenerator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.gravitino.oss.credential;
 
 import com.aliyun.credentials.Client;
@@ -31,11 +32,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Stream;
-import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.CredentialContext;
-import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.CredentialGenerator;
 import org.apache.gravitino.credential.OSSTokenCredential;
 import org.apache.gravitino.credential.PathBasedCredentialContext;
 import org.apache.gravitino.credential.config.OSSCredentialConfig;
@@ -46,7 +45,8 @@ import org.apache.gravitino.oss.credential.policy.Statement;
 import org.apache.gravitino.oss.credential.policy.StringLike;
 
 /** Generates OSS token to access OSS data. */
-public class OSSTokenProvider implements CredentialProvider {
+public class OSSTokenGenerator implements 
CredentialGenerator<OSSTokenCredential> {
+
   private final ObjectMapper objectMapper = new ObjectMapper();
   private String accessKeyId;
   private String secretAccessKey;
@@ -55,12 +55,6 @@ public class OSSTokenProvider implements CredentialProvider {
   private int tokenExpireSecs;
   private String region;
 
-  /**
-   * Initializes the credential provider with catalog properties.
-   *
-   * @param properties catalog properties that can be used to configure the 
provider. The specific
-   *     properties required vary by implementation.
-   */
   @Override
   public void initialize(Map<String, String> properties) {
     OSSCredentialConfig credentialConfig = new OSSCredentialConfig(properties);
@@ -72,36 +66,18 @@ public class OSSTokenProvider implements CredentialProvider 
{
     this.region = credentialConfig.region();
   }
 
-  /**
-   * Returns the type of credential, it should be identical in Gravitino.
-   *
-   * @return A string identifying the type of credentials.
-   */
-  @Override
-  public String credentialType() {
-    return OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE;
-  }
-
-  /**
-   * Obtains a credential based on the provided context information.
-   *
-   * @param context A context object providing necessary information for 
retrieving credentials.
-   * @return A Credential object containing the authentication information 
needed to access a system
-   *     or resource. Null will be returned if no credential is available.
-   */
-  @Nullable
   @Override
-  public Credential getCredential(CredentialContext context) {
+  public OSSTokenCredential generate(CredentialContext context) throws 
Exception {
     if (!(context instanceof PathBasedCredentialContext)) {
       return null;
     }
-    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
+
+    PathBasedCredentialContext pathContext = (PathBasedCredentialContext) 
context;
+
     CredentialModel credentialModel =
         createOSSCredentialModel(
-            roleArn,
-            pathBasedCredentialContext.getReadPaths(),
-            pathBasedCredentialContext.getWritePaths(),
-            pathBasedCredentialContext.getUserName());
+            pathContext.getReadPaths(), pathContext.getWritePaths(), 
pathContext.getUserName());
+
     return new OSSTokenCredential(
         credentialModel.accessKeyId,
         credentialModel.accessKeySecret,
@@ -110,7 +86,7 @@ public class OSSTokenProvider implements CredentialProvider {
   }
 
   private CredentialModel createOSSCredentialModel(
-      String roleArn, Set<String> readLocations, Set<String> writeLocations, 
String userName) {
+      Set<String> readLocations, Set<String> writeLocations, String userName) {
     Config config = new Config();
     config.setAccessKeyId(accessKeyId);
     config.setAccessKeySecret(secretAccessKey);
@@ -121,29 +97,26 @@ public class OSSTokenProvider implements 
CredentialProvider {
       config.setExternalId(externalID);
     }
     config.setRoleSessionExpiration(tokenExpireSecs);
-    config.setPolicy(createPolicy(readLocations, writeLocations));
+    config.setPolicy(createPolicy(readLocations, writeLocations, region));
     // Local object and client is a simple proxy that does not require manual 
release
     Client client = new Client(config);
     return client.getCredential();
   }
 
-  // reference:
-  // 
https://www.alibabacloud.com/help/en/oss/user-guide/tutorial-use-ram-policies-to-control-access-to-oss?spm=a2c63.p38356.help-menu-31815.d_2_4_5_1.5536471b56XPRQ
-  private String createPolicy(Set<String> readLocations, Set<String> 
writeLocations) {
+  private String createPolicy(
+      Set<String> readLocations, Set<String> writeLocations, String region) {
     Policy.Builder policyBuilder = Policy.builder().version("1");
 
-    // Allow read and write access to the specified locations
     Statement.Builder allowGetObjectStatementBuilder =
         Statement.builder()
             .effect(Effect.ALLOW)
             .addAction("oss:GetObject")
             .addAction("oss:GetObjectVersion");
 
-    // Add support for bucket-level policies
     Map<String, Statement.Builder> bucketListStatementBuilder = new 
HashMap<>();
     Map<String, Statement.Builder> bucketMetadataStatementBuilder = new 
HashMap<>();
 
-    String arnPrefix = getArnPrefix();
+    String arnPrefix = getArnPrefix(region);
     Stream.concat(readLocations.stream(), writeLocations.stream())
         .distinct()
         .forEach(
@@ -151,7 +124,6 @@ public class OSSTokenProvider implements CredentialProvider 
{
               URI uri = URI.create(location);
               
allowGetObjectStatementBuilder.addResource(getOssUriWithArn(arnPrefix, uri));
               String bucketArn = arnPrefix + getBucketName(uri);
-              // OSS use 'oss:ListObjects' to list objects in a bucket while 
s3 use 's3:ListBucket'
               bucketListStatementBuilder.computeIfAbsent(
                   bucketArn,
                   key ->
@@ -160,14 +132,12 @@ public class OSSTokenProvider implements 
CredentialProvider {
                           .addAction("oss:ListObjects")
                           .addResource(key)
                           .condition(getCondition(uri)));
-              // Add get bucket location and bucket info action.
               bucketMetadataStatementBuilder.computeIfAbsent(
                   bucketArn,
                   key ->
                       Statement.builder()
                           .effect(Effect.ALLOW)
                           .addAction("oss:GetBucketLocation")
-                          // Required for OSS Hadoop connector to get bucket 
information
                           .addAction("oss:GetBucketInfo")
                           .addResource(key));
             });
@@ -191,7 +161,6 @@ public class OSSTokenProvider implements CredentialProvider 
{
           .values()
           .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
     } else {
-      // add list privilege with 0 resources
       policyBuilder.addStatement(
           
Statement.builder().effect(Effect.ALLOW).addAction("oss:ListBucket").build());
     }
@@ -216,7 +185,7 @@ public class OSSTokenProvider implements CredentialProvider 
{
         .build();
   }
 
-  private String getArnPrefix() {
+  private String getArnPrefix(String region) {
     if (StringUtils.isNotEmpty(region)) {
       return "acs:oss:" + region + ":*:";
     }
@@ -241,7 +210,6 @@ public class OSSTokenProvider implements CredentialProvider 
{
     }
   }
 
-  // Transform 'oss://bucket/path' to /bucket/path
   private String removeSchemaFromOSSUri(URI uri) {
     String bucket = uri.getHost();
     String path = trimLeadingSlash(uri.getPath());
diff --git 
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java
 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java
index 79d7f51f78..1783d7f095 100644
--- 
a/bundles/aliyun/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java
+++ 
b/bundles/aliyun/src/main/java/org/apache/gravitino/oss/credential/OSSTokenProvider.java
@@ -16,247 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.gravitino.oss.credential;
 
-import com.aliyun.credentials.Client;
-import com.aliyun.credentials.models.Config;
-import com.aliyun.credentials.models.CredentialModel;
-import com.aliyun.credentials.utils.AuthConstant;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.credential.Credential;
-import org.apache.gravitino.credential.CredentialContext;
-import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.CredentialProviderDelegator;
 import org.apache.gravitino.credential.OSSTokenCredential;
-import org.apache.gravitino.credential.PathBasedCredentialContext;
-import org.apache.gravitino.credential.config.OSSCredentialConfig;
-import org.apache.gravitino.oss.credential.policy.Condition;
-import org.apache.gravitino.oss.credential.policy.Effect;
-import org.apache.gravitino.oss.credential.policy.Policy;
-import org.apache.gravitino.oss.credential.policy.Statement;
-import org.apache.gravitino.oss.credential.policy.StringLike;
-
-/** Generates OSS token to access OSS data. */
-public class OSSTokenProvider implements CredentialProvider {
-  private final ObjectMapper objectMapper = new ObjectMapper();
-  private String accessKeyId;
-  private String secretAccessKey;
-  private String roleArn;
-  private String externalID;
-  private int tokenExpireSecs;
-  private String region;
 
-  /**
-   * Initializes the credential provider with catalog properties.
-   *
-   * @param properties catalog properties that can be used to configure the 
provider. The specific
-   *     properties required vary by implementation.
-   */
-  @Override
-  public void initialize(Map<String, String> properties) {
-    OSSCredentialConfig credentialConfig = new OSSCredentialConfig(properties);
-    this.roleArn = credentialConfig.ossRoleArn();
-    this.externalID = credentialConfig.externalID();
-    this.tokenExpireSecs = credentialConfig.tokenExpireInSecs();
-    this.accessKeyId = credentialConfig.accessKeyID();
-    this.secretAccessKey = credentialConfig.secretAccessKey();
-    this.region = credentialConfig.region();
-  }
+/**
+ * A lightweight credential provider for OSS. It delegates the actual 
credential generation to
+ * {@link OSSTokenGenerator} which is loaded via reflection to avoid classpath 
issues.
+ */
+public class OSSTokenProvider extends 
CredentialProviderDelegator<OSSTokenCredential> {
 
-  /**
-   * Returns the type of credential, it should be identical in Gravitino.
-   *
-   * @return A string identifying the type of credentials.
-   */
   @Override
   public String credentialType() {
     return OSSTokenCredential.OSS_TOKEN_CREDENTIAL_TYPE;
   }
 
-  /**
-   * Obtains a credential based on the provided context information.
-   *
-   * @param context A context object providing necessary information for 
retrieving credentials.
-   * @return A Credential object containing the authentication information 
needed to access a system
-   *     or resource. Null will be returned if no credential is available.
-   */
-  @Nullable
   @Override
-  public Credential getCredential(CredentialContext context) {
-    if (!(context instanceof PathBasedCredentialContext)) {
-      return null;
-    }
-    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
-    CredentialModel credentialModel =
-        createOSSCredentialModel(
-            roleArn,
-            pathBasedCredentialContext.getReadPaths(),
-            pathBasedCredentialContext.getWritePaths(),
-            pathBasedCredentialContext.getUserName());
-    return new OSSTokenCredential(
-        credentialModel.accessKeyId,
-        credentialModel.accessKeySecret,
-        credentialModel.securityToken,
-        credentialModel.expiration);
-  }
-
-  private CredentialModel createOSSCredentialModel(
-      String roleArn, Set<String> readLocations, Set<String> writeLocations, 
String userName) {
-    Config config = new Config();
-    config.setAccessKeyId(accessKeyId);
-    config.setAccessKeySecret(secretAccessKey);
-    config.setType(AuthConstant.RAM_ROLE_ARN);
-    config.setRoleArn(roleArn);
-    config.setRoleSessionName(getRoleName(userName));
-    if (StringUtils.isNotBlank(externalID)) {
-      config.setExternalId(externalID);
-    }
-    config.setRoleSessionExpiration(tokenExpireSecs);
-    config.setPolicy(createPolicy(readLocations, writeLocations));
-    // Local object and client is a simple proxy that does not require manual 
release
-    Client client = new Client(config);
-    return client.getCredential();
-  }
-
-  // reference:
-  // 
https://www.alibabacloud.com/help/en/oss/user-guide/tutorial-use-ram-policies-to-control-access-to-oss?spm=a2c63.p38356.help-menu-31815.d_2_4_5_1.5536471b56XPRQ
-  private String createPolicy(Set<String> readLocations, Set<String> 
writeLocations) {
-    Policy.Builder policyBuilder = Policy.builder().version("1");
-
-    // Allow read and write access to the specified locations
-    Statement.Builder allowGetObjectStatementBuilder =
-        Statement.builder()
-            .effect(Effect.ALLOW)
-            .addAction("oss:GetObject")
-            .addAction("oss:GetObjectVersion");
-
-    // Add support for bucket-level policies
-    Map<String, Statement.Builder> bucketListStatementBuilder = new 
HashMap<>();
-    Map<String, Statement.Builder> bucketMetadataStatementBuilder = new 
HashMap<>();
-
-    String arnPrefix = getArnPrefix();
-    Stream.concat(readLocations.stream(), writeLocations.stream())
-        .distinct()
-        .forEach(
-            location -> {
-              URI uri = URI.create(location);
-              
allowGetObjectStatementBuilder.addResource(getOssUriWithArn(arnPrefix, uri));
-              String bucketArn = arnPrefix + getBucketName(uri);
-              // OSS use 'oss:ListObjects' to list objects in a bucket while 
s3 use 's3:ListBucket'
-              bucketListStatementBuilder.computeIfAbsent(
-                  bucketArn,
-                  key ->
-                      Statement.builder()
-                          .effect(Effect.ALLOW)
-                          .addAction("oss:ListObjects")
-                          .addResource(key)
-                          .condition(getCondition(uri)));
-              // Add get bucket location and bucket info action.
-              bucketMetadataStatementBuilder.computeIfAbsent(
-                  bucketArn,
-                  key ->
-                      Statement.builder()
-                          .effect(Effect.ALLOW)
-                          .addAction("oss:GetBucketLocation")
-                          // Required for OSS Hadoop connector to get bucket 
information
-                          .addAction("oss:GetBucketInfo")
-                          .addResource(key));
-            });
-
-    if (!writeLocations.isEmpty()) {
-      Statement.Builder allowPutObjectStatementBuilder =
-          Statement.builder()
-              .effect(Effect.ALLOW)
-              .addAction("oss:PutObject")
-              .addAction("oss:DeleteObject");
-      writeLocations.forEach(
-          location -> {
-            URI uri = URI.create(location);
-            
allowPutObjectStatementBuilder.addResource(getOssUriWithArn(arnPrefix, uri));
-          });
-      policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
-    }
-
-    if (!bucketListStatementBuilder.isEmpty()) {
-      bucketListStatementBuilder
-          .values()
-          .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
-    } else {
-      // add list privilege with 0 resources
-      policyBuilder.addStatement(
-          
Statement.builder().effect(Effect.ALLOW).addAction("oss:ListBucket").build());
-    }
-    bucketMetadataStatementBuilder
-        .values()
-        .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
-
-    policyBuilder.addStatement(allowGetObjectStatementBuilder.build());
-    try {
-      return objectMapper.writeValueAsString(policyBuilder.build());
-    } catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private Condition getCondition(URI uri) {
-    return Condition.builder()
-        .stringLike(
-            StringLike.builder()
-                .addPrefix(concatPathWithSep(trimLeadingSlash(uri.getPath()), 
"*", "/"))
-                .build())
-        .build();
+  public String getGeneratorClassName() {
+    return "org.apache.gravitino.oss.credential.OSSTokenGenerator";
   }
-
-  private String getArnPrefix() {
-    if (StringUtils.isNotEmpty(region)) {
-      return "acs:oss:" + region + ":*:";
-    }
-    return "acs:oss:*:*:";
-  }
-
-  private String getBucketName(URI uri) {
-    return uri.getHost();
-  }
-
-  private String getOssUriWithArn(String arnPrefix, URI uri) {
-    return arnPrefix + concatPathWithSep(removeSchemaFromOSSUri(uri), "*", 
"/");
-  }
-
-  private static String concatPathWithSep(String leftPath, String rightPath, 
String fileSep) {
-    if (leftPath.endsWith(fileSep) && rightPath.startsWith(fileSep)) {
-      return leftPath + rightPath.substring(1);
-    } else if (!leftPath.endsWith(fileSep) && !rightPath.startsWith(fileSep)) {
-      return leftPath + fileSep + rightPath;
-    } else {
-      return leftPath + rightPath;
-    }
-  }
-
-  // Transform 'oss://bucket/path' to /bucket/path
-  private String removeSchemaFromOSSUri(URI uri) {
-    String bucket = uri.getHost();
-    String path = trimLeadingSlash(uri.getPath());
-    return String.join(
-        "/", Stream.of(bucket, 
path).filter(Objects::nonNull).toArray(String[]::new));
-  }
-
-  private String trimLeadingSlash(String path) {
-    return path.startsWith("/") ? path.substring(1) : path;
-  }
-
-  private String getRoleName(String userName) {
-    return "gravitino_" + userName;
-  }
-
-  @Override
-  public void close() throws IOException {}
 }
diff --git 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialProvider.java
 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialGenerator.java
similarity index 70%
copy from 
bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialProvider.java
copy to 
bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialGenerator.java
index 8a59756d84..dadeb27237 100644
--- 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialProvider.java
+++ 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialGenerator.java
@@ -1,24 +1,24 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.gravitino.s3.credential;
 
+import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -31,9 +31,8 @@ import java.util.Set;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.credential.AwsIrsaCredential;
-import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.CredentialContext;
-import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.CredentialGenerator;
 import org.apache.gravitino.credential.PathBasedCredentialContext;
 import org.apache.gravitino.credential.config.S3CredentialConfig;
 import software.amazon.awssdk.auth.credentials.AwsCredentials;
@@ -51,41 +50,8 @@ import 
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityReques
 import 
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityResponse;
 import software.amazon.awssdk.services.sts.model.Credentials;
 
-/**
- * AWS IRSA credential provider that supports both basic IRSA credentials and 
fine-grained
- * path-based access control using AWS session policies.
- *
- * <p>This provider operates in two modes:
- *
- * <ul>
- *   <li><b>Basic IRSA mode</b>: For non-path-based credential contexts, 
returns credentials with
- *       full permissions of the associated IAM role (backward compatibility)
- *   <li><b>Fine-grained mode</b>: For path-based credential contexts (e.g., 
table access with
- *       vended credentials), uses AWS session policies to restrict 
permissions to specific S3 paths
- * </ul>
- *
- * <p>The fine-grained mode leverages AWS session policies with 
AssumeRoleWithWebIdentity to create
- * temporary credentials with restricted permissions. Session policies can 
only reduce (not expand)
- * the permissions already granted by the IAM role:
- *
- * <ul>
- *   <li>s3:GetObject, s3:GetObjectVersion for read access to specific table 
paths only
- *   <li>s3:ListBucket with s3:prefix conditions limiting to table directories 
only
- *   <li>s3:PutObject, s3:DeleteObject for write operations on specific paths 
only
- *   <li>s3:GetBucketLocation for bucket metadata access
- * </ul>
- *
- * <p>Prerequisites for fine-grained mode:
- *
- * <ul>
- *   <li>EKS cluster with IRSA properly configured
- *   <li>AWS_WEB_IDENTITY_TOKEN_FILE environment variable pointing to service 
account token
- *   <li>IAM role configured for IRSA with broad S3 permissions (session 
policy will restrict them)
- *   <li>Optional: s3-role-arn for assuming different role (if not provided, 
uses IRSA role
- *       directly)
- * </ul>
- */
-public class AwsIrsaCredentialProvider implements CredentialProvider {
+/** Generate AWS IRSA credentials according to the read and write paths. */
+public class AwsIrsaCredentialGenerator implements 
CredentialGenerator<AwsIrsaCredential> {
 
   private WebIdentityTokenFileCredentialsProvider baseCredentialsProvider;
   private String roleArn;
@@ -106,17 +72,7 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
   }
 
   @Override
-  public void close() {
-    // No external resources to close
-  }
-
-  @Override
-  public String credentialType() {
-    return AwsIrsaCredential.AWS_IRSA_CREDENTIAL_TYPE;
-  }
-
-  @Override
-  public Credential getCredential(CredentialContext context) {
+  public AwsIrsaCredential generate(CredentialContext context) {
     if (!(context instanceof PathBasedCredentialContext)) {
       // Fallback to original behavior for non-path-based contexts
       AwsCredentials creds = baseCredentialsProvider.resolveCredentials();
@@ -157,14 +113,9 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
       Set<String> readLocations, Set<String> writeLocations, String userName) {
     validateInputParameters(readLocations, writeLocations, userName);
 
-    // Create session policy that restricts access to specific paths
-    IamPolicy sessionPolicy = createSessionPolicy(readLocations, 
writeLocations);
-
-    // Get web identity token file path and validate
+    IamPolicy sessionPolicy = createSessionPolicy(readLocations, 
writeLocations, region);
     String webIdentityTokenFile = getValidatedWebIdentityTokenFile();
-
-    // Get role ARN and validate
-    String effectiveRoleArn = getValidatedRoleArn();
+    String effectiveRoleArn = getValidatedRoleArn(roleArn);
 
     try {
       String tokenContent =
@@ -181,19 +132,15 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
     }
   }
 
-  private IamPolicy createSessionPolicy(Set<String> readLocations, Set<String> 
writeLocations) {
+  private IamPolicy createSessionPolicy(
+      Set<String> readLocations, Set<String> writeLocations, String region) {
     IamPolicy.Builder policyBuilder = IamPolicy.builder();
-    String arnPrefix = getArnPrefix();
+    String arnPrefix = getArnPrefix(region);
 
-    // Add read permissions for all locations
     addReadPermissions(policyBuilder, readLocations, writeLocations, 
arnPrefix);
-
-    // Add write permissions if needed
     if (!writeLocations.isEmpty()) {
       addWritePermissions(policyBuilder, writeLocations, arnPrefix);
     }
-
-    // Add bucket-level permissions
     addBucketPermissions(policyBuilder, readLocations, writeLocations, 
arnPrefix);
 
     return policyBuilder.build();
@@ -256,7 +203,6 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
               String bucketArn = arnPrefix + getBucketName(uri);
               String rawPath = trimLeadingSlash(uri.getPath());
 
-              // Add list bucket permissions with prefix conditions
               bucketListStatementBuilder
                   .computeIfAbsent(
                       bucketArn,
@@ -268,11 +214,8 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
                   .addConditions(
                       IamConditionOperator.STRING_LIKE,
                       "s3:prefix",
-                      Arrays.asList(
-                          rawPath, // Get raw path metadata information
-                          addWildcardToPath(rawPath))); // Listing objects in 
raw path
+                      Arrays.asList(rawPath, addWildcardToPath(rawPath)));
 
-              // Add get bucket location permissions
               bucketGetLocationStatementBuilder.computeIfAbsent(
                   bucketArn,
                   key ->
@@ -282,34 +225,20 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
                           .addResource(key));
             });
 
-    // Add bucket list statements
-    addStatementsToPolicy(policyBuilder, bucketListStatementBuilder, 
"s3:ListBucket");
-
-    // Add bucket location statements
-    addStatementsToPolicy(policyBuilder, bucketGetLocationStatementBuilder, 
null);
+    addStatementsToPolicy(policyBuilder, bucketListStatementBuilder);
+    addStatementsToPolicy(policyBuilder, bucketGetLocationStatementBuilder);
   }
 
   private void addStatementsToPolicy(
-      IamPolicy.Builder policyBuilder,
-      Map<String, IamStatement.Builder> statementBuilders,
-      String fallbackAction) {
-    if (!statementBuilders.isEmpty()) {
-      statementBuilders
-          .values()
-          .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
-    } else if (fallbackAction != null) {
-      policyBuilder.addStatement(
-          
IamStatement.builder().effect(IamEffect.ALLOW).addAction(fallbackAction).build());
-    }
+      IamPolicy.Builder policyBuilder, Map<String, IamStatement.Builder> 
statementBuilders) {
+    statementBuilders.values().forEach(builder -> 
policyBuilder.addStatement(builder.build()));
   }
 
   private String getS3UriWithArn(String arnPrefix, URI uri) {
     return arnPrefix + addWildcardToPath(removeSchemaFromS3Uri(uri));
   }
 
-  private String getArnPrefix() {
-    // For session policies, we default to standard AWS S3 ARN prefix
-    // The region can be determined from the AWS environment or configuration
+  private String getArnPrefix(String region) {
     if (StringUtils.isNotBlank(region)) {
       if (region.contains("cn-")) {
         return "arn:aws-cn:s3:::";
@@ -324,7 +253,6 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
     return path.endsWith("/") ? path + "*" : path + "/*";
   }
 
-  // Transform 's3://bucket/path' to /bucket/path
   private static String removeSchemaFromS3Uri(URI uri) {
     String bucket = uri.getHost();
     String path = trimLeadingSlash(uri.getPath());
@@ -333,10 +261,7 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
   }
 
   private static String trimLeadingSlash(String path) {
-    if (path.startsWith("/")) {
-      path = path.substring(1);
-    }
-    return path;
+    return path.startsWith("/") ? path.substring(1) : path;
   }
 
   private static String getBucketName(URI uri) {
@@ -368,9 +293,9 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
     return webIdentityTokenFile;
   }
 
-  private String getValidatedRoleArn() {
+  private String getValidatedRoleArn(String configRoleArn) {
     String effectiveRoleArn =
-        StringUtils.isNotBlank(roleArn) ? roleArn : 
System.getenv("AWS_ROLE_ARN");
+        StringUtils.isNotBlank(configRoleArn) ? configRoleArn : 
System.getenv("AWS_ROLE_ARN");
     if (StringUtils.isBlank(effectiveRoleArn)) {
       throw new IllegalStateException(
           "No role ARN available. Either configure s3-role-arn or ensure 
AWS_ROLE_ARN environment variable is set.");
@@ -383,7 +308,6 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
 
   private Credentials assumeRoleWithSessionPolicy(
       String roleArn, String userName, String webIdentityToken, IamPolicy 
sessionPolicy) {
-    // Create STS client for this request
     StsClientBuilder stsBuilder = StsClient.builder();
     if (StringUtils.isNotBlank(region)) {
       stsBuilder.region(Region.of(region));
@@ -406,4 +330,7 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
       return response.credentials();
     }
   }
+
+  @Override
+  public void close() throws IOException {}
 }
diff --git 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialProvider.java
 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialProvider.java
index 8a59756d84..e758650759 100644
--- 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialProvider.java
+++ 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/AwsIrsaCredentialProvider.java
@@ -1,55 +1,26 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.gravitino.s3.credential;
 
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Stream;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.credential.AwsIrsaCredential;
-import org.apache.gravitino.credential.Credential;
-import org.apache.gravitino.credential.CredentialContext;
-import org.apache.gravitino.credential.CredentialProvider;
-import org.apache.gravitino.credential.PathBasedCredentialContext;
-import org.apache.gravitino.credential.config.S3CredentialConfig;
-import software.amazon.awssdk.auth.credentials.AwsCredentials;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import 
software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
-import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
-import software.amazon.awssdk.policybuilder.iam.IamEffect;
-import software.amazon.awssdk.policybuilder.iam.IamPolicy;
-import software.amazon.awssdk.policybuilder.iam.IamResource;
-import software.amazon.awssdk.policybuilder.iam.IamStatement;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.StsClientBuilder;
-import 
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityRequest;
-import 
software.amazon.awssdk.services.sts.model.AssumeRoleWithWebIdentityResponse;
-import software.amazon.awssdk.services.sts.model.Credentials;
+import org.apache.gravitino.credential.CredentialProviderDelegator;
 
 /**
  * AWS IRSA credential provider that supports both basic IRSA credentials and 
fine-grained
@@ -85,30 +56,7 @@ import software.amazon.awssdk.services.sts.model.Credentials;
  *       directly)
  * </ul>
  */
-public class AwsIrsaCredentialProvider implements CredentialProvider {
-
-  private WebIdentityTokenFileCredentialsProvider baseCredentialsProvider;
-  private String roleArn;
-  private int tokenExpireSecs;
-  private String region;
-  private String stsEndpoint;
-
-  @Override
-  public void initialize(Map<String, String> properties) {
-    // Use WebIdentityTokenFileCredentialsProvider for base IRSA configuration
-    this.baseCredentialsProvider = 
WebIdentityTokenFileCredentialsProvider.create();
-
-    S3CredentialConfig s3CredentialConfig = new S3CredentialConfig(properties);
-    this.roleArn = s3CredentialConfig.s3RoleArn();
-    this.tokenExpireSecs = s3CredentialConfig.tokenExpireInSecs();
-    this.region = s3CredentialConfig.region();
-    this.stsEndpoint = s3CredentialConfig.stsEndpoint();
-  }
-
-  @Override
-  public void close() {
-    // No external resources to close
-  }
+public class AwsIrsaCredentialProvider extends 
CredentialProviderDelegator<AwsIrsaCredential> {
 
   @Override
   public String credentialType() {
@@ -116,294 +64,7 @@ public class AwsIrsaCredentialProvider implements 
CredentialProvider {
   }
 
   @Override
-  public Credential getCredential(CredentialContext context) {
-    if (!(context instanceof PathBasedCredentialContext)) {
-      // Fallback to original behavior for non-path-based contexts
-      AwsCredentials creds = baseCredentialsProvider.resolveCredentials();
-      if (creds instanceof AwsSessionCredentials) {
-        AwsSessionCredentials sessionCreds = (AwsSessionCredentials) creds;
-        long expiration =
-            sessionCreds.expirationTime().isPresent()
-                ? sessionCreds.expirationTime().get().toEpochMilli()
-                : 0L;
-        return new AwsIrsaCredential(
-            sessionCreds.accessKeyId(),
-            sessionCreds.secretAccessKey(),
-            sessionCreds.sessionToken(),
-            expiration);
-      } else {
-        throw new IllegalStateException(
-            "AWS IRSA credentials must be of type AwsSessionCredentials. "
-                + "Check your EKS/IRSA configuration. Got: "
-                + creds.getClass().getName());
-      }
-    }
-
-    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
-
-    Credentials s3Token =
-        createCredentialsWithSessionPolicy(
-            pathBasedCredentialContext.getReadPaths(),
-            pathBasedCredentialContext.getWritePaths(),
-            pathBasedCredentialContext.getUserName());
-    return new AwsIrsaCredential(
-        s3Token.accessKeyId(),
-        s3Token.secretAccessKey(),
-        s3Token.sessionToken(),
-        s3Token.expiration().toEpochMilli());
-  }
-
-  private Credentials createCredentialsWithSessionPolicy(
-      Set<String> readLocations, Set<String> writeLocations, String userName) {
-    validateInputParameters(readLocations, writeLocations, userName);
-
-    // Create session policy that restricts access to specific paths
-    IamPolicy sessionPolicy = createSessionPolicy(readLocations, 
writeLocations);
-
-    // Get web identity token file path and validate
-    String webIdentityTokenFile = getValidatedWebIdentityTokenFile();
-
-    // Get role ARN and validate
-    String effectiveRoleArn = getValidatedRoleArn();
-
-    try {
-      String tokenContent =
-          new String(Files.readAllBytes(Paths.get(webIdentityTokenFile)), 
StandardCharsets.UTF_8);
-      if (StringUtils.isBlank(tokenContent)) {
-        throw new IllegalStateException(
-            "Web identity token file is empty: " + webIdentityTokenFile);
-      }
-
-      return assumeRoleWithSessionPolicy(effectiveRoleArn, userName, 
tokenContent, sessionPolicy);
-    } catch (Exception e) {
-      throw new RuntimeException(
-          "Failed to create credentials with session policy for user: " + 
userName, e);
-    }
-  }
-
-  private IamPolicy createSessionPolicy(Set<String> readLocations, Set<String> 
writeLocations) {
-    IamPolicy.Builder policyBuilder = IamPolicy.builder();
-    String arnPrefix = getArnPrefix();
-
-    // Add read permissions for all locations
-    addReadPermissions(policyBuilder, readLocations, writeLocations, 
arnPrefix);
-
-    // Add write permissions if needed
-    if (!writeLocations.isEmpty()) {
-      addWritePermissions(policyBuilder, writeLocations, arnPrefix);
-    }
-
-    // Add bucket-level permissions
-    addBucketPermissions(policyBuilder, readLocations, writeLocations, 
arnPrefix);
-
-    return policyBuilder.build();
-  }
-
-  private void addReadPermissions(
-      IamPolicy.Builder policyBuilder,
-      Set<String> readLocations,
-      Set<String> writeLocations,
-      String arnPrefix) {
-    IamStatement.Builder allowGetObjectStatementBuilder =
-        IamStatement.builder()
-            .effect(IamEffect.ALLOW)
-            .addAction("s3:GetObject")
-            .addAction("s3:GetObjectVersion");
-
-    Stream.concat(readLocations.stream(), writeLocations.stream())
-        .distinct()
-        .forEach(
-            location -> {
-              URI uri = URI.create(location);
-              allowGetObjectStatementBuilder.addResource(
-                  IamResource.create(getS3UriWithArn(arnPrefix, uri)));
-            });
-
-    policyBuilder.addStatement(allowGetObjectStatementBuilder.build());
-  }
-
-  private void addWritePermissions(
-      IamPolicy.Builder policyBuilder, Set<String> writeLocations, String 
arnPrefix) {
-    IamStatement.Builder allowPutObjectStatementBuilder =
-        IamStatement.builder()
-            .effect(IamEffect.ALLOW)
-            .addAction("s3:PutObject")
-            .addAction("s3:DeleteObject");
-
-    writeLocations.forEach(
-        location -> {
-          URI uri = URI.create(location);
-          allowPutObjectStatementBuilder.addResource(
-              IamResource.create(getS3UriWithArn(arnPrefix, uri)));
-        });
-
-    policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
-  }
-
-  private void addBucketPermissions(
-      IamPolicy.Builder policyBuilder,
-      Set<String> readLocations,
-      Set<String> writeLocations,
-      String arnPrefix) {
-    Map<String, IamStatement.Builder> bucketListStatementBuilder = new 
HashMap<>();
-    Map<String, IamStatement.Builder> bucketGetLocationStatementBuilder = new 
HashMap<>();
-
-    Stream.concat(readLocations.stream(), writeLocations.stream())
-        .distinct()
-        .forEach(
-            location -> {
-              URI uri = URI.create(location);
-              String bucketArn = arnPrefix + getBucketName(uri);
-              String rawPath = trimLeadingSlash(uri.getPath());
-
-              // Add list bucket permissions with prefix conditions
-              bucketListStatementBuilder
-                  .computeIfAbsent(
-                      bucketArn,
-                      key ->
-                          IamStatement.builder()
-                              .effect(IamEffect.ALLOW)
-                              .addAction("s3:ListBucket")
-                              .addResource(key))
-                  .addConditions(
-                      IamConditionOperator.STRING_LIKE,
-                      "s3:prefix",
-                      Arrays.asList(
-                          rawPath, // Get raw path metadata information
-                          addWildcardToPath(rawPath))); // Listing objects in 
raw path
-
-              // Add get bucket location permissions
-              bucketGetLocationStatementBuilder.computeIfAbsent(
-                  bucketArn,
-                  key ->
-                      IamStatement.builder()
-                          .effect(IamEffect.ALLOW)
-                          .addAction("s3:GetBucketLocation")
-                          .addResource(key));
-            });
-
-    // Add bucket list statements
-    addStatementsToPolicy(policyBuilder, bucketListStatementBuilder, 
"s3:ListBucket");
-
-    // Add bucket location statements
-    addStatementsToPolicy(policyBuilder, bucketGetLocationStatementBuilder, 
null);
-  }
-
-  private void addStatementsToPolicy(
-      IamPolicy.Builder policyBuilder,
-      Map<String, IamStatement.Builder> statementBuilders,
-      String fallbackAction) {
-    if (!statementBuilders.isEmpty()) {
-      statementBuilders
-          .values()
-          .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
-    } else if (fallbackAction != null) {
-      policyBuilder.addStatement(
-          
IamStatement.builder().effect(IamEffect.ALLOW).addAction(fallbackAction).build());
-    }
-  }
-
-  private String getS3UriWithArn(String arnPrefix, URI uri) {
-    return arnPrefix + addWildcardToPath(removeSchemaFromS3Uri(uri));
-  }
-
-  private String getArnPrefix() {
-    // For session policies, we default to standard AWS S3 ARN prefix
-    // The region can be determined from the AWS environment or configuration
-    if (StringUtils.isNotBlank(region)) {
-      if (region.contains("cn-")) {
-        return "arn:aws-cn:s3:::";
-      } else if (region.contains("us-gov-")) {
-        return "arn:aws-us-gov:s3:::";
-      }
-    }
-    return "arn:aws:s3:::";
-  }
-
-  private static String addWildcardToPath(String path) {
-    return path.endsWith("/") ? path + "*" : path + "/*";
-  }
-
-  // Transform 's3://bucket/path' to /bucket/path
-  private static String removeSchemaFromS3Uri(URI uri) {
-    String bucket = uri.getHost();
-    String path = trimLeadingSlash(uri.getPath());
-    return String.join(
-        "/", Stream.of(bucket, 
path).filter(Objects::nonNull).toArray(String[]::new));
-  }
-
-  private static String trimLeadingSlash(String path) {
-    if (path.startsWith("/")) {
-      path = path.substring(1);
-    }
-    return path;
-  }
-
-  private static String getBucketName(URI uri) {
-    return uri.getHost();
-  }
-
-  private void validateInputParameters(
-      Set<String> readLocations, Set<String> writeLocations, String userName) {
-    if (StringUtils.isBlank(userName)) {
-      throw new IllegalArgumentException("userName cannot be null or empty");
-    }
-    if ((readLocations == null || readLocations.isEmpty())
-        && (writeLocations == null || writeLocations.isEmpty())) {
-      throw new IllegalArgumentException("At least one read or write location 
must be specified");
-    }
-  }
-
-  private String getValidatedWebIdentityTokenFile() {
-    String webIdentityTokenFile = System.getenv("AWS_WEB_IDENTITY_TOKEN_FILE");
-    if (StringUtils.isBlank(webIdentityTokenFile)) {
-      throw new IllegalStateException(
-          "AWS_WEB_IDENTITY_TOKEN_FILE environment variable is not set. "
-              + "Ensure IRSA is properly configured in your EKS cluster.");
-    }
-    if (!Files.exists(Paths.get(webIdentityTokenFile))) {
-      throw new IllegalStateException(
-          "Web identity token file does not exist: " + webIdentityTokenFile);
-    }
-    return webIdentityTokenFile;
-  }
-
-  private String getValidatedRoleArn() {
-    String effectiveRoleArn =
-        StringUtils.isNotBlank(roleArn) ? roleArn : 
System.getenv("AWS_ROLE_ARN");
-    if (StringUtils.isBlank(effectiveRoleArn)) {
-      throw new IllegalStateException(
-          "No role ARN available. Either configure s3-role-arn or ensure 
AWS_ROLE_ARN environment variable is set.");
-    }
-    if (!effectiveRoleArn.startsWith("arn:aws")) {
-      throw new IllegalArgumentException("Invalid role ARN format: " + 
effectiveRoleArn);
-    }
-    return effectiveRoleArn;
-  }
-
-  private Credentials assumeRoleWithSessionPolicy(
-      String roleArn, String userName, String webIdentityToken, IamPolicy 
sessionPolicy) {
-    // Create STS client for this request
-    StsClientBuilder stsBuilder = StsClient.builder();
-    if (StringUtils.isNotBlank(region)) {
-      stsBuilder.region(Region.of(region));
-    }
-    if (StringUtils.isNotBlank(stsEndpoint)) {
-      stsBuilder.endpointOverride(URI.create(stsEndpoint));
-    }
-
-    try (StsClient stsClient = stsBuilder.build()) {
-      AssumeRoleWithWebIdentityRequest request =
-          AssumeRoleWithWebIdentityRequest.builder()
-              .roleArn(roleArn)
-              .roleSessionName("gravitino_irsa_session_" + userName)
-              .durationSeconds(tokenExpireSecs)
-              .webIdentityToken(webIdentityToken)
-              .policy(sessionPolicy.toJson())
-              .build();
-
-      AssumeRoleWithWebIdentityResponse response = 
stsClient.assumeRoleWithWebIdentity(request);
-      return response.credentials();
-    }
+  public String getGeneratorClassName() {
+    return "org.apache.gravitino.s3.credential.AwsIrsaCredentialGenerator";
   }
 }
diff --git 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenProvider.java
 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenGenerator.java
similarity index 60%
copy from 
bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenProvider.java
copy to 
bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenGenerator.java
index 56d293d046..9158a5bafe 100644
--- 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenProvider.java
+++ 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenGenerator.java
@@ -1,24 +1,25 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.gravitino.s3.credential;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -27,9 +28,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.CredentialContext;
-import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.CredentialGenerator;
 import org.apache.gravitino.credential.PathBasedCredentialContext;
 import org.apache.gravitino.credential.S3TokenCredential;
 import org.apache.gravitino.credential.config.S3CredentialConfig;
@@ -48,8 +48,8 @@ import 
software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
 import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
 import software.amazon.awssdk.services.sts.model.Credentials;
 
-/** Generates S3 token to access S3 data. */
-public class S3TokenProvider implements CredentialProvider {
+/** Generate S3 token credentials according to the read and write paths. */
+public class S3TokenGenerator implements 
CredentialGenerator<S3TokenCredential> {
 
   private StsClient stsClient;
   private String roleArn;
@@ -66,29 +66,17 @@ public class S3TokenProvider implements CredentialProvider {
   }
 
   @Override
-  public void close() {
-    if (stsClient != null) {
-      stsClient.close();
-    }
-  }
-
-  @Override
-  public String credentialType() {
-    return S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE;
-  }
-
-  @Override
-  public Credential getCredential(CredentialContext context) {
+  public S3TokenCredential generate(CredentialContext context) {
     if (!(context instanceof PathBasedCredentialContext)) {
       return null;
     }
-    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
+
+    PathBasedCredentialContext pathContext = (PathBasedCredentialContext) 
context;
+
     Credentials s3Token =
         createS3Token(
-            roleArn,
-            pathBasedCredentialContext.getReadPaths(),
-            pathBasedCredentialContext.getWritePaths(),
-            pathBasedCredentialContext.getUserName());
+            pathContext.getReadPaths(), pathContext.getWritePaths(), 
pathContext.getUserName());
+
     return new S3TokenCredential(
         s3Token.accessKeyId(),
         s3Token.secretAccessKey(),
@@ -102,20 +90,33 @@ public class S3TokenProvider implements CredentialProvider 
{
             AwsBasicCredentials.create(
                 s3CredentialConfig.accessKeyID(), 
s3CredentialConfig.secretAccessKey()));
     StsClientBuilder builder = 
StsClient.builder().credentialsProvider(credentialsProvider);
-    String region = s3CredentialConfig.region();
-    if (StringUtils.isNotBlank(region)) {
-      builder.region(Region.of(region));
+
+    if (StringUtils.isNotBlank(s3CredentialConfig.region())) {
+      builder.region(Region.of(s3CredentialConfig.region()));
     }
-    String stsEndpoint = s3CredentialConfig.stsEndpoint();
-    // If the user does not set a value or provides an blank string, we treat 
as unspecified.
-    // The goal is to pass a custom endpoint to the `builder` only when the 
user specifies a
-    // non-blank value.
-    if (StringUtils.isNotBlank(stsEndpoint)) {
-      builder.endpointOverride(URI.create(stsEndpoint));
+    if (StringUtils.isNotBlank(s3CredentialConfig.stsEndpoint())) {
+      builder.endpointOverride(URI.create(s3CredentialConfig.stsEndpoint()));
     }
     return builder.build();
   }
 
+  private Credentials createS3Token(
+      Set<String> readLocations, Set<String> writeLocations, String userName) {
+    IamPolicy policy = createPolicy(roleArn, readLocations, writeLocations);
+    AssumeRoleRequest.Builder builder =
+        AssumeRoleRequest.builder()
+            .roleArn(roleArn)
+            .roleSessionName("gravitino_" + userName)
+            .durationSeconds(tokenExpireSecs)
+            .policy(policy.toJson());
+
+    if (StringUtils.isNotBlank(externalID)) {
+      builder.externalId(externalID);
+    }
+    AssumeRoleResponse response = stsClient.assumeRole(builder.build());
+    return response.credentials();
+  }
+
   private IamPolicy createPolicy(
       String roleArn, Set<String> readLocations, Set<String> writeLocations) {
     IamPolicy.Builder policyBuilder = IamPolicy.builder();
@@ -124,8 +125,8 @@ public class S3TokenProvider implements CredentialProvider {
             .effect(IamEffect.ALLOW)
             .addAction("s3:GetObject")
             .addAction("s3:GetObjectVersion");
-    Map<String, IamStatement.Builder> bucketListStatmentBuilder = new 
HashMap<>();
-    Map<String, IamStatement.Builder> bucketGetLocationStatmentBuilder = new 
HashMap<>();
+    Map<String, IamStatement.Builder> bucketListStatementBuilder = new 
HashMap<>();
+    Map<String, IamStatement.Builder> bucketGetLocationStatementBuilder = new 
HashMap<>();
 
     String arnPrefix = getArnPrefix(roleArn);
     Stream.concat(readLocations.stream(), writeLocations.stream())
@@ -137,10 +138,10 @@ public class S3TokenProvider implements 
CredentialProvider {
                   IamResource.create(getS3UriWithArn(arnPrefix, uri)));
               String bucketArn = arnPrefix + getBucketName(uri);
               String rawPath = trimLeadingSlash(uri.getPath());
-              bucketListStatmentBuilder
+              bucketListStatementBuilder
                   .computeIfAbsent(
                       bucketArn,
-                      (String key) ->
+                      key ->
                           IamStatement.builder()
                               .effect(IamEffect.ALLOW)
                               .addAction("s3:ListBucket")
@@ -148,12 +149,9 @@ public class S3TokenProvider implements CredentialProvider 
{
                   .addConditions(
                       IamConditionOperator.STRING_LIKE,
                       "s3:prefix",
-                      Arrays.asList(
-                          // Get raw path metadata information for AWS hadoop 
connector
-                          rawPath,
-                          // Listing objects in raw path
-                          concatPathWithSep(rawPath, "*", "/")));
-              bucketGetLocationStatmentBuilder.computeIfAbsent(
+                      Arrays.asList(rawPath, addWildcardToPath(rawPath)));
+
+              bucketGetLocationStatementBuilder.computeIfAbsent(
                   bucketArn,
                   key ->
                       IamStatement.builder()
@@ -176,24 +174,20 @@ public class S3TokenProvider implements 
CredentialProvider {
           });
       policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
     }
-    if (!bucketListStatmentBuilder.isEmpty()) {
-      bucketListStatmentBuilder
-          .values()
-          .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
-    } else {
-      // add list privilege with 0 resources
-      policyBuilder.addStatement(
-          
IamStatement.builder().effect(IamEffect.ALLOW).addAction("s3:ListBucket").build());
-    }
 
-    bucketGetLocationStatmentBuilder
+    bucketListStatementBuilder
+        .values()
+        .forEach(builder -> policyBuilder.addStatement(builder.build()));
+    bucketGetLocationStatementBuilder
         .values()
-        .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
-    return 
policyBuilder.addStatement(allowGetObjectStatementBuilder.build()).build();
+        .forEach(builder -> policyBuilder.addStatement(builder.build()));
+    policyBuilder.addStatement(allowGetObjectStatementBuilder.build());
+
+    return policyBuilder.build();
   }
 
   private String getS3UriWithArn(String arnPrefix, URI uri) {
-    return arnPrefix + concatPathWithSep(removeSchemaFromS3Uri(uri), "*", "/");
+    return arnPrefix + addWildcardToPath(removeSchemaFromS3Uri(uri));
   }
 
   private String getArnPrefix(String roleArn) {
@@ -201,22 +195,14 @@ public class S3TokenProvider implements 
CredentialProvider {
       return "arn:aws-cn:s3:::";
     } else if (roleArn.contains("aws-us-gov")) {
       return "arn:aws-us-gov:s3:::";
-    } else {
-      return "arn:aws:s3:::";
     }
+    return "arn:aws:s3:::";
   }
 
-  private static String concatPathWithSep(String leftPath, String rightPath, 
String fileSep) {
-    if (leftPath.endsWith(fileSep) && rightPath.startsWith(fileSep)) {
-      return leftPath + rightPath.substring(1);
-    } else if (!leftPath.endsWith(fileSep) && !rightPath.startsWith(fileSep)) {
-      return leftPath + fileSep + rightPath;
-    } else {
-      return leftPath + rightPath;
-    }
+  private static String addWildcardToPath(String path) {
+    return path.endsWith("/") ? path + "*" : path + "/*";
   }
 
-  // Transform 's3://bucket/path' to /bucket/path
   private static String removeSchemaFromS3Uri(URI uri) {
     String bucket = uri.getHost();
     String path = trimLeadingSlash(uri.getPath());
@@ -225,29 +211,17 @@ public class S3TokenProvider implements 
CredentialProvider {
   }
 
   private static String trimLeadingSlash(String path) {
-    if (path.startsWith("/")) {
-      path = path.substring(1);
-    }
-    return path;
+    return path.startsWith("/") ? path.substring(1) : path;
   }
 
   private static String getBucketName(URI uri) {
     return uri.getHost();
   }
 
-  private Credentials createS3Token(
-      String roleArn, Set<String> readLocations, Set<String> writeLocations, 
String userName) {
-    IamPolicy policy = createPolicy(roleArn, readLocations, writeLocations);
-    AssumeRoleRequest.Builder builder =
-        AssumeRoleRequest.builder()
-            .roleArn(roleArn)
-            .roleSessionName("gravitino_" + userName)
-            .durationSeconds(tokenExpireSecs)
-            .policy(policy.toJson());
-    if (StringUtils.isNotBlank(externalID)) {
-      builder.externalId(externalID);
+  @Override
+  public void close() throws IOException {
+    if (stsClient != null) {
+      stsClient.close();
     }
-    AssumeRoleResponse response = stsClient.assumeRole(builder.build());
-    return response.credentials();
   }
 }
diff --git 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenProvider.java
 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenProvider.java
index 56d293d046..164a867a88 100644
--- 
a/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenProvider.java
+++ 
b/bundles/aws/src/main/java/org/apache/gravitino/s3/credential/S3TokenProvider.java
@@ -19,58 +19,14 @@
 
 package org.apache.gravitino.s3.credential;
 
-import java.net.URI;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Stream;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.credential.Credential;
-import org.apache.gravitino.credential.CredentialContext;
-import org.apache.gravitino.credential.CredentialProvider;
-import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.CredentialProviderDelegator;
 import org.apache.gravitino.credential.S3TokenCredential;
-import org.apache.gravitino.credential.config.S3CredentialConfig;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
-import software.amazon.awssdk.policybuilder.iam.IamEffect;
-import software.amazon.awssdk.policybuilder.iam.IamPolicy;
-import software.amazon.awssdk.policybuilder.iam.IamResource;
-import software.amazon.awssdk.policybuilder.iam.IamStatement;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.StsClientBuilder;
-import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
-import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
-import software.amazon.awssdk.services.sts.model.Credentials;
 
-/** Generates S3 token to access S3 data. */
-public class S3TokenProvider implements CredentialProvider {
-
-  private StsClient stsClient;
-  private String roleArn;
-  private String externalID;
-  private int tokenExpireSecs;
-
-  @Override
-  public void initialize(Map<String, String> properties) {
-    S3CredentialConfig s3CredentialConfig = new S3CredentialConfig(properties);
-    this.roleArn = s3CredentialConfig.s3RoleArn();
-    this.externalID = s3CredentialConfig.externalID();
-    this.tokenExpireSecs = s3CredentialConfig.tokenExpireInSecs();
-    this.stsClient = createStsClient(s3CredentialConfig);
-  }
-
-  @Override
-  public void close() {
-    if (stsClient != null) {
-      stsClient.close();
-    }
-  }
+/**
+ * A lightweight credential provider for S3. It delegates the actual 
credential generation to {@link
+ * S3TokenGenerator} which is loaded via reflection to avoid classpath issues.
+ */
+public class S3TokenProvider extends 
CredentialProviderDelegator<S3TokenCredential> {
 
   @Override
   public String credentialType() {
@@ -78,176 +34,7 @@ public class S3TokenProvider implements CredentialProvider {
   }
 
   @Override
-  public Credential getCredential(CredentialContext context) {
-    if (!(context instanceof PathBasedCredentialContext)) {
-      return null;
-    }
-    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
-    Credentials s3Token =
-        createS3Token(
-            roleArn,
-            pathBasedCredentialContext.getReadPaths(),
-            pathBasedCredentialContext.getWritePaths(),
-            pathBasedCredentialContext.getUserName());
-    return new S3TokenCredential(
-        s3Token.accessKeyId(),
-        s3Token.secretAccessKey(),
-        s3Token.sessionToken(),
-        s3Token.expiration().toEpochMilli());
-  }
-
-  private StsClient createStsClient(S3CredentialConfig s3CredentialConfig) {
-    AwsCredentialsProvider credentialsProvider =
-        StaticCredentialsProvider.create(
-            AwsBasicCredentials.create(
-                s3CredentialConfig.accessKeyID(), 
s3CredentialConfig.secretAccessKey()));
-    StsClientBuilder builder = 
StsClient.builder().credentialsProvider(credentialsProvider);
-    String region = s3CredentialConfig.region();
-    if (StringUtils.isNotBlank(region)) {
-      builder.region(Region.of(region));
-    }
-    String stsEndpoint = s3CredentialConfig.stsEndpoint();
-    // If the user does not set a value or provides an blank string, we treat 
as unspecified.
-    // The goal is to pass a custom endpoint to the `builder` only when the 
user specifies a
-    // non-blank value.
-    if (StringUtils.isNotBlank(stsEndpoint)) {
-      builder.endpointOverride(URI.create(stsEndpoint));
-    }
-    return builder.build();
-  }
-
-  private IamPolicy createPolicy(
-      String roleArn, Set<String> readLocations, Set<String> writeLocations) {
-    IamPolicy.Builder policyBuilder = IamPolicy.builder();
-    IamStatement.Builder allowGetObjectStatementBuilder =
-        IamStatement.builder()
-            .effect(IamEffect.ALLOW)
-            .addAction("s3:GetObject")
-            .addAction("s3:GetObjectVersion");
-    Map<String, IamStatement.Builder> bucketListStatmentBuilder = new 
HashMap<>();
-    Map<String, IamStatement.Builder> bucketGetLocationStatmentBuilder = new 
HashMap<>();
-
-    String arnPrefix = getArnPrefix(roleArn);
-    Stream.concat(readLocations.stream(), writeLocations.stream())
-        .distinct()
-        .forEach(
-            location -> {
-              URI uri = URI.create(location);
-              allowGetObjectStatementBuilder.addResource(
-                  IamResource.create(getS3UriWithArn(arnPrefix, uri)));
-              String bucketArn = arnPrefix + getBucketName(uri);
-              String rawPath = trimLeadingSlash(uri.getPath());
-              bucketListStatmentBuilder
-                  .computeIfAbsent(
-                      bucketArn,
-                      (String key) ->
-                          IamStatement.builder()
-                              .effect(IamEffect.ALLOW)
-                              .addAction("s3:ListBucket")
-                              .addResource(key))
-                  .addConditions(
-                      IamConditionOperator.STRING_LIKE,
-                      "s3:prefix",
-                      Arrays.asList(
-                          // Get raw path metadata information for AWS hadoop 
connector
-                          rawPath,
-                          // Listing objects in raw path
-                          concatPathWithSep(rawPath, "*", "/")));
-              bucketGetLocationStatmentBuilder.computeIfAbsent(
-                  bucketArn,
-                  key ->
-                      IamStatement.builder()
-                          .effect(IamEffect.ALLOW)
-                          .addAction("s3:GetBucketLocation")
-                          .addResource(key));
-            });
-
-    if (!writeLocations.isEmpty()) {
-      IamStatement.Builder allowPutObjectStatementBuilder =
-          IamStatement.builder()
-              .effect(IamEffect.ALLOW)
-              .addAction("s3:PutObject")
-              .addAction("s3:DeleteObject");
-      writeLocations.forEach(
-          location -> {
-            URI uri = URI.create(location);
-            allowPutObjectStatementBuilder.addResource(
-                IamResource.create(getS3UriWithArn(arnPrefix, uri)));
-          });
-      policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
-    }
-    if (!bucketListStatmentBuilder.isEmpty()) {
-      bucketListStatmentBuilder
-          .values()
-          .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
-    } else {
-      // add list privilege with 0 resources
-      policyBuilder.addStatement(
-          
IamStatement.builder().effect(IamEffect.ALLOW).addAction("s3:ListBucket").build());
-    }
-
-    bucketGetLocationStatmentBuilder
-        .values()
-        .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
-    return 
policyBuilder.addStatement(allowGetObjectStatementBuilder.build()).build();
-  }
-
-  private String getS3UriWithArn(String arnPrefix, URI uri) {
-    return arnPrefix + concatPathWithSep(removeSchemaFromS3Uri(uri), "*", "/");
-  }
-
-  private String getArnPrefix(String roleArn) {
-    if (roleArn.contains("aws-cn")) {
-      return "arn:aws-cn:s3:::";
-    } else if (roleArn.contains("aws-us-gov")) {
-      return "arn:aws-us-gov:s3:::";
-    } else {
-      return "arn:aws:s3:::";
-    }
-  }
-
-  private static String concatPathWithSep(String leftPath, String rightPath, 
String fileSep) {
-    if (leftPath.endsWith(fileSep) && rightPath.startsWith(fileSep)) {
-      return leftPath + rightPath.substring(1);
-    } else if (!leftPath.endsWith(fileSep) && !rightPath.startsWith(fileSep)) {
-      return leftPath + fileSep + rightPath;
-    } else {
-      return leftPath + rightPath;
-    }
-  }
-
-  // Transform 's3://bucket/path' to /bucket/path
-  private static String removeSchemaFromS3Uri(URI uri) {
-    String bucket = uri.getHost();
-    String path = trimLeadingSlash(uri.getPath());
-    return String.join(
-        "/", Stream.of(bucket, 
path).filter(Objects::nonNull).toArray(String[]::new));
-  }
-
-  private static String trimLeadingSlash(String path) {
-    if (path.startsWith("/")) {
-      path = path.substring(1);
-    }
-    return path;
-  }
-
-  private static String getBucketName(URI uri) {
-    return uri.getHost();
-  }
-
-  private Credentials createS3Token(
-      String roleArn, Set<String> readLocations, Set<String> writeLocations, 
String userName) {
-    IamPolicy policy = createPolicy(roleArn, readLocations, writeLocations);
-    AssumeRoleRequest.Builder builder =
-        AssumeRoleRequest.builder()
-            .roleArn(roleArn)
-            .roleSessionName("gravitino_" + userName)
-            .durationSeconds(tokenExpireSecs)
-            .policy(policy.toJson());
-    if (StringUtils.isNotBlank(externalID)) {
-      builder.externalId(externalID);
-    }
-    AssumeRoleResponse response = stsClient.assumeRole(builder.build());
-    return response.credentials();
+  public String getGeneratorClassName() {
+    return "org.apache.gravitino.s3.credential.S3TokenGenerator";
   }
 }
diff --git 
a/bundles/azure/src/main/java/org/apache/gravitino/abs/credential/ADLSTokenProvider.java
 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/credential/ADLSTokenGenerator.java
similarity index 73%
copy from 
bundles/azure/src/main/java/org/apache/gravitino/abs/credential/ADLSTokenProvider.java
copy to 
bundles/azure/src/main/java/org/apache/gravitino/abs/credential/ADLSTokenGenerator.java
index 3ec9d56c28..f3cbb5b1fe 100644
--- 
a/bundles/azure/src/main/java/org/apache/gravitino/abs/credential/ADLSTokenProvider.java
+++ 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/credential/ADLSTokenGenerator.java
@@ -1,22 +1,21 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-
 package org.apache.gravitino.abs.credential;
 
 import com.azure.core.util.Context;
@@ -28,19 +27,20 @@ import 
com.azure.storage.file.datalake.implementation.util.DataLakeSasImplUtil;
 import com.azure.storage.file.datalake.models.UserDelegationKey;
 import com.azure.storage.file.datalake.sas.DataLakeServiceSasSignatureValues;
 import com.azure.storage.file.datalake.sas.PathSasPermission;
+import java.io.IOException;
 import java.time.OffsetDateTime;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.gravitino.credential.ADLSTokenCredential;
-import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.CredentialContext;
-import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.CredentialGenerator;
 import org.apache.gravitino.credential.PathBasedCredentialContext;
 import org.apache.gravitino.credential.config.AzureCredentialConfig;
 
 /** Generates ADLS token to access ADLS data. */
-public class ADLSTokenProvider implements CredentialProvider {
+public class ADLSTokenGenerator implements 
CredentialGenerator<ADLSTokenCredential> {
+
   private String storageAccountName;
   private String tenantId;
   private String clientId;
@@ -61,23 +61,15 @@ public class ADLSTokenProvider implements 
CredentialProvider {
   }
 
   @Override
-  public void close() {}
-
-  @Override
-  public String credentialType() {
-    return ADLSTokenCredential.ADLS_TOKEN_CREDENTIAL_TYPE;
-  }
-
-  @Override
-  public Credential getCredential(CredentialContext context) {
+  public ADLSTokenCredential generate(CredentialContext context) {
     if (!(context instanceof PathBasedCredentialContext)) {
       return null;
     }
-    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
 
-    Set<String> writePaths = pathBasedCredentialContext.getWritePaths();
-    Set<String> readPaths = pathBasedCredentialContext.getReadPaths();
+    PathBasedCredentialContext pathContext = (PathBasedCredentialContext) 
context;
 
+    Set<String> writePaths = pathContext.getWritePaths();
+    Set<String> readPaths = pathContext.getReadPaths();
     Set<String> combinedPaths = new HashSet<>(writePaths);
     combinedPaths.addAll(readPaths);
 
@@ -109,7 +101,6 @@ public class ADLSTokenProvider implements 
CredentialProvider {
 
     PathSasPermission pathSasPermission =
         new 
PathSasPermission().setReadPermission(true).setListPermission(true);
-
     if (!writePaths.isEmpty()) {
       pathSasPermission
           .setWritePermission(true)
@@ -120,7 +111,6 @@ public class ADLSTokenProvider implements 
CredentialProvider {
 
     DataLakeServiceSasSignatureValues signatureValues =
         new DataLakeServiceSasSignatureValues(expiry, pathSasPermission);
-
     ADLSLocationUtils.ADLSLocationParts locationParts = 
ADLSLocationUtils.parseLocation(uniquePath);
     String sasToken =
         new DataLakeSasImplUtil(
@@ -134,4 +124,7 @@ public class ADLSTokenProvider implements 
CredentialProvider {
     return new ADLSTokenCredential(
         locationParts.getAccountName(), sasToken, 
expiry.toInstant().toEpochMilli());
   }
+
+  @Override
+  public void close() throws IOException {}
 }
diff --git 
a/bundles/azure/src/main/java/org/apache/gravitino/abs/credential/ADLSTokenProvider.java
 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/credential/ADLSTokenProvider.java
index 3ec9d56c28..151d4cc93c 100644
--- 
a/bundles/azure/src/main/java/org/apache/gravitino/abs/credential/ADLSTokenProvider.java
+++ 
b/bundles/azure/src/main/java/org/apache/gravitino/abs/credential/ADLSTokenProvider.java
@@ -19,49 +19,14 @@
 
 package org.apache.gravitino.abs.credential;
 
-import com.azure.core.util.Context;
-import com.azure.identity.ClientSecretCredential;
-import com.azure.identity.ClientSecretCredentialBuilder;
-import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
-import com.azure.storage.file.datalake.implementation.util.DataLakeSasImplUtil;
-import com.azure.storage.file.datalake.models.UserDelegationKey;
-import com.azure.storage.file.datalake.sas.DataLakeServiceSasSignatureValues;
-import com.azure.storage.file.datalake.sas.PathSasPermission;
-import java.time.OffsetDateTime;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
 import org.apache.gravitino.credential.ADLSTokenCredential;
-import org.apache.gravitino.credential.Credential;
-import org.apache.gravitino.credential.CredentialContext;
-import org.apache.gravitino.credential.CredentialProvider;
-import org.apache.gravitino.credential.PathBasedCredentialContext;
-import org.apache.gravitino.credential.config.AzureCredentialConfig;
+import org.apache.gravitino.credential.CredentialProviderDelegator;
 
-/** Generates ADLS token to access ADLS data. */
-public class ADLSTokenProvider implements CredentialProvider {
-  private String storageAccountName;
-  private String tenantId;
-  private String clientId;
-  private String clientSecret;
-  private String endpoint;
-  private Integer tokenExpireSecs;
-
-  @Override
-  public void initialize(Map<String, String> properties) {
-    AzureCredentialConfig azureCredentialConfig = new 
AzureCredentialConfig(properties);
-    this.storageAccountName = azureCredentialConfig.storageAccountName();
-    this.tenantId = azureCredentialConfig.tenantId();
-    this.clientId = azureCredentialConfig.clientId();
-    this.clientSecret = azureCredentialConfig.clientSecret();
-    this.endpoint =
-        String.format("https://%s.%s";, storageAccountName, 
ADLSTokenCredential.ADLS_DOMAIN);
-    this.tokenExpireSecs = azureCredentialConfig.adlsTokenExpireInSecs();
-  }
-
-  @Override
-  public void close() {}
+/**
+ * A lightweight credential provider for ADLS. It delegates the actual 
credential generation to
+ * {@link ADLSTokenGenerator} which is loaded via reflection to avoid 
classpath issues.
+ */
+public class ADLSTokenProvider extends 
CredentialProviderDelegator<ADLSTokenCredential> {
 
   @Override
   public String credentialType() {
@@ -69,69 +34,7 @@ public class ADLSTokenProvider implements CredentialProvider 
{
   }
 
   @Override
-  public Credential getCredential(CredentialContext context) {
-    if (!(context instanceof PathBasedCredentialContext)) {
-      return null;
-    }
-    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
-
-    Set<String> writePaths = pathBasedCredentialContext.getWritePaths();
-    Set<String> readPaths = pathBasedCredentialContext.getReadPaths();
-
-    Set<String> combinedPaths = new HashSet<>(writePaths);
-    combinedPaths.addAll(readPaths);
-
-    if (combinedPaths.size() != 1) {
-      throw new IllegalArgumentException(
-          "ADLS should contain exactly one unique path, but found: "
-              + combinedPaths.size()
-              + " paths: "
-              + combinedPaths);
-    }
-    String uniquePath = combinedPaths.iterator().next();
-
-    ClientSecretCredential clientSecretCredential =
-        new ClientSecretCredentialBuilder()
-            .tenantId(tenantId)
-            .clientId(clientId)
-            .clientSecret(clientSecret)
-            .build();
-
-    DataLakeServiceClient dataLakeServiceClient =
-        new DataLakeServiceClientBuilder()
-            .endpoint(endpoint)
-            .credential(clientSecretCredential)
-            .buildClient();
-
-    OffsetDateTime start = OffsetDateTime.now();
-    OffsetDateTime expiry = OffsetDateTime.now().plusSeconds(tokenExpireSecs);
-    UserDelegationKey userDelegationKey = 
dataLakeServiceClient.getUserDelegationKey(start, expiry);
-
-    PathSasPermission pathSasPermission =
-        new 
PathSasPermission().setReadPermission(true).setListPermission(true);
-
-    if (!writePaths.isEmpty()) {
-      pathSasPermission
-          .setWritePermission(true)
-          .setDeletePermission(true)
-          .setCreatePermission(true)
-          .setAddPermission(true);
-    }
-
-    DataLakeServiceSasSignatureValues signatureValues =
-        new DataLakeServiceSasSignatureValues(expiry, pathSasPermission);
-
-    ADLSLocationUtils.ADLSLocationParts locationParts = 
ADLSLocationUtils.parseLocation(uniquePath);
-    String sasToken =
-        new DataLakeSasImplUtil(
-                signatureValues,
-                locationParts.getContainer(),
-                ADLSLocationUtils.trimSlashes(locationParts.getPath()),
-                true)
-            .generateUserDelegationSas(
-                userDelegationKey, locationParts.getAccountName(), 
Context.NONE);
-
-    return new ADLSTokenCredential(
-        locationParts.getAccountName(), sasToken, 
expiry.toInstant().toEpochMilli());
+  public String getGeneratorClassName() {
+    return "org.apache.gravitino.abs.credential.ADLSTokenGenerator";
   }
 }
diff --git 
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java
 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenGenerator.java
similarity index 72%
copy from 
bundles/gcp/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java
copy to 
bundles/gcp/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenGenerator.java
index f40f2ea49f..e4ce0b728f 100644
--- 
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java
+++ 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenGenerator.java
@@ -1,20 +1,20 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ *  http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.gravitino.gcs.credential;
@@ -24,7 +24,6 @@ import com.google.auth.oauth2.CredentialAccessBoundary;
 import com.google.auth.oauth2.CredentialAccessBoundary.AccessBoundaryRule;
 import com.google.auth.oauth2.DownscopedCredentials;
 import com.google.auth.oauth2.GoogleCredentials;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.io.InputStream;
@@ -42,18 +41,16 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.credential.Credential;
 import org.apache.gravitino.credential.CredentialContext;
-import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.CredentialGenerator;
 import org.apache.gravitino.credential.GCSTokenCredential;
 import org.apache.gravitino.credential.PathBasedCredentialContext;
 import org.apache.gravitino.credential.config.GCSCredentialConfig;
 
 /** Generate GCS access token according to the read and write paths. */
-public class GCSTokenProvider implements CredentialProvider {
+public class GCSTokenGenerator implements 
CredentialGenerator<GCSTokenCredential> {
 
   private static final String INITIAL_SCOPE = 
"https://www.googleapis.com/auth/cloud-platform";;
-
   private GoogleCredentials sourceCredentials;
 
   @Override
@@ -68,33 +65,25 @@ public class GCSTokenProvider implements CredentialProvider 
{
   }
 
   @Override
-  public void close() {}
-
-  @Override
-  public String credentialType() {
-    return GCSTokenCredential.GCS_TOKEN_CREDENTIAL_TYPE;
-  }
-
-  @Override
-  public Credential getCredential(CredentialContext context) {
+  public GCSTokenCredential generate(CredentialContext context) throws 
IOException {
     if (!(context instanceof PathBasedCredentialContext)) {
       return null;
     }
+
     PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
-    try {
-      AccessToken accessToken =
-          getToken(
-              pathBasedCredentialContext.getReadPaths(),
-              pathBasedCredentialContext.getWritePaths());
-      String tokenValue = accessToken.getTokenValue();
-      long expireTime = 
accessToken.getExpirationTime().toInstant().toEpochMilli();
-      return new GCSTokenCredential(tokenValue, expireTime);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    AccessToken accessToken =
+        getToken(
+            sourceCredentials,
+            pathBasedCredentialContext.getReadPaths(),
+            pathBasedCredentialContext.getWritePaths());
+
+    String tokenValue = accessToken.getTokenValue();
+    long expireTime = 
accessToken.getExpirationTime().toInstant().toEpochMilli();
+    return new GCSTokenCredential(tokenValue, expireTime);
   }
 
-  private AccessToken getToken(Set<String> readLocations, Set<String> 
writeLocations)
+  private AccessToken getToken(
+      GoogleCredentials sourceCredentials, Set<String> readLocations, 
Set<String> writeLocations)
       throws IOException {
     DownscopedCredentials downscopedCredentials =
         DownscopedCredentials.newBuilder()
@@ -120,7 +109,6 @@ public class GCSTokenProvider implements CredentialProvider 
{
     return readExpressions;
   }
 
-  @VisibleForTesting
   // "a/b/c" will get ["a", "a/", "a/b", "a/b/", "a/b/c"]
   static List<String> getAllResources(String resourcePath) {
     if (resourcePath.endsWith("/")) {
@@ -143,7 +131,6 @@ public class GCSTokenProvider implements CredentialProvider 
{
     return results;
   }
 
-  @VisibleForTesting
   // Remove the first '/', and append `/` if the path does not end with '/'.
   static String normalizeUriPath(String resourcePath) {
     if (resourcePath.isEmpty() || "/".equals(resourcePath)) {
@@ -160,12 +147,9 @@ public class GCSTokenProvider implements 
CredentialProvider {
 
   private CredentialAccessBoundary getAccessBoundary(
       Set<String> readLocations, Set<String> writeLocations) {
-    // bucketName -> read resource expressions
     Map<String, List<String>> readExpressions = new HashMap<>();
-    // bucketName -> write resource expressions
     Map<String, List<String>> writeExpressions = new HashMap<>();
 
-    // Construct read and write resource expressions
     HashSet<String> readBuckets = new HashSet<>();
     HashSet<String> writeBuckets = new HashSet<>();
     Stream.concat(readLocations.stream(), writeLocations.stream())
@@ -178,9 +162,7 @@ public class GCSTokenProvider implements CredentialProvider 
{
               String resourcePath = normalizeUriPath(uri.getPath());
               List<String> resourceExpressions =
                   readExpressions.computeIfAbsent(bucketName, key -> new 
ArrayList<>());
-              // add read privilege
               resourceExpressions.addAll(getReadExpressions(bucketName, 
resourcePath));
-              // add list privilege
               resourceExpressions.add(
                   String.format(
                       
"api.getAttribute('storage.googleapis.com/objectListPrefix', 
'').startsWith('%s')",
@@ -189,7 +171,6 @@ public class GCSTokenProvider implements CredentialProvider 
{
                 writeBuckets.add(bucketName);
                 resourceExpressions =
                     writeExpressions.computeIfAbsent(bucketName, key -> new 
ArrayList<>());
-                // add write privilege
                 resourceExpressions.add(
                     String.format(
                         
"resource.name.startsWith('projects/_/buckets/%s/objects/%s')",
@@ -197,13 +178,10 @@ public class GCSTokenProvider implements 
CredentialProvider {
               }
             });
 
-    // Construct policy according to the resource expression and privilege.
     CredentialAccessBoundary.Builder credentialAccessBoundaryBuilder =
         CredentialAccessBoundary.newBuilder();
     readBuckets.forEach(
         bucket -> {
-          // Hadoop GCS connector needs storage.buckets.get permission, the 
reason why not use
-          // inRole:roles/storage.legacyBucketReader is it provides extra list 
permission.
           AccessBoundaryRule bucketInfoRule =
               AccessBoundaryRule.newBuilder()
                   .setAvailableResource(toGCSBucketResource(bucket))
@@ -215,10 +193,9 @@ public class GCSTokenProvider implements 
CredentialProvider {
           AccessBoundaryRule rule =
               getAccessBoundaryRule(
                   bucket, readConditions, 
Arrays.asList("inRole:roles/storage.objectViewer"));
-          if (rule == null) {
-            return;
+          if (rule != null) {
+            credentialAccessBoundaryBuilder.addRule(rule);
           }
-          credentialAccessBoundaryBuilder.addRule(rule);
         });
 
     writeBuckets.forEach(
@@ -229,10 +206,9 @@ public class GCSTokenProvider implements 
CredentialProvider {
                   bucket,
                   writeConditions,
                   Arrays.asList("inRole:roles/storage.legacyBucketWriter"));
-          if (rule == null) {
-            return;
+          if (rule != null) {
+            credentialAccessBoundaryBuilder.addRule(rule);
           }
-          credentialAccessBoundaryBuilder.addRule(rule);
         });
 
     return credentialAccessBoundaryBuilder.build();
@@ -243,15 +219,14 @@ public class GCSTokenProvider implements 
CredentialProvider {
     if (resourceExpression == null || resourceExpression.isEmpty()) {
       return null;
     }
-    CredentialAccessBoundary.AccessBoundaryRule.Builder builder =
-        CredentialAccessBoundary.AccessBoundaryRule.newBuilder();
-    builder.setAvailableResource(toGCSBucketResource(bucketName));
-    builder.setAvailabilityCondition(
-        
CredentialAccessBoundary.AccessBoundaryRule.AvailabilityCondition.newBuilder()
-            .setExpression(String.join(" || ", resourceExpression))
-            .build());
-    builder.setAvailablePermissions(permissions);
-    return builder.build();
+    return AccessBoundaryRule.newBuilder()
+        .setAvailableResource(toGCSBucketResource(bucketName))
+        .setAvailabilityCondition(
+            AccessBoundaryRule.AvailabilityCondition.newBuilder()
+                .setExpression(String.join(" || ", resourceExpression))
+                .build())
+        .setAvailablePermissions(permissions)
+        .build();
   }
 
   private static String toGCSBucketResource(String bucketName) {
@@ -275,4 +250,7 @@ public class GCSTokenProvider implements CredentialProvider 
{
       throw new IOException("GCS credential file does not exist." + 
gcsCredentialFilePath, e);
     }
   }
+
+  @Override
+  public void close() throws IOException {}
 }
diff --git 
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java
 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java
index f40f2ea49f..027f7b1928 100644
--- 
a/bundles/gcp/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java
+++ 
b/bundles/gcp/src/main/java/org/apache/gravitino/gcs/credential/GCSTokenProvider.java
@@ -19,56 +19,14 @@
 
 package org.apache.gravitino.gcs.credential;
 
-import com.google.auth.oauth2.AccessToken;
-import com.google.auth.oauth2.CredentialAccessBoundary;
-import com.google.auth.oauth2.CredentialAccessBoundary.AccessBoundaryRule;
-import com.google.auth.oauth2.DownscopedCredentials;
-import com.google.auth.oauth2.GoogleCredentials;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Stream;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.credential.Credential;
-import org.apache.gravitino.credential.CredentialContext;
-import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.CredentialProviderDelegator;
 import org.apache.gravitino.credential.GCSTokenCredential;
-import org.apache.gravitino.credential.PathBasedCredentialContext;
-import org.apache.gravitino.credential.config.GCSCredentialConfig;
 
-/** Generate GCS access token according to the read and write paths. */
-public class GCSTokenProvider implements CredentialProvider {
-
-  private static final String INITIAL_SCOPE = 
"https://www.googleapis.com/auth/cloud-platform";;
-
-  private GoogleCredentials sourceCredentials;
-
-  @Override
-  public void initialize(Map<String, String> properties) {
-    GCSCredentialConfig gcsCredentialConfig = new 
GCSCredentialConfig(properties);
-    try {
-      this.sourceCredentials =
-          
getSourceCredentials(gcsCredentialConfig).createScoped(INITIAL_SCOPE);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void close() {}
+/**
+ * A lightweight credential provider for GCS. It delegates the actual 
credential generation to
+ * {@link GCSTokenGenerator} which is loaded via reflection to avoid classpath 
issues.
+ */
+public class GCSTokenProvider extends 
CredentialProviderDelegator<GCSTokenCredential> {
 
   @Override
   public String credentialType() {
@@ -76,203 +34,7 @@ public class GCSTokenProvider implements CredentialProvider 
{
   }
 
   @Override
-  public Credential getCredential(CredentialContext context) {
-    if (!(context instanceof PathBasedCredentialContext)) {
-      return null;
-    }
-    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
-    try {
-      AccessToken accessToken =
-          getToken(
-              pathBasedCredentialContext.getReadPaths(),
-              pathBasedCredentialContext.getWritePaths());
-      String tokenValue = accessToken.getTokenValue();
-      long expireTime = 
accessToken.getExpirationTime().toInstant().toEpochMilli();
-      return new GCSTokenCredential(tokenValue, expireTime);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private AccessToken getToken(Set<String> readLocations, Set<String> 
writeLocations)
-      throws IOException {
-    DownscopedCredentials downscopedCredentials =
-        DownscopedCredentials.newBuilder()
-            .setSourceCredential(sourceCredentials)
-            .setCredentialAccessBoundary(getAccessBoundary(readLocations, 
writeLocations))
-            .build();
-    return downscopedCredentials.refreshAccessToken();
-  }
-
-  private List<String> getReadExpressions(String bucketName, String 
resourcePath) {
-    List<String> readExpressions = new ArrayList<>();
-    readExpressions.add(
-        String.format(
-            "resource.name.startsWith('projects/_/buckets/%s/objects/%s')",
-            bucketName, resourcePath));
-    getAllResources(resourcePath)
-        .forEach(
-            parentResourcePath ->
-                readExpressions.add(
-                    String.format(
-                        "resource.name == 'projects/_/buckets/%s/objects/%s'",
-                        bucketName, parentResourcePath)));
-    return readExpressions;
-  }
-
-  @VisibleForTesting
-  // "a/b/c" will get ["a", "a/", "a/b", "a/b/", "a/b/c"]
-  static List<String> getAllResources(String resourcePath) {
-    if (resourcePath.endsWith("/")) {
-      resourcePath = resourcePath.substring(0, resourcePath.length() - 1);
-    }
-    if (resourcePath.isEmpty()) {
-      return Arrays.asList("");
-    }
-    Preconditions.checkArgument(
-        !resourcePath.startsWith("/"), resourcePath + " should not start with 
/");
-    List<String> parts = Arrays.asList(resourcePath.split("/"));
-    List<String> results = new ArrayList<>();
-    String parent = "";
-    for (int i = 0; i < parts.size() - 1; i++) {
-      results.add(parts.get(i));
-      parent += parts.get(i) + "/";
-      results.add(parent);
-    }
-    results.add(parent + parts.get(parts.size() - 1));
-    return results;
-  }
-
-  @VisibleForTesting
-  // Remove the first '/', and append `/` if the path does not end with '/'.
-  static String normalizeUriPath(String resourcePath) {
-    if (resourcePath.isEmpty() || "/".equals(resourcePath)) {
-      return "";
-    }
-    if (resourcePath.startsWith("/")) {
-      resourcePath = resourcePath.substring(1);
-    }
-    if (resourcePath.endsWith("/")) {
-      return resourcePath;
-    }
-    return resourcePath + "/";
-  }
-
-  private CredentialAccessBoundary getAccessBoundary(
-      Set<String> readLocations, Set<String> writeLocations) {
-    // bucketName -> read resource expressions
-    Map<String, List<String>> readExpressions = new HashMap<>();
-    // bucketName -> write resource expressions
-    Map<String, List<String>> writeExpressions = new HashMap<>();
-
-    // Construct read and write resource expressions
-    HashSet<String> readBuckets = new HashSet<>();
-    HashSet<String> writeBuckets = new HashSet<>();
-    Stream.concat(readLocations.stream(), writeLocations.stream())
-        .distinct()
-        .forEach(
-            location -> {
-              URI uri = URI.create(location);
-              String bucketName = getBucketName(uri);
-              readBuckets.add(bucketName);
-              String resourcePath = normalizeUriPath(uri.getPath());
-              List<String> resourceExpressions =
-                  readExpressions.computeIfAbsent(bucketName, key -> new 
ArrayList<>());
-              // add read privilege
-              resourceExpressions.addAll(getReadExpressions(bucketName, 
resourcePath));
-              // add list privilege
-              resourceExpressions.add(
-                  String.format(
-                      
"api.getAttribute('storage.googleapis.com/objectListPrefix', 
'').startsWith('%s')",
-                      resourcePath));
-              if (writeLocations.contains(location)) {
-                writeBuckets.add(bucketName);
-                resourceExpressions =
-                    writeExpressions.computeIfAbsent(bucketName, key -> new 
ArrayList<>());
-                // add write privilege
-                resourceExpressions.add(
-                    String.format(
-                        
"resource.name.startsWith('projects/_/buckets/%s/objects/%s')",
-                        bucketName, resourcePath));
-              }
-            });
-
-    // Construct policy according to the resource expression and privilege.
-    CredentialAccessBoundary.Builder credentialAccessBoundaryBuilder =
-        CredentialAccessBoundary.newBuilder();
-    readBuckets.forEach(
-        bucket -> {
-          // Hadoop GCS connector needs storage.buckets.get permission, the 
reason why not use
-          // inRole:roles/storage.legacyBucketReader is it provides extra list 
permission.
-          AccessBoundaryRule bucketInfoRule =
-              AccessBoundaryRule.newBuilder()
-                  .setAvailableResource(toGCSBucketResource(bucket))
-                  .setAvailablePermissions(
-                      
Arrays.asList("inRole:roles/storage.insightsCollectorService"))
-                  .build();
-          credentialAccessBoundaryBuilder.addRule(bucketInfoRule);
-          List<String> readConditions = readExpressions.get(bucket);
-          AccessBoundaryRule rule =
-              getAccessBoundaryRule(
-                  bucket, readConditions, 
Arrays.asList("inRole:roles/storage.objectViewer"));
-          if (rule == null) {
-            return;
-          }
-          credentialAccessBoundaryBuilder.addRule(rule);
-        });
-
-    writeBuckets.forEach(
-        bucket -> {
-          List<String> writeConditions = writeExpressions.get(bucket);
-          AccessBoundaryRule rule =
-              getAccessBoundaryRule(
-                  bucket,
-                  writeConditions,
-                  Arrays.asList("inRole:roles/storage.legacyBucketWriter"));
-          if (rule == null) {
-            return;
-          }
-          credentialAccessBoundaryBuilder.addRule(rule);
-        });
-
-    return credentialAccessBoundaryBuilder.build();
-  }
-
-  private AccessBoundaryRule getAccessBoundaryRule(
-      String bucketName, List<String> resourceExpression, List<String> 
permissions) {
-    if (resourceExpression == null || resourceExpression.isEmpty()) {
-      return null;
-    }
-    CredentialAccessBoundary.AccessBoundaryRule.Builder builder =
-        CredentialAccessBoundary.AccessBoundaryRule.newBuilder();
-    builder.setAvailableResource(toGCSBucketResource(bucketName));
-    builder.setAvailabilityCondition(
-        
CredentialAccessBoundary.AccessBoundaryRule.AvailabilityCondition.newBuilder()
-            .setExpression(String.join(" || ", resourceExpression))
-            .build());
-    builder.setAvailablePermissions(permissions);
-    return builder.build();
-  }
-
-  private static String toGCSBucketResource(String bucketName) {
-    return "//storage.googleapis.com/projects/_/buckets/" + bucketName;
-  }
-
-  private static String getBucketName(URI uri) {
-    return uri.getHost();
-  }
-
-  private GoogleCredentials getSourceCredentials(GCSCredentialConfig 
gcsCredentialConfig)
-      throws IOException {
-    String gcsCredentialFilePath = gcsCredentialConfig.gcsCredentialFilePath();
-    if (StringUtils.isBlank(gcsCredentialFilePath)) {
-      return GoogleCredentials.getApplicationDefault();
-    }
-    Path credentialsFilePath = Paths.get(gcsCredentialFilePath);
-    try (InputStream fileInputStream = 
Files.newInputStream(credentialsFilePath)) {
-      return GoogleCredentials.fromStream(fileInputStream);
-    } catch (NoSuchFileException e) {
-      throw new IOException("GCS credential file does not exist." + 
gcsCredentialFilePath, e);
-    }
+  public String getGeneratorClassName() {
+    return "org.apache.gravitino.gcs.credential.GCSTokenGenerator";
   }
 }
diff --git 
a/bundles/gcp/src/test/java/org/apache/gravitino/gcs/credential/TestGCSTokenProvider.java
 
b/bundles/gcp/src/test/java/org/apache/gravitino/gcs/credential/TestGCSTokenProvider.java
index 9dfb7b6440..1b3283bfae 100644
--- 
a/bundles/gcp/src/test/java/org/apache/gravitino/gcs/credential/TestGCSTokenProvider.java
+++ 
b/bundles/gcp/src/test/java/org/apache/gravitino/gcs/credential/TestGCSTokenProvider.java
@@ -41,7 +41,7 @@ public class TestGCSTokenProvider {
 
     checkResults.forEach(
         (key, value) -> {
-          List<String> parentResources = GCSTokenProvider.getAllResources(key);
+          List<String> parentResources = 
GCSTokenGenerator.getAllResources(key);
           Assertions.assertArrayEquals(value.toArray(), 
parentResources.toArray());
         });
   }
@@ -59,7 +59,7 @@ public class TestGCSTokenProvider {
 
     checkResults.forEach(
         (k, v) -> {
-          String normalizedPath = GCSTokenProvider.normalizeUriPath(k);
+          String normalizedPath = GCSTokenGenerator.normalizeUriPath(k);
           Assertions.assertEquals(v, normalizedPath);
         });
   }
diff --git 
a/common/src/main/java/org/apache/gravitino/credential/CredentialProvider.java 
b/common/src/main/java/org/apache/gravitino/credential/CredentialGenerator.java
similarity index 57%
copy from 
common/src/main/java/org/apache/gravitino/credential/CredentialProvider.java
copy to 
common/src/main/java/org/apache/gravitino/credential/CredentialGenerator.java
index 4056cd00b1..c0db87f258 100644
--- 
a/common/src/main/java/org/apache/gravitino/credential/CredentialProvider.java
+++ 
b/common/src/main/java/org/apache/gravitino/credential/CredentialGenerator.java
@@ -16,21 +16,23 @@
  *  specific language governing permissions and limitations
  *  under the License.
  */
-
 package org.apache.gravitino.credential;
 
 import java.io.Closeable;
 import java.util.Map;
-import javax.annotation.Nullable;
 
 /**
- * Interface for credential providers.
+ * An interface for generating credentials. Implementations of this interface 
will contain the
+ * actual logic for creating credentials and may have heavy dependencies. They 
are intended to be
+ * loaded via reflection by a {@link CredentialProvider} to avoid classpath 
issues during service
+ * loading.
  *
- * <p>A credential provider is responsible for managing and retrieving 
credentials.
+ * @param <T> The type of credential this generator produces.
  */
-public interface CredentialProvider extends Closeable {
+public interface CredentialGenerator<T extends Credential> extends Closeable {
+
   /**
-   * Initializes the credential provider with catalog properties.
+   * Initializes the credential generator with catalog properties.
    *
    * @param properties catalog properties that can be used to configure the 
provider. The specific
    *     properties required vary by implementation.
@@ -38,19 +40,11 @@ public interface CredentialProvider extends Closeable {
   void initialize(Map<String, String> properties);
 
   /**
-   * Returns the type of credential, it should be identical in Gravitino.
-   *
-   * @return A string identifying the type of credentials.
-   */
-  String credentialType();
-
-  /**
-   * Obtains a credential based on the provided context information.
+   * Generates a credential.
    *
-   * @param context A context object providing necessary information for 
retrieving credentials.
-   * @return A Credential object containing the authentication information 
needed to access a system
-   *     or resource. Null will be returned if no credential is available.
+   * @param context The context providing necessary information for credential 
retrieval.
+   * @return The generated credential.
+   * @throws Exception if an error occurs during credential generation.
    */
-  @Nullable
-  Credential getCredential(CredentialContext context);
+  T generate(CredentialContext context) throws Exception;
 }
diff --git 
a/common/src/main/java/org/apache/gravitino/credential/CredentialProvider.java 
b/common/src/main/java/org/apache/gravitino/credential/CredentialProvider.java
index 4056cd00b1..ebb5d2ee6b 100644
--- 
a/common/src/main/java/org/apache/gravitino/credential/CredentialProvider.java
+++ 
b/common/src/main/java/org/apache/gravitino/credential/CredentialProvider.java
@@ -24,11 +24,16 @@ import java.util.Map;
 import javax.annotation.Nullable;
 
 /**
- * Interface for credential providers.
+ * An interface for providing credentials to access external systems.
  *
- * <p>A credential provider is responsible for managing and retrieving 
credentials.
+ * <p>Implementations of this interface are discovered using Java's {@link 
java.util.ServiceLoader}.
+ * To prevent class loading issues and unnecessary dependency bloat, all 
implementations must be
+ * lightweight. Any logic that requires heavy external dependencies (e.g., 
cloud SDKs) should be
+ * isolated in a separate {@link CredentialGenerator} class and loaded 
reflectively at runtime. The
+ * {@link CredentialProviderDelegator} provides a convenient base class for 
this pattern.
  */
 public interface CredentialProvider extends Closeable {
+
   /**
    * Initializes the credential provider with catalog properties.
    *
@@ -45,7 +50,7 @@ public interface CredentialProvider extends Closeable {
   String credentialType();
 
   /**
-   * Obtains a credential based on the provided context information.
+   * Gets a credential based on the provided context information.
    *
    * @param context A context object providing necessary information for 
retrieving credentials.
    * @return A Credential object containing the authentication information 
needed to access a system
diff --git 
a/common/src/main/java/org/apache/gravitino/credential/CredentialProviderDelegator.java
 
b/common/src/main/java/org/apache/gravitino/credential/CredentialProviderDelegator.java
new file mode 100644
index 0000000000..d87b7880e6
--- /dev/null
+++ 
b/common/src/main/java/org/apache/gravitino/credential/CredentialProviderDelegator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.credential;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+/**
+ * An abstract base class for {@link CredentialProvider} implementations that 
delegate the actual
+ * credential generation to a {@link CredentialGenerator}. It handles the lazy 
and reflective
+ * loading of the generator to isolate heavy dependencies.
+ *
+ * @param <T> The type of credential generated by this provider.
+ */
+public abstract class CredentialProviderDelegator<T extends Credential>
+    implements CredentialProvider {
+
+  /** The properties used by the generator to generate the credential. */
+  protected Map<String, String> properties;
+
+  private volatile CredentialGenerator<T> generator;
+
+  /**
+   * Initializes the provider by storing properties and loading the associated 
{@link
+   * CredentialGenerator}.
+   *
+   * @param properties A map of configuration properties for the provider.
+   */
+  @Override
+  public void initialize(Map<String, String> properties) {
+    this.properties = properties;
+    this.generator = loadGenerator();
+    generator.initialize(properties);
+  }
+
+  /**
+   * Delegates the credential generation to the loaded {@link 
CredentialGenerator}.
+   *
+   * @param context The context containing information required for credential 
retrieval.
+   * @return A {@link Credential} object.
+   * @throws RuntimeException if credential generation fails.
+   */
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    try {
+      return generator.generate(context);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to generate credential using " + getGeneratorClassName(), e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (generator != null) {
+      generator.close();
+    }
+  }
+
+  /**
+   * Returns the fully qualified class name of the {@link CredentialGenerator} 
implementation. This
+   * generator will be loaded via reflection to perform the actual credential 
creation.
+   *
+   * @return The class name of the credential generator.
+   */
+  protected abstract String getGeneratorClassName();
+
+  /**
+   * Loads and instantiates the {@link CredentialGenerator} using reflection.
+   *
+   * <p>This implementation uses a no-argument constructor. The constructor 
can be non-public.
+   *
+   * @return An instance of the credential generator.
+   * @throws RuntimeException if the generator cannot be loaded or 
instantiated.
+   */
+  @SuppressWarnings("unchecked")
+  private CredentialGenerator<T> loadGenerator() {
+    try {
+      Class<?> generatorClass = Class.forName(getGeneratorClassName());
+      Constructor<?> constructor = generatorClass.getDeclaredConstructor();
+      constructor.setAccessible(true);
+      return (CredentialGenerator<T>) constructor.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to load or instantiate CredentialGenerator: " + 
getGeneratorClassName(), e);
+    }
+  }
+}

Reply via email to