This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 20ff0187fa Support the cross-account access using IAM role for S3 
PinotFS (#10009)
20ff0187fa is described below

commit 20ff0187faea95d12e84f76f7a00227834284a6d
Author: Seunghyun Lee <[email protected]>
AuthorDate: Wed Dec 21 08:00:51 2022 -0800

    Support the cross-account access using IAM role for S3 PinotFS (#10009)
    
    * Support the cross-account access using IAM role for S3 PinotFS
    
    When accountA tries to access data in S3 bucket owned by accountB,
    AWS provides a way to establish the access to the S3 bucket
    using cross-account IAM role. This approach is preferred in
    some cases because `accountKey, secretKey` doesn't need to be
    exposed.
    
    https://repost.aws/knowledge-center/cross-account-access-s3
    
    * Addressed the comments
---
 .../apache/pinot/plugin/filesystem/S3Config.java   | 155 +++++++++++++++++++++
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  |  73 +++++-----
 2 files changed, 195 insertions(+), 33 deletions(-)

diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
new file mode 100644
index 0000000000..b9b6c1ca35
--- /dev/null
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.UUID;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+/**
+ * S3 related config
+ */
+public class S3Config {
+
+  private static final boolean DEFAULT_DISABLE_ACL = true;
+
+  public static final String ACCESS_KEY = "accessKey";
+  public static final String SECRET_KEY = "secretKey";
+  public static final String REGION = "region";
+  public static final String ENDPOINT = "endpoint";
+  public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl";
+
+  // Encryption related configurations
+  public static final String SERVER_SIDE_ENCRYPTION_CONFIG_KEY = 
"serverSideEncryption";
+  public static final String SSE_KMS_KEY_ID_CONFIG_KEY = "ssekmsKeyId";
+  public static final String SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY = 
"ssekmsEncryptionContext";
+
+  // IAM Role related configurations
+  public static final String IAM_ROLE_BASED_ACCESS_ENABLED = 
"iamRoleBasedAccessEnabled";
+  public static final String ROLE_ARN = "roleArn";
+  public static final String ROLE_SESSION_NAME = "roleSessionName";
+  public static final String EXTERNAL_ID = "externalId";
+  public static final String SESSION_DURATION_SECONDS = 
"sessionDurationSeconds";
+  public static final String ASYNC_SESSION_UPDATED_ENABLED = 
"asyncSessionUpdateEnabled";
+  public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
+  public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
+  public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
+
+  private final String _accessKey;
+  private final String _secretKey;
+  private final String _region;
+  private final boolean _disableAcl;
+  private final String _endpoint;
+
+  private final String _serverSideEncryption;
+  private String _ssekmsKeyId;
+  private String _ssekmsEncryptionContext;
+
+  private boolean _iamRoleBasedAccess;
+  private String _roleArn;
+  private String _roleSessionName;
+  private String _externalId;
+  private int _sessionDurationSeconds;
+  private boolean _asyncSessionUpdateEnabled;
+
+  public S3Config(PinotConfiguration pinotConfig) {
+    _disableAcl = pinotConfig.getProperty(DISABLE_ACL_CONFIG_KEY, 
DEFAULT_DISABLE_ACL);
+    _accessKey = pinotConfig.getProperty(ACCESS_KEY);
+    _secretKey = pinotConfig.getProperty(SECRET_KEY);
+    _region = pinotConfig.getProperty(REGION);
+    _endpoint = pinotConfig.getProperty(ENDPOINT);
+
+    _serverSideEncryption = 
pinotConfig.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY);
+    _ssekmsKeyId = pinotConfig.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
+    _ssekmsEncryptionContext = 
pinotConfig.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
+
+    _iamRoleBasedAccess = Boolean.parseBoolean(
+        pinotConfig.getProperty(IAM_ROLE_BASED_ACCESS_ENABLED, 
DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED));
+    _roleArn = pinotConfig.getProperty(ROLE_ARN);
+    _roleSessionName =
+        pinotConfig.getProperty(ROLE_SESSION_NAME, 
Joiner.on("-").join("pinot", "s3", UUID.randomUUID()));
+    _externalId = pinotConfig.getProperty(EXTERNAL_ID);
+    _sessionDurationSeconds =
+        Integer.parseInt(pinotConfig.getProperty(SESSION_DURATION_SECONDS, 
DEFAULT_SESSION_DURATION_SECONDS));
+    _asyncSessionUpdateEnabled = Boolean.parseBoolean(
+        pinotConfig.getProperty(ASYNC_SESSION_UPDATED_ENABLED, 
DEFAULT_ASYNC_SESSION_UPDATED_ENABLED));
+
+    if (_iamRoleBasedAccess) {
+      Preconditions.checkNotNull(_roleArn, "Must provide 'roleArn' if 
iamRoleBasedAccess is enabled");
+    }
+  }
+
+  public String getAccessKey() {
+    return _accessKey;
+  }
+
+  public String getSecretKey() {
+    return _secretKey;
+  }
+
+  public String getRegion() {
+    return _region;
+  }
+
+  public boolean getDisableAcl() {
+    return _disableAcl;
+  }
+
+  public String getEndpoint() {
+    return _endpoint;
+  }
+
+  public String getServerSideEncryption() {
+    return _serverSideEncryption;
+  }
+
+  public String getSseKmsKeyId() {
+    return _ssekmsKeyId;
+  }
+
+  public String getSsekmsEncryptionContext() {
+    return _ssekmsEncryptionContext;
+  }
+
+  public boolean isIamRoleBasedAccess() {
+    return _iamRoleBasedAccess;
+  }
+
+  public String getRoleArn() {
+    return _roleArn;
+  }
+
+  public String getRoleSessionName() {
+    return _roleSessionName;
+  }
+
+  public String getExternalId() {
+    return _externalId;
+  }
+
+  public int getSessionDurationSeconds() {
+    return _sessionDurationSeconds;
+  }
+
+  public boolean isAsyncSessionUpdateEnabled() {
+    return _asyncSessionUpdateEnabled;
+  }
+}
diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index 99cf60a89c..7b4462622c 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.BasePinotFS;
 import org.apache.pinot.spi.filesystem.FileMetadata;
@@ -68,56 +69,67 @@ import 
software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.services.sts.StsClient;
+import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
 
 
 /**
  * Implementation of PinotFS for AWS S3 file system
  */
 public class S3PinotFS extends BasePinotFS {
-  public static final String ACCESS_KEY = "accessKey";
-  public static final String SECRET_KEY = "secretKey";
-  public static final String REGION = "region";
-  public static final String ENDPOINT = "endpoint";
-  public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl";
-  public static final String SERVER_SIDE_ENCRYPTION_CONFIG_KEY = 
"serverSideEncryption";
-  public static final String SSE_KMS_KEY_ID_CONFIG_KEY = "ssekmsKeyId";
-  public static final String SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY = 
"ssekmsEncryptionContext";
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(S3PinotFS.class);
+
   private static final String DELIMITER = "/";
   public static final String S3_SCHEME = "s3://";
-  private static final boolean DEFAULT_DISABLE_ACL = true;
   private S3Client _s3Client;
-  private boolean _disableAcl = DEFAULT_DISABLE_ACL;
+  private boolean _disableAcl;
   private ServerSideEncryption _serverSideEncryption = null;
   private String _ssekmsKeyId;
   private String _ssekmsEncryptionContext;
 
   @Override
   public void init(PinotConfiguration config) {
-    Preconditions.checkArgument(!isNullOrEmpty(config.getProperty(REGION)), 
"Region can't be null or empty");
-    String region = config.getProperty(REGION);
-    _disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY, 
DEFAULT_DISABLE_ACL);
-    String serverSideEncryption = 
config.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY);
-    setServerSideEncryption(serverSideEncryption, config);
+    S3Config s3Config = new S3Config(config);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(s3Config.getRegion()), 
"Region can't be null or empty");
+
+    _disableAcl = s3Config.getDisableAcl();
+    setServerSideEncryption(s3Config.getServerSideEncryption(), s3Config);
 
     AwsCredentialsProvider awsCredentialsProvider;
     try {
-      if (!isNullOrEmpty(config.getProperty(ACCESS_KEY)) && 
!isNullOrEmpty(config.getProperty(SECRET_KEY))) {
-        String accessKey = config.getProperty(ACCESS_KEY);
-        String secretKey = config.getProperty(SECRET_KEY);
-        AwsBasicCredentials awsBasicCredentials = 
AwsBasicCredentials.create(accessKey, secretKey);
+      if (StringUtils.isNotEmpty(s3Config.getAccessKey()) && 
StringUtils.isNotEmpty(s3Config.getSecretKey())) {
+        AwsBasicCredentials awsBasicCredentials =
+            AwsBasicCredentials.create(s3Config.getAccessKey(), 
s3Config.getSecretKey());
         awsCredentialsProvider = 
StaticCredentialsProvider.create(awsBasicCredentials);
       } else {
         awsCredentialsProvider = DefaultCredentialsProvider.create();
       }
 
+      // IAM Role based access
+      if (s3Config.isIamRoleBasedAccess()) {
+        AssumeRoleRequest.Builder assumeRoleRequestBuilder =
+            
AssumeRoleRequest.builder().roleArn(s3Config.getRoleArn()).roleSessionName(s3Config.getRoleSessionName())
+                .durationSeconds(s3Config.getSessionDurationSeconds());
+        AssumeRoleRequest assumeRoleRequest;
+        if (StringUtils.isNotEmpty(s3Config.getExternalId())) {
+          assumeRoleRequest = 
assumeRoleRequestBuilder.externalId(s3Config.getExternalId()).build();
+        } else {
+          assumeRoleRequest = assumeRoleRequestBuilder.build();
+        }
+        StsClient stsClient =
+            
StsClient.builder().region(Region.of(s3Config.getRegion())).credentialsProvider(awsCredentialsProvider)
+                .build();
+        awsCredentialsProvider =
+            
StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient).refreshRequest(assumeRoleRequest)
+                
.asyncCredentialUpdateEnabled(s3Config.isAsyncSessionUpdateEnabled()).build();
+      }
+
       S3ClientBuilder s3ClientBuilder =
-          
S3Client.builder().region(Region.of(region)).credentialsProvider(awsCredentialsProvider);
-      if (!isNullOrEmpty(config.getProperty(ENDPOINT))) {
-        String endpoint = config.getProperty(ENDPOINT);
+          
S3Client.builder().region(Region.of(s3Config.getRegion())).credentialsProvider(awsCredentialsProvider);
+      if (StringUtils.isNotEmpty(s3Config.getEndpoint())) {
         try {
-          s3ClientBuilder.endpointOverride(new URI(endpoint));
+          s3ClientBuilder.endpointOverride(new URI(s3Config.getEndpoint()));
         } catch (URISyntaxException e) {
           throw new RuntimeException(e);
         }
@@ -145,11 +157,10 @@ public class S3PinotFS extends BasePinotFS {
    */
   public void init(S3Client s3Client, String serverSideEncryption, 
PinotConfiguration serverSideEncryptionConfig) {
     _s3Client = s3Client;
-    setServerSideEncryption(serverSideEncryption, serverSideEncryptionConfig);
+    setServerSideEncryption(serverSideEncryption, new 
S3Config(serverSideEncryptionConfig));
   }
 
-  private void setServerSideEncryption(@Nullable String serverSideEncryption,
-      PinotConfiguration serverSideEncryptionConfig) {
+  private void setServerSideEncryption(@Nullable String serverSideEncryption, 
S3Config s3Config) {
     if (serverSideEncryption != null) {
       try {
         _serverSideEncryption = 
ServerSideEncryption.valueOf(serverSideEncryption);
@@ -160,12 +171,12 @@ public class S3PinotFS extends BasePinotFS {
       }
       switch (_serverSideEncryption) {
         case AWS_KMS:
-          _ssekmsKeyId = 
serverSideEncryptionConfig.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
+          _ssekmsKeyId = s3Config.getSseKmsKeyId();
           if (_ssekmsKeyId == null) {
             throw new UnsupportedOperationException(
                 "Missing required config: 'sseKmsKeyId' when AWS_KMS is used 
for server side encryption");
           }
-          _ssekmsEncryptionContext = 
serverSideEncryptionConfig.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
+          _ssekmsEncryptionContext = s3Config.getSsekmsEncryptionContext();
           break;
         case AES256:
           // Todo: Support AES256.
@@ -175,10 +186,6 @@ public class S3PinotFS extends BasePinotFS {
     }
   }
 
-  boolean isNullOrEmpty(String target) {
-    return target == null || target.isEmpty();
-  }
-
   private HeadObjectResponse getS3ObjectMetadata(URI uri)
       throws IOException {
     URI base = getBase(uri);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to