This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 20ff0187fa Support the cross-account access using IAM role for S3
PinotFS (#10009)
20ff0187fa is described below
commit 20ff0187faea95d12e84f76f7a00227834284a6d
Author: Seunghyun Lee <[email protected]>
AuthorDate: Wed Dec 21 08:00:51 2022 -0800
Support the cross-account access using IAM role for S3 PinotFS (#10009)
* Support the cross-account access using IAM role for S3 PinotFS
When accountA tries to access data in an S3 bucket owned by accountB,
AWS provides a way to establish access to that bucket
using a cross-account IAM role. This approach is preferred in
some cases because the `accessKey` and `secretKey` don't need to be
exposed.
https://repost.aws/knowledge-center/cross-account-access-s3
* Addressed the comments
---
.../apache/pinot/plugin/filesystem/S3Config.java | 155 +++++++++++++++++++++
.../apache/pinot/plugin/filesystem/S3PinotFS.java | 73 +++++-----
2 files changed, 195 insertions(+), 33 deletions(-)
diff --git
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
new file mode 100644
index 0000000000..b9b6c1ca35
--- /dev/null
+++
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3Config.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.UUID;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+/**
+ * S3 related config
+ */
+public class S3Config {
+
+ private static final boolean DEFAULT_DISABLE_ACL = true;
+
+ public static final String ACCESS_KEY = "accessKey";
+ public static final String SECRET_KEY = "secretKey";
+ public static final String REGION = "region";
+ public static final String ENDPOINT = "endpoint";
+ public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl";
+
+ // Encryption related configurations
+ public static final String SERVER_SIDE_ENCRYPTION_CONFIG_KEY =
"serverSideEncryption";
+ public static final String SSE_KMS_KEY_ID_CONFIG_KEY = "ssekmsKeyId";
+ public static final String SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY =
"ssekmsEncryptionContext";
+
+ // IAM Role related configurations
+ public static final String IAM_ROLE_BASED_ACCESS_ENABLED =
"iamRoleBasedAccessEnabled";
+ public static final String ROLE_ARN = "roleArn";
+ public static final String ROLE_SESSION_NAME = "roleSessionName";
+ public static final String EXTERNAL_ID = "externalId";
+ public static final String SESSION_DURATION_SECONDS =
"sessionDurationSeconds";
+ public static final String ASYNC_SESSION_UPDATED_ENABLED =
"asyncSessionUpdateEnabled";
+ public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
+ public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
+ public static final String DEFAULT_ASYNC_SESSION_UPDATED_ENABLED = "true";
+
+ private final String _accessKey;
+ private final String _secretKey;
+ private final String _region;
+ private final boolean _disableAcl;
+ private final String _endpoint;
+
+ private final String _serverSideEncryption;
+ private String _ssekmsKeyId;
+ private String _ssekmsEncryptionContext;
+
+ private boolean _iamRoleBasedAccess;
+ private String _roleArn;
+ private String _roleSessionName;
+ private String _externalId;
+ private int _sessionDurationSeconds;
+ private boolean _asyncSessionUpdateEnabled;
+
+ public S3Config(PinotConfiguration pinotConfig) {
+ _disableAcl = pinotConfig.getProperty(DISABLE_ACL_CONFIG_KEY,
DEFAULT_DISABLE_ACL);
+ _accessKey = pinotConfig.getProperty(ACCESS_KEY);
+ _secretKey = pinotConfig.getProperty(SECRET_KEY);
+ _region = pinotConfig.getProperty(REGION);
+ _endpoint = pinotConfig.getProperty(ENDPOINT);
+
+ _serverSideEncryption =
pinotConfig.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY);
+ _ssekmsKeyId = pinotConfig.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
+ _ssekmsEncryptionContext =
pinotConfig.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
+
+ _iamRoleBasedAccess = Boolean.parseBoolean(
+ pinotConfig.getProperty(IAM_ROLE_BASED_ACCESS_ENABLED,
DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED));
+ _roleArn = pinotConfig.getProperty(ROLE_ARN);
+ _roleSessionName =
+ pinotConfig.getProperty(ROLE_SESSION_NAME,
Joiner.on("-").join("pinot", "s3", UUID.randomUUID()));
+ _externalId = pinotConfig.getProperty(EXTERNAL_ID);
+ _sessionDurationSeconds =
+ Integer.parseInt(pinotConfig.getProperty(SESSION_DURATION_SECONDS,
DEFAULT_SESSION_DURATION_SECONDS));
+ _asyncSessionUpdateEnabled = Boolean.parseBoolean(
+ pinotConfig.getProperty(ASYNC_SESSION_UPDATED_ENABLED,
DEFAULT_ASYNC_SESSION_UPDATED_ENABLED));
+
+ if (_iamRoleBasedAccess) {
+ Preconditions.checkNotNull(_roleArn, "Must provide 'roleArn' if
iamRoleBasedAccess is enabled");
+ }
+ }
+
+ public String getAccessKey() {
+ return _accessKey;
+ }
+
+ public String getSecretKey() {
+ return _secretKey;
+ }
+
+ public String getRegion() {
+ return _region;
+ }
+
+ public boolean getDisableAcl() {
+ return _disableAcl;
+ }
+
+ public String getEndpoint() {
+ return _endpoint;
+ }
+
+ public String getServerSideEncryption() {
+ return _serverSideEncryption;
+ }
+
+ public String getSseKmsKeyId() {
+ return _ssekmsKeyId;
+ }
+
+ public String getSsekmsEncryptionContext() {
+ return _ssekmsEncryptionContext;
+ }
+
+ public boolean isIamRoleBasedAccess() {
+ return _iamRoleBasedAccess;
+ }
+
+ public String getRoleArn() {
+ return _roleArn;
+ }
+
+ public String getRoleSessionName() {
+ return _roleSessionName;
+ }
+
+ public String getExternalId() {
+ return _externalId;
+ }
+
+ public int getSessionDurationSeconds() {
+ return _sessionDurationSeconds;
+ }
+
+ public boolean isAsyncSessionUpdateEnabled() {
+ return _asyncSessionUpdateEnabled;
+ }
+}
diff --git
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index 99cf60a89c..7b4462622c 100644
---
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -37,6 +37,7 @@ import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.BasePinotFS;
import org.apache.pinot.spi.filesystem.FileMetadata;
@@ -68,56 +69,67 @@ import
software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.services.sts.StsClient;
+import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
/**
* Implementation of PinotFS for AWS S3 file system
*/
public class S3PinotFS extends BasePinotFS {
- public static final String ACCESS_KEY = "accessKey";
- public static final String SECRET_KEY = "secretKey";
- public static final String REGION = "region";
- public static final String ENDPOINT = "endpoint";
- public static final String DISABLE_ACL_CONFIG_KEY = "disableAcl";
- public static final String SERVER_SIDE_ENCRYPTION_CONFIG_KEY =
"serverSideEncryption";
- public static final String SSE_KMS_KEY_ID_CONFIG_KEY = "ssekmsKeyId";
- public static final String SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY =
"ssekmsEncryptionContext";
-
private static final Logger LOGGER =
LoggerFactory.getLogger(S3PinotFS.class);
+
private static final String DELIMITER = "/";
public static final String S3_SCHEME = "s3://";
- private static final boolean DEFAULT_DISABLE_ACL = true;
private S3Client _s3Client;
- private boolean _disableAcl = DEFAULT_DISABLE_ACL;
+ private boolean _disableAcl;
private ServerSideEncryption _serverSideEncryption = null;
private String _ssekmsKeyId;
private String _ssekmsEncryptionContext;
@Override
public void init(PinotConfiguration config) {
- Preconditions.checkArgument(!isNullOrEmpty(config.getProperty(REGION)),
"Region can't be null or empty");
- String region = config.getProperty(REGION);
- _disableAcl = config.getProperty(DISABLE_ACL_CONFIG_KEY,
DEFAULT_DISABLE_ACL);
- String serverSideEncryption =
config.getProperty(SERVER_SIDE_ENCRYPTION_CONFIG_KEY);
- setServerSideEncryption(serverSideEncryption, config);
+ S3Config s3Config = new S3Config(config);
+ Preconditions.checkArgument(StringUtils.isNotEmpty(s3Config.getRegion()),
"Region can't be null or empty");
+
+ _disableAcl = s3Config.getDisableAcl();
+ setServerSideEncryption(s3Config.getServerSideEncryption(), s3Config);
AwsCredentialsProvider awsCredentialsProvider;
try {
- if (!isNullOrEmpty(config.getProperty(ACCESS_KEY)) &&
!isNullOrEmpty(config.getProperty(SECRET_KEY))) {
- String accessKey = config.getProperty(ACCESS_KEY);
- String secretKey = config.getProperty(SECRET_KEY);
- AwsBasicCredentials awsBasicCredentials =
AwsBasicCredentials.create(accessKey, secretKey);
+ if (StringUtils.isNotEmpty(s3Config.getAccessKey()) &&
StringUtils.isNotEmpty(s3Config.getSecretKey())) {
+ AwsBasicCredentials awsBasicCredentials =
+ AwsBasicCredentials.create(s3Config.getAccessKey(),
s3Config.getSecretKey());
awsCredentialsProvider =
StaticCredentialsProvider.create(awsBasicCredentials);
} else {
awsCredentialsProvider = DefaultCredentialsProvider.create();
}
+ // IAM Role based access
+ if (s3Config.isIamRoleBasedAccess()) {
+ AssumeRoleRequest.Builder assumeRoleRequestBuilder =
+
AssumeRoleRequest.builder().roleArn(s3Config.getRoleArn()).roleSessionName(s3Config.getRoleSessionName())
+ .durationSeconds(s3Config.getSessionDurationSeconds());
+ AssumeRoleRequest assumeRoleRequest;
+ if (StringUtils.isNotEmpty(s3Config.getExternalId())) {
+ assumeRoleRequest =
assumeRoleRequestBuilder.externalId(s3Config.getExternalId()).build();
+ } else {
+ assumeRoleRequest = assumeRoleRequestBuilder.build();
+ }
+ StsClient stsClient =
+
StsClient.builder().region(Region.of(s3Config.getRegion())).credentialsProvider(awsCredentialsProvider)
+ .build();
+ awsCredentialsProvider =
+
StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient).refreshRequest(assumeRoleRequest)
+
.asyncCredentialUpdateEnabled(s3Config.isAsyncSessionUpdateEnabled()).build();
+ }
+
S3ClientBuilder s3ClientBuilder =
-
S3Client.builder().region(Region.of(region)).credentialsProvider(awsCredentialsProvider);
- if (!isNullOrEmpty(config.getProperty(ENDPOINT))) {
- String endpoint = config.getProperty(ENDPOINT);
+
S3Client.builder().region(Region.of(s3Config.getRegion())).credentialsProvider(awsCredentialsProvider);
+ if (StringUtils.isNotEmpty(s3Config.getEndpoint())) {
try {
- s3ClientBuilder.endpointOverride(new URI(endpoint));
+ s3ClientBuilder.endpointOverride(new URI(s3Config.getEndpoint()));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
@@ -145,11 +157,10 @@ public class S3PinotFS extends BasePinotFS {
*/
public void init(S3Client s3Client, String serverSideEncryption,
PinotConfiguration serverSideEncryptionConfig) {
_s3Client = s3Client;
- setServerSideEncryption(serverSideEncryption, serverSideEncryptionConfig);
+ setServerSideEncryption(serverSideEncryption, new
S3Config(serverSideEncryptionConfig));
}
- private void setServerSideEncryption(@Nullable String serverSideEncryption,
- PinotConfiguration serverSideEncryptionConfig) {
+ private void setServerSideEncryption(@Nullable String serverSideEncryption,
S3Config s3Config) {
if (serverSideEncryption != null) {
try {
_serverSideEncryption =
ServerSideEncryption.valueOf(serverSideEncryption);
@@ -160,12 +171,12 @@ public class S3PinotFS extends BasePinotFS {
}
switch (_serverSideEncryption) {
case AWS_KMS:
- _ssekmsKeyId =
serverSideEncryptionConfig.getProperty(SSE_KMS_KEY_ID_CONFIG_KEY);
+ _ssekmsKeyId = s3Config.getSseKmsKeyId();
if (_ssekmsKeyId == null) {
throw new UnsupportedOperationException(
"Missing required config: 'sseKmsKeyId' when AWS_KMS is used
for server side encryption");
}
- _ssekmsEncryptionContext =
serverSideEncryptionConfig.getProperty(SSE_KMS_ENCRYPTION_CONTEXT_CONFIG_KEY);
+ _ssekmsEncryptionContext = s3Config.getSsekmsEncryptionContext();
break;
case AES256:
// Todo: Support AES256.
@@ -175,10 +186,6 @@ public class S3PinotFS extends BasePinotFS {
}
}
- boolean isNullOrEmpty(String target) {
- return target == null || target.isEmpty();
- }
-
private HeadObjectResponse getS3ObjectMetadata(URI uri)
throws IOException {
URI base = getBase(uri);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]