This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 8136127f0 [filesystem] Support AssumeRole STS for RustFS (#2989)
8136127f0 is described below

commit 8136127f0c722f5da80c832e11dde30f8317f5df
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Apr 8 09:16:00 2026 +0100

    [filesystem] Support AssumeRole STS for RustFS (#2989)
---
 .../fs/s3/token/S3DelegationTokenProvider.java     | 77 +++++++++++++++++-----
 1 file changed, 62 insertions(+), 15 deletions(-)

diff --git 
a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java
 
b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java
index 33caaacdd..74c178a34 100644
--- 
a/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java
+++ 
b/fluss-filesystems/fluss-fs-s3/src/main/java/org/apache/fluss/fs/s3/token/S3DelegationTokenProvider.java
@@ -22,17 +22,23 @@ import org.apache.fluss.fs.token.ObtainedSecurityToken;
 
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
 import 
com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
+import com.amazonaws.services.securitytoken.model.AssumeRoleRequest;
+import com.amazonaws.services.securitytoken.model.AssumeRoleResult;
 import com.amazonaws.services.securitytoken.model.Credentials;
 import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
@@ -47,10 +53,15 @@ public class S3DelegationTokenProvider {
     private static final String REGION_KEY = "fs.s3a.region";
     private static final String ENDPOINT_KEY = "fs.s3a.endpoint";
 
+    private static final String ROLE_ARN_KEY = "fs.s3a.assumed.role.arn";
+    private static final String STS_ENDPOINT_KEY = 
"fs.s3a.assumed.role.sts.endpoint";
+
     private final String scheme;
     private final String region;
     private final String accessKey;
     private final String secretKey;
+    @Nullable private final String roleArn;
+    @Nullable private final String stsEndpoint;
     private final Map<String, String> additionInfos;
 
     public S3DelegationTokenProvider(String scheme, Configuration conf) {
@@ -59,6 +70,8 @@ public class S3DelegationTokenProvider {
         checkNotNull(region, "Region is not set.");
         this.accessKey = conf.get(ACCESS_KEY_ID);
         this.secretKey = conf.get(ACCESS_KEY_SECRET);
+        this.roleArn = conf.get(ROLE_ARN_KEY);
+        this.stsEndpoint = conf.get(STS_ENDPOINT_KEY);
         this.additionInfos = new HashMap<>();
         for (String key : Arrays.asList(REGION_KEY, ENDPOINT_KEY)) {
             if (conf.get(key) != null) {
@@ -68,25 +81,59 @@ public class S3DelegationTokenProvider {
     }
 
     public ObtainedSecurityToken obtainSecurityToken() {
-        LOG.info("Obtaining session credentials token with access key: {}", 
accessKey);
+        AWSSecurityTokenService stsClient = buildStsClient();
+        try {
+            Credentials credentials;
+
+            if (roleArn != null) {
+                LOG.info(
+                        "Obtaining session credentials via AssumeRole with 
access key: {}, role: {}",
+                        accessKey,
+                        roleArn);
+                AssumeRoleRequest request =
+                        new AssumeRoleRequest()
+                                .withRoleArn(roleArn)
+                                .withRoleSessionName("fluss-" + 
UUID.randomUUID());
+                AssumeRoleResult result = stsClient.assumeRole(request);
+                credentials = result.getCredentials();
+            } else {
+                LOG.info(
+                        "Obtaining session credentials via GetSessionToken 
with access key: {}",
+                        accessKey);
+                GetSessionTokenResult result = stsClient.getSessionToken();
+                credentials = result.getCredentials();
+            }
 
-        AWSSecurityTokenService stsClient =
+            LOG.info(
+                    "Session credentials obtained successfully with access 
key: {} expiration: {}",
+                    credentials.getAccessKeyId(),
+                    credentials.getExpiration());
+
+            return new ObtainedSecurityToken(
+                    scheme,
+                    toJson(credentials),
+                    credentials.getExpiration().getTime(),
+                    additionInfos);
+        } finally {
+            stsClient.shutdown();
+        }
+    }
+
+    private AWSSecurityTokenService buildStsClient() {
+        AWSSecurityTokenServiceClientBuilder builder =
                 AWSSecurityTokenServiceClientBuilder.standard()
-                        .withRegion(region)
                         .withCredentials(
                                 new AWSStaticCredentialsProvider(
-                                        new BasicAWSCredentials(accessKey, 
secretKey)))
-                        .build();
-        GetSessionTokenResult sessionTokenResult = stsClient.getSessionToken();
-        Credentials credentials = sessionTokenResult.getCredentials();
-
-        LOG.info(
-                "Session credentials obtained successfully with access key: {} 
expiration: {}",
-                credentials.getAccessKeyId(),
-                credentials.getExpiration());
-
-        return new ObtainedSecurityToken(
-                scheme, toJson(credentials), 
credentials.getExpiration().getTime(), additionInfos);
+                                        new BasicAWSCredentials(accessKey, 
secretKey)));
+
+        if (stsEndpoint != null) {
+            builder.withEndpointConfiguration(
+                    new AwsClientBuilder.EndpointConfiguration(stsEndpoint, 
region));
+        } else {
+            builder.withRegion(region);
+        }
+
+        return builder.build();
     }
 
     private byte[] toJson(Credentials credentials) {

Reply via email to