This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new f265e50c2 [#4994][#4369] feat(core): support S3 credential vending 
(#4966)
f265e50c2 is described below

commit f265e50c2cc4fd3dc0a1fe113e9822adffc062fa
Author: FANNG <[email protected]>
AuthorDate: Tue Oct 29 18:13:24 2024 +0800

    [#4994][#4369] feat(core): support S3 credential vending (#4966)
    
    ### What changes were proposed in this pull request?
    
    support S3 credential vending, include s3 token and s3 static key
    
    
    ### Why are the changes needed?
    
    Fix: #4994
    Fix: #4369
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    
    add IT to do Iceberg operation by using S3 token
---
 .../gravitino/credential/GCSTokenCredential.java   |   4 +-
 .../credential/S3SecretKeyCredential.java          |  89 ++++++++
 .../gravitino/credential/S3TokenCredential.java    | 112 ++++++++++
 bundles/aws-bundle/build.gradle.kts                |   7 +
 .../s3/credential/S3SecretKeyProvider.java         |  54 +++++
 .../gravitino/s3/credential/S3TokenProvider.java   | 239 +++++++++++++++++++++
 ....apache.gravitino.credential.CredentialProvider |  20 ++
 bundles/gcp-bundle/build.gradle.kts                |   2 +
 .../lakehouse/iceberg/IcebergConstants.java        |   1 +
 .../gravitino/credential/CredentialConstants.java  |   2 +
 .../org/apache/gravitino/storage/S3Properties.java |   4 +
 .../credential/CredentialPropertyUtils.java        |  20 +-
 .../credential/TestCredentialPropertiesUtils.java  |  54 +++++
 core/build.gradle.kts                              |   1 +
 .../credential/config/S3CredentialConfig.java      | 110 ++++++++++
 gradle/libs.versions.toml                          |   9 +
 .../iceberg/common/ops/IcebergCatalogWrapper.java  |   2 -
 iceberg/iceberg-rest-server/build.gradle.kts       |   2 +
 .../service/rest/IcebergTableOperations.java       |  17 +-
 .../iceberg/integration/test/IcebergRESTS3IT.java  | 127 +++++++++++
 20 files changed, 868 insertions(+), 8 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/credential/GCSTokenCredential.java 
b/api/src/main/java/org/apache/gravitino/credential/GCSTokenCredential.java
index 98186e2de..59dc3b246 100644
--- a/api/src/main/java/org/apache/gravitino/credential/GCSTokenCredential.java
+++ b/api/src/main/java/org/apache/gravitino/credential/GCSTokenCredential.java
@@ -33,8 +33,8 @@ public class GCSTokenCredential implements Credential {
   /** GCS credential property, token name. */
   public static final String GCS_TOKEN_NAME = "token";
 
-  private String token;
-  private long expireMs;
+  private final String token;
+  private final long expireMs;
 
   /**
    * @param token The GCS token.
diff --git 
a/api/src/main/java/org/apache/gravitino/credential/S3SecretKeyCredential.java 
b/api/src/main/java/org/apache/gravitino/credential/S3SecretKeyCredential.java
new file mode 100644
index 000000000..3063d5615
--- /dev/null
+++ 
b/api/src/main/java/org/apache/gravitino/credential/S3SecretKeyCredential.java
@@ -0,0 +1,89 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+
+/** S3 secret key credential. */
+public class S3SecretKeyCredential implements Credential {
+
+  /** S3 secret key credential type. */
+  public static final String S3_SECRET_KEY_CREDENTIAL_TYPE = "s3-secret-key";
+  /** The static access key ID used to access S3 data. */
+  public static final String GRAVITINO_S3_STATIC_ACCESS_KEY_ID = 
"s3-access-key-id";
+  /** The static secret access key used to access S3 data. */
+  public static final String GRAVITINO_S3_STATIC_SECRET_ACCESS_KEY = 
"s3-secret-access-key";
+
+  private final String accessKeyId;
+  private final String secretAccessKey;
+
+  /**
+   * Constructs an instance of {@link S3SecretKeyCredential} with the static 
S3 access key ID and
+   * secret access key.
+   *
+   * @param accessKeyId The S3 static access key ID.
+   * @param secretAccessKey The S3 static secret access key.
+   */
+  public S3SecretKeyCredential(String accessKeyId, String secretAccessKey) {
+    Preconditions.checkNotNull(accessKeyId, "S3 access key Id should not 
null");
+    Preconditions.checkNotNull(secretAccessKey, "S3 secret access key should 
not null");
+
+    this.accessKeyId = accessKeyId;
+    this.secretAccessKey = secretAccessKey;
+  }
+
+  @Override
+  public String credentialType() {
+    return S3_SECRET_KEY_CREDENTIAL_TYPE;
+  }
+
+  @Override
+  public long expireTimeInMs() {
+    return 0;
+  }
+
+  @Override
+  public Map<String, String> credentialInfo() {
+    return (new ImmutableMap.Builder<String, String>())
+        .put(GRAVITINO_S3_STATIC_ACCESS_KEY_ID, accessKeyId)
+        .put(GRAVITINO_S3_STATIC_SECRET_ACCESS_KEY, secretAccessKey)
+        .build();
+  }
+
+  /**
+   * Get S3 static access key ID.
+   *
+   * @return The S3 access key ID.
+   */
+  public String accessKeyId() {
+    return accessKeyId;
+  }
+
+  /**
+   * Get S3 static secret access key.
+   *
+   * @return The S3 secret access key.
+   */
+  public String secretAccessKey() {
+    return secretAccessKey;
+  }
+}
diff --git 
a/api/src/main/java/org/apache/gravitino/credential/S3TokenCredential.java 
b/api/src/main/java/org/apache/gravitino/credential/S3TokenCredential.java
new file mode 100644
index 000000000..d30a7e140
--- /dev/null
+++ b/api/src/main/java/org/apache/gravitino/credential/S3TokenCredential.java
@@ -0,0 +1,112 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+
+/** S3 token credential. */
+public class S3TokenCredential implements Credential {
+
+  /** S3 token credential type. */
+  public static final String S3_TOKEN_CREDENTIAL_TYPE = "s3-token";
+  /** S3 session access key ID used to access S3 data. */
+  public static final String GRAVITINO_S3_SESSION_ACCESS_KEY_ID = 
"s3-access-key-id";
+  /** S3 session secret access key used to access S3 data. */
+  public static final String GRAVITINO_S3_SESSION_SECRET_ACCESS_KEY = 
"s3-secret-access-key";
+  /** S3 session token. */
+  public static final String GRAVITINO_S3_TOKEN = "s3-session-token";
+
+  private final String accessKeyId;
+  private final String secretAccessKey;
+  private final String sessionToken;
+  private final long expireTimeInMS;
+
+  /**
+   * Constructs an instance of {@link S3SecretKeyCredential} with session 
secret key and token.
+   *
+   * @param accessKeyId The S3 session access key ID.
+   * @param secretAccessKey The S3 session secret access key.
+   * @param sessionToken The S3 session token.
+   * @param expireTimeInMS The S3 session token expire time in ms.
+   */
+  public S3TokenCredential(
+      String accessKeyId, String secretAccessKey, String sessionToken, long 
expireTimeInMS) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(accessKeyId), "S3 access key Id should not be 
empty");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(secretAccessKey), "S3 secret access key should 
not be empty");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(sessionToken), "S3 session token should not be 
empty");
+
+    this.accessKeyId = accessKeyId;
+    this.secretAccessKey = secretAccessKey;
+    this.sessionToken = sessionToken;
+    this.expireTimeInMS = expireTimeInMS;
+  }
+
+  @Override
+  public String credentialType() {
+    return S3_TOKEN_CREDENTIAL_TYPE;
+  }
+
+  @Override
+  public long expireTimeInMs() {
+    return expireTimeInMS;
+  }
+
+  @Override
+  public Map<String, String> credentialInfo() {
+    return (new ImmutableMap.Builder<String, String>())
+        .put(GRAVITINO_S3_SESSION_ACCESS_KEY_ID, accessKeyId)
+        .put(GRAVITINO_S3_SESSION_SECRET_ACCESS_KEY, secretAccessKey)
+        .put(GRAVITINO_S3_TOKEN, sessionToken)
+        .build();
+  }
+
+  /**
+   * Get S3 session access key ID.
+   *
+   * @return The S3 access key ID.
+   */
+  public String accessKeyId() {
+    return accessKeyId;
+  }
+
+  /**
+   * Get S3 session secret access key.
+   *
+   * @return The S3 secret access key.
+   */
+  public String secretAccessKey() {
+    return secretAccessKey;
+  }
+
+  /**
+   * Get S3 session token.
+   *
+   * @return The S3 session token.
+   */
+  public String sessionToken() {
+    return sessionToken;
+  }
+}
diff --git a/bundles/aws-bundle/build.gradle.kts 
b/bundles/aws-bundle/build.gradle.kts
index 741bdc414..e1723d7af 100644
--- a/bundles/aws-bundle/build.gradle.kts
+++ b/bundles/aws-bundle/build.gradle.kts
@@ -25,8 +25,15 @@ plugins {
 }
 
 dependencies {
+  compileOnly(project(":api"))
+  compileOnly(project(":core"))
+  compileOnly(project(":catalogs:catalog-common"))
   compileOnly(project(":catalogs:catalog-hadoop"))
   compileOnly(libs.hadoop3.common)
+
+  implementation(libs.aws.iam)
+  implementation(libs.aws.policy)
+  implementation(libs.aws.sts)
   implementation(libs.hadoop3.aws)
 }
 
diff --git 
a/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/credential/S3SecretKeyProvider.java
 
b/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/credential/S3SecretKeyProvider.java
new file mode 100644
index 000000000..9b4da8900
--- /dev/null
+++ 
b/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/credential/S3SecretKeyProvider.java
@@ -0,0 +1,54 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.s3.credential;
+
+import java.util.Map;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.S3SecretKeyCredential;
+import org.apache.gravitino.credential.config.S3CredentialConfig;
+
+/** Generate S3 access key and secret key to access S3 data. */
+public class S3SecretKeyProvider implements CredentialProvider {
+
+  private String accessKey;
+  private String secretKey;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    S3CredentialConfig s3CredentialConfig = new S3CredentialConfig(properties);
+    this.accessKey = s3CredentialConfig.accessKeyID();
+    this.secretKey = s3CredentialConfig.secretAccessKey();
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public String credentialType() {
+    return S3SecretKeyCredential.S3_SECRET_KEY_CREDENTIAL_TYPE;
+  }
+
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    return new S3SecretKeyCredential(accessKey, secretKey);
+  }
+}
diff --git 
a/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/credential/S3TokenProvider.java
 
b/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/credential/S3TokenProvider.java
new file mode 100644
index 000000000..eda134a15
--- /dev/null
+++ 
b/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/credential/S3TokenProvider.java
@@ -0,0 +1,239 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.s3.credential;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.credential.Credential;
+import org.apache.gravitino.credential.CredentialContext;
+import org.apache.gravitino.credential.CredentialProvider;
+import org.apache.gravitino.credential.PathBasedCredentialContext;
+import org.apache.gravitino.credential.S3TokenCredential;
+import org.apache.gravitino.credential.config.S3CredentialConfig;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.policybuilder.iam.IamConditionOperator;
+import software.amazon.awssdk.policybuilder.iam.IamEffect;
+import software.amazon.awssdk.policybuilder.iam.IamPolicy;
+import software.amazon.awssdk.policybuilder.iam.IamResource;
+import software.amazon.awssdk.policybuilder.iam.IamStatement;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+/** Generates S3 token to access S3 data. */
+public class S3TokenProvider implements CredentialProvider {
+  private StsClient stsClient;
+  private String roleArn;
+  private String externalID;
+  private int tokenExpireSecs;
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    S3CredentialConfig s3CredentialConfig = new S3CredentialConfig(properties);
+    this.roleArn = s3CredentialConfig.s3RoleArn();
+    this.externalID = s3CredentialConfig.externalID();
+    this.tokenExpireSecs = s3CredentialConfig.tokenExpireSecs();
+    this.stsClient = createStsClient(s3CredentialConfig);
+  }
+
+  @Override
+  public void close() {
+    if (stsClient != null) {
+      stsClient.close();
+    }
+  }
+
+  @Override
+  public String credentialType() {
+    return S3TokenCredential.S3_TOKEN_CREDENTIAL_TYPE;
+  }
+
+  @Override
+  public Credential getCredential(CredentialContext context) {
+    if (!(context instanceof PathBasedCredentialContext)) {
+      return null;
+    }
+    PathBasedCredentialContext pathBasedCredentialContext = 
(PathBasedCredentialContext) context;
+    Credentials s3Token =
+        createS3Token(
+            roleArn,
+            pathBasedCredentialContext.getReadPaths(),
+            pathBasedCredentialContext.getWritePaths(),
+            pathBasedCredentialContext.getUserName());
+    return new S3TokenCredential(
+        s3Token.accessKeyId(),
+        s3Token.secretAccessKey(),
+        s3Token.sessionToken(),
+        s3Token.expiration().toEpochMilli());
+  }
+
+  private StsClient createStsClient(S3CredentialConfig s3CredentialConfig) {
+    AwsCredentialsProvider credentialsProvider =
+        StaticCredentialsProvider.create(
+            AwsBasicCredentials.create(
+                s3CredentialConfig.accessKeyID(), 
s3CredentialConfig.secretAccessKey()));
+    StsClientBuilder builder = 
StsClient.builder().credentialsProvider(credentialsProvider);
+    String region = s3CredentialConfig.region();
+    if (StringUtils.isNotBlank(region)) {
+      builder.region(Region.of(region));
+    }
+    return builder.build();
+  }
+
+  private IamPolicy createPolicy(
+      String roleArn, Set<String> readLocations, Set<String> writeLocations) {
+    IamPolicy.Builder policyBuilder = IamPolicy.builder();
+    IamStatement.Builder allowGetObjectStatementBuilder =
+        IamStatement.builder()
+            .effect(IamEffect.ALLOW)
+            .addAction("s3:GetObject")
+            .addAction("s3:GetObjectVersion");
+    Map<String, IamStatement.Builder> bucketListStatmentBuilder = new 
HashMap<>();
+    Map<String, IamStatement.Builder> bucketGetLocationStatmentBuilder = new 
HashMap<>();
+
+    String arnPrefix = getArnPrefix(roleArn);
+    Stream.concat(readLocations.stream(), writeLocations.stream())
+        .distinct()
+        .forEach(
+            location -> {
+              URI uri = URI.create(location);
+              allowGetObjectStatementBuilder.addResource(
+                  IamResource.create(getS3UriWithArn(arnPrefix, uri)));
+              String bucketArn = arnPrefix + getBucketName(uri);
+              bucketListStatmentBuilder
+                  .computeIfAbsent(
+                      bucketArn,
+                      (String key) ->
+                          IamStatement.builder()
+                              .effect(IamEffect.ALLOW)
+                              .addAction("s3:ListBucket")
+                              .addResource(key))
+                  .addCondition(
+                      IamConditionOperator.STRING_LIKE,
+                      "s3:prefix",
+                      concatPathWithSep(trimLeadingSlash(uri.getPath()), "*", 
"/"));
+              bucketGetLocationStatmentBuilder.computeIfAbsent(
+                  bucketArn,
+                  key ->
+                      IamStatement.builder()
+                          .effect(IamEffect.ALLOW)
+                          .addAction("s3:GetBucketLocation")
+                          .addResource(key));
+            });
+
+    if (!writeLocations.isEmpty()) {
+      IamStatement.Builder allowPutObjectStatementBuilder =
+          IamStatement.builder()
+              .effect(IamEffect.ALLOW)
+              .addAction("s3:PutObject")
+              .addAction("s3:DeleteObject");
+      writeLocations.forEach(
+          location -> {
+            URI uri = URI.create(location);
+            allowPutObjectStatementBuilder.addResource(
+                IamResource.create(getS3UriWithArn(arnPrefix, uri)));
+          });
+      policyBuilder.addStatement(allowPutObjectStatementBuilder.build());
+    }
+    if (!bucketListStatmentBuilder.isEmpty()) {
+      bucketListStatmentBuilder
+          .values()
+          .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
+    } else {
+      // add list privilege with 0 resources
+      policyBuilder.addStatement(
+          
IamStatement.builder().effect(IamEffect.ALLOW).addAction("s3:ListBucket").build());
+    }
+
+    bucketGetLocationStatmentBuilder
+        .values()
+        .forEach(statementBuilder -> 
policyBuilder.addStatement(statementBuilder.build()));
+    return 
policyBuilder.addStatement(allowGetObjectStatementBuilder.build()).build();
+  }
+
+  private String getS3UriWithArn(String arnPrefix, URI uri) {
+    return arnPrefix + concatPathWithSep(removeSchemaFromS3Uri(uri), "*", "/");
+  }
+
+  private String getArnPrefix(String roleArn) {
+    if (roleArn.contains("aws-cn")) {
+      return "arn:aws-cn:s3:::";
+    } else if (roleArn.contains("aws-us-gov")) {
+      return "arn:aws-us-gov:s3:::";
+    } else {
+      return "arn:aws:s3:::";
+    }
+  }
+
+  private static String concatPathWithSep(String leftPath, String rightPath, 
String fileSep) {
+    if (leftPath.endsWith(fileSep) && rightPath.startsWith(fileSep)) {
+      return leftPath + rightPath.substring(1);
+    } else if (!leftPath.endsWith(fileSep) && !rightPath.startsWith(fileSep)) {
+      return leftPath + fileSep + rightPath;
+    } else {
+      return leftPath + rightPath;
+    }
+  }
+
+  // Transform 's3://bucket/path' to /bucket/path
+  private static String removeSchemaFromS3Uri(URI uri) {
+    String bucket = uri.getHost();
+    String path = trimLeadingSlash(uri.getPath());
+    return String.join(
+        "/", Stream.of(bucket, 
path).filter(Objects::nonNull).toArray(String[]::new));
+  }
+
+  private static String trimLeadingSlash(String path) {
+    if (path.startsWith("/")) {
+      path = path.substring(1);
+    }
+    return path;
+  }
+
+  private static String getBucketName(URI uri) {
+    return uri.getHost();
+  }
+
+  private Credentials createS3Token(
+      String roleArn, Set<String> readLocations, Set<String> writeLocations, 
String userName) {
+    IamPolicy policy = createPolicy(roleArn, readLocations, writeLocations);
+    AssumeRoleRequest.Builder builder =
+        AssumeRoleRequest.builder()
+            .roleArn(roleArn)
+            .roleSessionName("gravitino_" + userName)
+            .durationSeconds(tokenExpireSecs)
+            .policy(policy.toJson());
+    if (StringUtils.isNotBlank(externalID)) {
+      builder.externalId(externalID);
+    }
+    AssumeRoleResponse response = stsClient.assumeRole(builder.build());
+    return response.credentials();
+  }
+}
diff --git 
a/bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
 
b/bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
new file mode 100644
index 000000000..7349688d6
--- /dev/null
+++ 
b/bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.credential.CredentialProvider
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.gravitino.s3.credential.S3TokenProvider
+org.apache.gravitino.s3.credential.S3SecretKeyProvider
\ No newline at end of file
diff --git a/bundles/gcp-bundle/build.gradle.kts 
b/bundles/gcp-bundle/build.gradle.kts
index e69ff345e..b887ef2c5 100644
--- a/bundles/gcp-bundle/build.gradle.kts
+++ b/bundles/gcp-bundle/build.gradle.kts
@@ -33,6 +33,8 @@ dependencies {
   compileOnly(libs.hadoop3.common)
 
   implementation(libs.commons.lang3)
+  // runtime used
+  implementation(libs.commons.logging)
   implementation(libs.hadoop3.gcs)
   implementation(libs.google.auth.http)
   implementation(libs.google.auth.credentials)
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
index 52e665579..4d9e99eba 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
@@ -40,6 +40,7 @@ public class IcebergConstants {
   public static final String ICEBERG_S3_ENDPOINT = "s3.endpoint";
   public static final String ICEBERG_S3_ACCESS_KEY_ID = "s3.access-key-id";
   public static final String ICEBERG_S3_SECRET_ACCESS_KEY = 
"s3.secret-access-key";
+  public static final String ICEBERG_S3_TOKEN = "s3.session-token";
   public static final String AWS_S3_REGION = "client.region";
 
   public static final String ICEBERG_OSS_ENDPOINT = "oss.endpoint";
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
index a141b637e..249fec70b 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java
@@ -21,6 +21,8 @@ package org.apache.gravitino.credential;
 
 public class CredentialConstants {
   public static final String CREDENTIAL_PROVIDER_TYPE = 
"credential-provider-type";
+  public static final String S3_TOKEN_CREDENTIAL_PROVIDER = "s3-token";
+  public static final String TOKEN_EXPIRE_TIME = "token-expire-time";
 
   public static final String GCS_TOKEN_CREDENTIAL_PROVIDER_TYPE = "gcs-token";
 
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/S3Properties.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/S3Properties.java
index 9775bebc8..af37ae690 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/S3Properties.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/storage/S3Properties.java
@@ -30,6 +30,10 @@ public class S3Properties {
   public static final String GRAVITINO_S3_SECRET_ACCESS_KEY = 
"s3-secret-access-key";
   // The region of the S3 service.
   public static final String GRAVITINO_S3_REGION = "s3-region";
+  // S3 role arn
+  public static final String GRAVITINO_S3_ROLE_ARN = "s3-role-arn";
+  // S3 external id
+  public static final String GRAVITINO_S3_EXTERNAL_ID = "s3-external-id";
 
   private S3Properties() {}
 }
diff --git 
a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
 
b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
index e380cc5d4..811f04c53 100644
--- 
a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
+++ 
b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.credential;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import java.util.HashMap;
 import java.util.Map;
@@ -27,8 +28,22 @@ import java.util.Map;
  * Helper class to generate specific credential properties for different table 
format and engine.
  */
 public class CredentialPropertyUtils {
+
+  @VisibleForTesting static final String ICEBERG_S3_ACCESS_KEY_ID = 
"s3.access-key-id";
+  @VisibleForTesting static final String ICEBERG_S3_SECRET_ACCESS_KEY = 
"s3.secret-access-key";
+  @VisibleForTesting static final String ICEBERG_S3_TOKEN = "s3.session-token";
+  @VisibleForTesting static final String ICEBERG_GCS_TOKEN = 
"gcs.oauth2.token";
+
   private static Map<String, String> icebergCredentialPropertyMap =
-      ImmutableMap.of(GCSTokenCredential.GCS_TOKEN_NAME, "gcs.oauth2.token");
+      ImmutableMap.of(
+          GCSTokenCredential.GCS_TOKEN_NAME,
+          ICEBERG_GCS_TOKEN,
+          S3SecretKeyCredential.GRAVITINO_S3_STATIC_ACCESS_KEY_ID,
+          ICEBERG_S3_ACCESS_KEY_ID,
+          S3SecretKeyCredential.GRAVITINO_S3_STATIC_SECRET_ACCESS_KEY,
+          ICEBERG_S3_SECRET_ACCESS_KEY,
+          S3TokenCredential.GRAVITINO_S3_TOKEN,
+          ICEBERG_S3_TOKEN);
 
   /**
    * Transforms a specific credential into a map of Iceberg properties.
@@ -44,6 +59,9 @@ public class CredentialPropertyUtils {
           "gcs.oauth2.token-expires-at", 
String.valueOf(credential.expireTimeInMs()));
       return icebergGCSCredentialProperties;
     }
+    if (credential instanceof S3TokenCredential || credential instanceof 
S3SecretKeyCredential) {
+      return transformProperties(credential.credentialInfo(), 
icebergCredentialPropertyMap);
+    }
     return credential.toProperties();
   }
 
diff --git 
a/common/src/test/java/org/apache/gravitino/credential/TestCredentialPropertiesUtils.java
 
b/common/src/test/java/org/apache/gravitino/credential/TestCredentialPropertiesUtils.java
new file mode 100644
index 000000000..4d3ad698b
--- /dev/null
+++ 
b/common/src/test/java/org/apache/gravitino/credential/TestCredentialPropertiesUtils.java
@@ -0,0 +1,54 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestCredentialPropertiesUtils {
+
+  @Test
+  void testToIcebergProperties() {
+    S3TokenCredential s3TokenCredential = new S3TokenCredential("key", 
"secret", "token", 0);
+    Map<String, String> icebergProperties =
+        CredentialPropertyUtils.toIcebergProperties(s3TokenCredential);
+    Map<String, String> expectedProperties =
+        ImmutableMap.of(
+            CredentialPropertyUtils.ICEBERG_S3_ACCESS_KEY_ID,
+            "key",
+            CredentialPropertyUtils.ICEBERG_S3_SECRET_ACCESS_KEY,
+            "secret",
+            CredentialPropertyUtils.ICEBERG_S3_TOKEN,
+            "token");
+    Assertions.assertEquals(expectedProperties, icebergProperties);
+
+    S3SecretKeyCredential secretKeyCredential = new 
S3SecretKeyCredential("key", "secret");
+    icebergProperties = 
CredentialPropertyUtils.toIcebergProperties(secretKeyCredential);
+    expectedProperties =
+        ImmutableMap.of(
+            CredentialPropertyUtils.ICEBERG_S3_ACCESS_KEY_ID,
+            "key",
+            CredentialPropertyUtils.ICEBERG_S3_SECRET_ACCESS_KEY,
+            "secret");
+    Assertions.assertEquals(expectedProperties, icebergProperties);
+  }
+}
diff --git a/core/build.gradle.kts b/core/build.gradle.kts
index cb07b49d9..b09c0e358 100644
--- a/core/build.gradle.kts
+++ b/core/build.gradle.kts
@@ -25,6 +25,7 @@ plugins {
 dependencies {
   implementation(project(":api"))
   implementation(project(":common"))
+  implementation(project(":catalogs:catalog-common"))
   implementation(project(":meta"))
   implementation(libs.bundles.log4j)
   implementation(libs.bundles.metrics)
diff --git 
a/core/src/main/java/org/apache/gravitino/credential/config/S3CredentialConfig.java
 
b/core/src/main/java/org/apache/gravitino/credential/config/S3CredentialConfig.java
new file mode 100644
index 000000000..d16e43c08
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/credential/config/S3CredentialConfig.java
@@ -0,0 +1,110 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.credential.config;
+
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.config.ConfigBuilder;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.config.ConfigEntry;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.storage.S3Properties;
+
+public class S3CredentialConfig extends Config {
+
+  public static final ConfigEntry<String> S3_REGION =
+      new ConfigBuilder(S3Properties.GRAVITINO_S3_REGION)
+          .doc("The region of the S3 service")
+          .version(ConfigConstants.VERSION_0_7_0)
+          .stringConf()
+          .create();
+
+  public static final ConfigEntry<String> S3_ACCESS_KEY_ID =
+      new ConfigBuilder(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID)
+          .doc("The static access key ID used to access S3 data")
+          .version(ConfigConstants.VERSION_0_7_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<String> S3_SECRET_ACCESS_KEY =
+      new ConfigBuilder(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY)
+          .doc("The static secret access key used to access S3 data")
+          .version(ConfigConstants.VERSION_0_7_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<String> S3_ROLE_ARN =
+      new ConfigBuilder(S3Properties.GRAVITINO_S3_ROLE_ARN)
+          .doc("S3 role arn")
+          .version(ConfigConstants.VERSION_0_7_0)
+          .stringConf()
+          .checkValue(StringUtils::isNotBlank, 
ConfigConstants.NOT_BLANK_ERROR_MSG)
+          .create();
+
+  public static final ConfigEntry<String> S3_EXTERNAL_ID =
+      new ConfigBuilder(S3Properties.GRAVITINO_S3_EXTERNAL_ID)
+          .doc("S3 external ID")
+          .version(ConfigConstants.VERSION_0_7_0)
+          .stringConf()
+          .create();
+
+  public static final ConfigEntry<Integer> S3_TOKEN_EXPIRE_SECS =
+      new ConfigBuilder(CredentialConstants.TOKEN_EXPIRE_TIME)
+          .doc("S3 token expire seconds")
+          .version(ConfigConstants.VERSION_0_7_0)
+          .intConf()
+          .createWithDefault(3600);
+
+  public S3CredentialConfig(Map<String, String> properties) {
+    super(false);
+    loadFromMap(properties, k -> true);
+  }
+
+  @NotNull
+  public String s3RoleArn() {
+    return this.get(S3_ROLE_ARN);
+  }
+
+  @NotNull
+  public String accessKeyID() {
+    return this.get(S3_ACCESS_KEY_ID);
+  }
+
+  @NotNull
+  public String secretAccessKey() {
+    return this.get(S3_SECRET_ACCESS_KEY);
+  }
+
+  public String region() {
+    return this.get(S3_REGION);
+  }
+
+  public String externalID() {
+    return this.get(S3_EXTERNAL_ID);
+  }
+
+  public Integer tokenExpireSecs() {
+    return this.get(S3_TOKEN_EXPIRE_SECS);
+  }
+}
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 830fe5e74..20a0a218e 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -17,6 +17,7 @@
 # under the License.
 #
 [versions]
+awssdk = "2.28.3"
 junit = "5.8.1"
 protoc = "3.24.4"
 jackson = "2.15.2"
@@ -40,6 +41,7 @@ httpclient5 = "5.2.1"
 mockserver = "5.15.0"
 commons-lang3 = "3.14.0"
 commons-lang = "2.6"
+commons-logging = "1.2"
 commons-io = "2.15.0"
 commons-collections4 = "4.4"
 commons-collections3 = "3.2.2"
@@ -110,6 +112,11 @@ hudi = "0.15.0"
 google-auth = "1.28.0"
 
 [libraries]
+aws-iam = { group = "software.amazon.awssdk", name = "iam", version.ref = 
"awssdk" }
+aws-policy = { group = "software.amazon.awssdk", name = "iam-policy-builder", 
version.ref = "awssdk" }
+aws-s3 = { group = "software.amazon.awssdk", name = "s3", version.ref = 
"awssdk" }
+aws-sts = { group = "software.amazon.awssdk", name = "sts", version.ref = 
"awssdk" }
+aws-kms = { group = "software.amazon.awssdk", name = "kms", version.ref = 
"awssdk" }
 protobuf-java = { group = "com.google.protobuf", name = "protobuf-java", 
version.ref = "protoc" }
 protobuf-java-util = { group = "com.google.protobuf", name = 
"protobuf-java-util", version.ref = "protoc" }
 jackson-databind = { group = "com.fasterxml.jackson.core", name = 
"jackson-databind", version.ref = "jackson" }
@@ -169,6 +176,7 @@ mockserver-netty = { group = "org.mock-server", name = 
"mockserver-netty", versi
 mockserver-client-java = { group = "org.mock-server", name = 
"mockserver-client-java", version.ref = "mockserver" }
 commons-lang = { group = "commons-lang", name = "commons-lang", version.ref = 
"commons-lang" }
 commons-lang3 = { group = "org.apache.commons", name = "commons-lang3", 
version.ref = "commons-lang3" }
+commons-logging = { group = "commons-logging", name = "commons-logging", 
version.ref = "commons-logging" }
 commons-io = { group = "commons-io", name = "commons-io", version.ref = 
"commons-io" }
 caffeine = { group = "com.github.ben-manes.caffeine", name = "caffeine", 
version.ref = "caffeine" }
 rocksdbjni = { group = "org.rocksdb", name = "rocksdbjni", version.ref = 
"rocksdbjni" }
@@ -177,6 +185,7 @@ commons-collections3 = { group = "commons-collections", 
name = "commons-collecti
 commons-configuration1 = { group = "commons-configuration", name = 
"commons-configuration", version.ref = "commons-configuration1" }
 iceberg-aliyun = { group = "org.apache.iceberg", name = "iceberg-aliyun", 
version.ref = "iceberg" }
 iceberg-aws = { group = "org.apache.iceberg", name = "iceberg-aws", 
version.ref = "iceberg" }
+iceberg-aws-bundle = { group = "org.apache.iceberg", name = 
"iceberg-aws-bundle", version.ref = "iceberg" }
 iceberg-core = { group = "org.apache.iceberg", name = "iceberg-core", 
version.ref = "iceberg" }
 iceberg-api = { group = "org.apache.iceberg", name = "iceberg-api", 
version.ref = "iceberg" }
 iceberg-hive-metastore = { group = "org.apache.iceberg", name = 
"iceberg-hive-metastore", version.ref = "iceberg" }
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
index 95e82aa22..f1d3b178e 100644
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java
@@ -75,8 +75,6 @@ public class IcebergCatalogWrapper implements AutoCloseable {
       ImmutableSet.of(
           IcebergConstants.IO_IMPL,
           IcebergConstants.AWS_S3_REGION,
-          IcebergConstants.ICEBERG_S3_ACCESS_KEY_ID,
-          IcebergConstants.ICEBERG_S3_SECRET_ACCESS_KEY,
           IcebergConstants.ICEBERG_S3_ENDPOINT,
           IcebergConstants.ICEBERG_OSS_ENDPOINT,
           IcebergConstants.ICEBERG_OSS_ACCESS_KEY_ID,
diff --git a/iceberg/iceberg-rest-server/build.gradle.kts 
b/iceberg/iceberg-rest-server/build.gradle.kts
index f088ce292..c635ecd79 100644
--- a/iceberg/iceberg-rest-server/build.gradle.kts
+++ b/iceberg/iceberg-rest-server/build.gradle.kts
@@ -63,6 +63,7 @@ dependencies {
 
   compileOnly(libs.lombok)
 
+  testImplementation(project(":bundles:aws-bundle"))
   testImplementation(project(":bundles:gcp-bundle", configuration = "shadow"))
   testImplementation(project(":integration-test-common", "testArtifacts"))
 
@@ -76,6 +77,7 @@ dependencies {
     exclude("org.rocksdb")
   }
 
+  testImplementation(libs.iceberg.aws.bundle)
   testImplementation(libs.iceberg.gcp.bundle)
   testImplementation(libs.jersey.test.framework.core) {
     exclude(group = "org.junit.jupiter")
diff --git 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
index 46546bbdc..ce245e945 100644
--- 
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
+++ 
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
@@ -133,7 +133,11 @@ public class IcebergTableOperations {
     LoadTableResponse loadTableResponse =
         tableOperationDispatcher.createTable(catalogName, icebergNS, 
createTableRequest);
     if (isCredentialVending) {
-      return IcebergRestUtils.ok(injectCredentialConfig(catalogName, 
loadTableResponse));
+      return IcebergRestUtils.ok(
+          injectCredentialConfig(
+              catalogName,
+              TableIdentifier.of(icebergNS, createTableRequest.name()),
+              loadTableResponse));
     } else {
       return IcebergRestUtils.ok(loadTableResponse);
     }
@@ -215,7 +219,8 @@ public class IcebergTableOperations {
     LoadTableResponse loadTableResponse =
         tableOperationDispatcher.loadTable(catalogName, tableIdentifier);
     if (isCredentialVending) {
-      return IcebergRestUtils.ok(injectCredentialConfig(catalogName, 
loadTableResponse));
+      return IcebergRestUtils.ok(
+          injectCredentialConfig(catalogName, tableIdentifier, 
loadTableResponse));
     } else {
       return IcebergRestUtils.ok(loadTableResponse);
     }
@@ -270,7 +275,7 @@ public class IcebergTableOperations {
   }
 
   private LoadTableResponse injectCredentialConfig(
-      String catalogName, LoadTableResponse loadTableResponse) {
+      String catalogName, TableIdentifier tableIdentifier, LoadTableResponse 
loadTableResponse) {
     CredentialProvider credentialProvider =
         icebergCatalogWrapperManager.getCredentialProvider(catalogName);
     if (credentialProvider == null) {
@@ -286,6 +291,12 @@ public class IcebergTableOperations {
       throw new ServiceUnavailableException(
           "Couldn't generate credential for %s", 
credentialProvider.credentialType());
     }
+
+    LOG.info(
+        "Generate credential: {} for Iceberg table: {}",
+        credential.credentialType(),
+        tableIdentifier);
+
     Map<String, String> credentialConfig = 
CredentialPropertyUtils.toIcebergProperties(credential);
     return LoadTableResponse.builder()
         .withTableMetadata(loadTableResponse.tableMetadata())
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTS3IT.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTS3IT.java
new file mode 100644
index 000000000..7941177a7
--- /dev/null
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTS3IT.java
@@ -0,0 +1,127 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.iceberg.integration.test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.credential.CredentialConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.DownloaderUtils;
+import org.apache.gravitino.integration.test.util.ITUtils;
+import org.apache.gravitino.storage.S3Properties;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.junit.platform.commons.util.StringUtils;
+
+@EnabledIfEnvironmentVariable(named = "GRAVITINO_TEST_CLOUD_IT", matches = 
"true")
+public class IcebergRESTS3IT extends IcebergRESTJdbcCatalogIT {
+
+  private String s3Warehouse;
+  private String accessKey;
+  private String secretKey;
+  private String region;
+  private String roleArn;
+  private String externalId;
+
+  @Override
+  void initEnv() {
+    this.s3Warehouse =
+        String.format("s3://%s/test1", 
getFromEnvOrDefault("GRAVITINO_S3_BUCKET", "{BUCKET_NAME}"));
+    this.accessKey = getFromEnvOrDefault("GRAVITINO_S3_ACCESS_KEY", 
"{ACCESS_KEY}");
+    this.secretKey = getFromEnvOrDefault("GRAVITINO_S3_SECRET_KEY", 
"{SECRET_KEY}");
+    this.region = getFromEnvOrDefault("GRAVITINO_S3_REGION", "ap-southeast-2");
+    this.roleArn = getFromEnvOrDefault("GRAVITINO_S3_ROLE_ARN", "{ROLE_ARN}");
+    this.externalId = getFromEnvOrDefault("GRAVITINO_S3_EXTERNAL_ID", "");
+    if (ITUtils.isEmbedded()) {
+      return;
+    }
+    try {
+      downloadIcebergAwsBundleJar();
+    } catch (IOException e) {
+      LOG.warn("Download Iceberg AWS bundle jar failed,", e);
+      throw new RuntimeException(e);
+    }
+    copyS3BundleJar();
+  }
+
+  @Override
+  public Map<String, String> getCatalogConfig() {
+    HashMap m = new HashMap<String, String>();
+    m.putAll(getCatalogJdbcConfig());
+    m.putAll(getS3Config());
+    return m;
+  }
+
+  public boolean supportsCredentialVending() {
+    return true;
+  }
+
+  private Map<String, String> getS3Config() {
+    Map configMap = new HashMap<String, String>();
+
+    configMap.put(
+        IcebergConfig.ICEBERG_CONFIG_PREFIX + 
CredentialConstants.CREDENTIAL_PROVIDER_TYPE,
+        CredentialConstants.S3_TOKEN_CREDENTIAL_PROVIDER);
+    configMap.put(IcebergConfig.ICEBERG_CONFIG_PREFIX + 
S3Properties.GRAVITINO_S3_REGION, region);
+    configMap.put(
+        IcebergConfig.ICEBERG_CONFIG_PREFIX + 
S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, accessKey);
+    configMap.put(
+        IcebergConfig.ICEBERG_CONFIG_PREFIX + 
S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY,
+        secretKey);
+    configMap.put(
+        IcebergConfig.ICEBERG_CONFIG_PREFIX + 
S3Properties.GRAVITINO_S3_ROLE_ARN, roleArn);
+    if (StringUtils.isNotBlank(externalId)) {
+      configMap.put(
+          IcebergConfig.ICEBERG_CONFIG_PREFIX + 
S3Properties.GRAVITINO_S3_EXTERNAL_ID, externalId);
+    }
+
+    configMap.put(
+        IcebergConfig.ICEBERG_CONFIG_PREFIX + IcebergConstants.IO_IMPL,
+        "org.apache.iceberg.aws.s3.S3FileIO");
+    configMap.put(IcebergConfig.ICEBERG_CONFIG_PREFIX + 
IcebergConstants.WAREHOUSE, s3Warehouse);
+
+    return configMap;
+  }
+
+  private void downloadIcebergAwsBundleJar() throws IOException {
+    String icebergBundleJarName = "iceberg-aws-bundle-1.5.2.jar";
+    String icebergBundleJarUri =
+        "https://repo1.maven.org/maven2/org/apache/iceberg/";
+            + "iceberg-aws-bundle/1.5.2/"
+            + icebergBundleJarName;
+    String gravitinoHome = System.getenv("GRAVITINO_HOME");
+    String targetDir = String.format("%s/iceberg-rest-server/libs/", 
gravitinoHome);
+    DownloaderUtils.downloadFile(icebergBundleJarUri, targetDir);
+  }
+
+  private void copyS3BundleJar() {
+    String gravitinoHome = System.getenv("GRAVITINO_HOME");
+    String targetDir = String.format("%s/iceberg-rest-server/libs/", 
gravitinoHome);
+    BaseIT.copyBundleJarsToDirectory("aws-bundle", targetDir);
+  }
+
+  private String getFromEnvOrDefault(String envVar, String defaultValue) {
+    String envValue = System.getenv(envVar);
+    return Optional.ofNullable(envValue).orElse(defaultValue);
+  }
+}

Reply via email to