This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a9469502238 feat: consolidate S3 client creation and enable ARN role
for MSQ export (#19317)
a9469502238 is described below
commit a946950223808e6adb1f335a727637319d43ffba
Author: Cece Mei <[email protected]>
AuthorDate: Thu May 21 21:46:03 2026 -0700
feat: consolidate S3 client creation and enable ARN role for MSQ export
(#19317)
---
docs/multi-stage-query/reference.md | 4 +-
docs/tutorials/tutorial-extern.md | 2 +-
.../apache/druid/data/input/s3/S3InputSource.java | 151 ++--------------
.../druid/storage/s3/S3StorageDruidModule.java | 86 +--------
.../storage/s3/ServerSideEncryptingAmazonS3.java | 195 +++++++++++++++++++++
.../storage/s3/output/S3ExportStorageProvider.java | 72 +++++++-
.../druid/data/input/s3/S3InputSourceTest.java | 104 +++--------
.../s3/output/S3ExportStorageProviderTest.java | 2 +-
8 files changed, 311 insertions(+), 305 deletions(-)
diff --git a/docs/multi-stage-query/reference.md
b/docs/multi-stage-query/reference.md
index b2311382052..6fb63924ee7 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -138,7 +138,7 @@ Pass all arguments for `s3()` as named parameters with
their values enclosed in
```sql
INSERT INTO
EXTERN(
- s3(bucket => 'your_bucket', prefix => 'prefix/to/files')
+ s3(bucket => 'your_bucket', prefix => 'prefix/to/files', assumeRoleArn => 'arn:aws:iam::123456789012:role/your-export-role')
)
AS CSV
SELECT
@@ -152,6 +152,8 @@ Supported arguments for the function:
|---|---|---|---|
| `bucket` | Yes | S3 bucket destination for exported files. You must add the
bucket and prefix combination to the
`druid.export.storage.s3.allowedExportPaths` allow list. | n/a |
| `prefix` | Yes | Destination path in the bucket to create exported files.
The export query expects the destination path to be empty. If the location
includes other files, the query will fail. You must add the bucket and prefix
combination to the `druid.export.storage.s3.allowedExportPaths` allow list. |
n/a |
+| `assumeRoleArn` | No | ARN of the IAM role to assume before exporting data. If not provided, the default credentials configured for the Druid process are used. | n/a |
+| `assumeRoleExternalId` | No | External ID to use when assuming the role specified in `assumeRoleArn`. This provides an additional layer of security for role assumption. Only used when `assumeRoleArn` is set. | n/a |
Configure the following runtime parameters to export to an S3 destination:
diff --git a/docs/tutorials/tutorial-extern.md
b/docs/tutorials/tutorial-extern.md
index a58ed13a67f..d6924e5b967 100644
--- a/docs/tutorials/tutorial-extern.md
+++ b/docs/tutorials/tutorial-extern.md
@@ -167,7 +167,7 @@ Druid supports Amazon S3 or Google Cloud Storage (GCS) as
cloud storage destinat
```sql
INSERT INTO
EXTERN(
- s3(bucket => 'your_bucket', prefix => 'prefix/to/files'))
+ s3(bucket => 'your_bucket', prefix => 'prefix/to/files', assumeRoleArn => 'arn:aws:iam::123456789012:role/your-export-role'))
AS CSV
SELECT "channel",
SUM("delta") AS "changes"
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index 9b85c587297..85cd832d98f 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -41,37 +41,21 @@ import
org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.java.util.common.RetryUtils;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
-import software.amazon.awssdk.services.sts.StsClient;
-import software.amazon.awssdk.services.sts.StsClientBuilder;
-import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
-import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
-import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.UUID;
public class S3InputSource extends CloudObjectInputSource
{
@@ -134,72 +118,20 @@ public class S3InputSource extends CloudObjectInputSource
this.awsClientConfig = awsClientConfig;
this.awsEndpointConfig = awsEndpointConfig;
- this.s3ClientSupplier = Suppliers.memoize(
- () -> {
- if (s3ClientBuilder != null && s3InputSourceConfig != null) {
- // Build a custom S3Client with the provided configuration
- S3ClientBuilder customBuilder = S3Client.builder();
-
- // Configure endpoint and region
- if (awsEndpointConfig != null && awsEndpointConfig.getUrl() !=
null) {
- String endpointUrl = awsEndpointConfig.getUrl();
- // Ensure endpoint URL has a scheme
- if (!endpointUrl.startsWith("http://") &&
!endpointUrl.startsWith("https://")) {
- boolean useHttps = S3Utils.useHttps(awsClientConfig,
awsEndpointConfig);
- endpointUrl = S3Utils.ensureEndpointHasScheme(endpointUrl,
useHttps);
- }
- customBuilder.endpointOverride(URI.create(endpointUrl));
- if (awsEndpointConfig.getSigningRegion() != null) {
-
customBuilder.region(Region.of(awsEndpointConfig.getSigningRegion()));
- }
- }
-
- // Configure S3-specific settings
- if (awsClientConfig != null) {
- S3Configuration.Builder s3ConfigBuilder =
S3Configuration.builder()
-
.pathStyleAccessEnabled(awsClientConfig.isEnablePathStyleAccess())
-
.chunkedEncodingEnabled(!awsClientConfig.isDisableChunkedEncoding());
- customBuilder.serviceConfiguration(s3ConfigBuilder.build());
-
customBuilder.crossRegionAccessEnabled(awsClientConfig.isCrossRegionAccessEnabled());
- }
-
- // Configure HTTP client with proxy if needed
- ApacheHttpClient.Builder httpClientBuilder =
ApacheHttpClient.builder();
- if (awsProxyConfig != null && awsProxyConfig.getHost() != null) {
- ProxyConfiguration proxyConfig =
S3Utils.buildProxyConfiguration(awsProxyConfig);
- if (proxyConfig != null) {
- httpClientBuilder.proxyConfiguration(proxyConfig);
- }
- }
- customBuilder.httpClientBuilder(httpClientBuilder);
-
- // Configure credentials
- AwsCredentialsProvider credentialsProvider;
- if (s3InputSourceConfig.isCredentialsConfigured()) {
- credentialsProvider =
createStaticCredentialsProvider(s3InputSourceConfig);
- } else {
- credentialsProvider = awsCredentialsProvider;
- }
-
- // Apply assume role if configured
- if (s3InputSourceConfig.getAssumeRoleArn() != null) {
- credentialsProvider = createAssumeRoleCredentialsProvider(
- s3InputSourceConfig,
- credentialsProvider
- );
- }
-
- customBuilder.credentialsProvider(credentialsProvider);
-
- // Build and wrap in ServerSideEncryptingAmazonS3
- return s3ClientBuilder
- .setS3ClientSupplier(customBuilder::build)
- .build();
- } else {
- return s3Client;
- }
- }
- );
+ this.s3ClientSupplier = Suppliers.memoize(() -> {
+ if (s3ClientBuilder == null || s3InputSourceConfig == null) {
+ return s3Client;
+ }
+ return ServerSideEncryptingAmazonS3.builder(
+ awsCredentialsProvider,
+ s3ClientBuilder.getS3StorageConfig(),
+ awsProxyConfig,
+ awsEndpointConfig,
+ awsClientConfig,
+ s3InputSourceConfig,
+ null
+ ).build();
+ });
this.maxRetries = RetryUtils.DEFAULT_MAX_TRIES;
}
@@ -278,61 +210,6 @@ public class S3InputSource extends CloudObjectInputSource
return Collections.singleton(TYPE_KEY);
}
- private AwsCredentialsProvider createAssumeRoleCredentialsProvider(
- S3InputSourceConfig s3InputSourceConfig,
- AwsCredentialsProvider baseCredentialsProvider
- )
- {
- String assumeRoleArn = s3InputSourceConfig.getAssumeRoleArn();
- String roleSessionName = StringUtils.format("druid-s3-input-source-%s",
UUID.randomUUID().toString());
-
- StsClientBuilder stsBuilder = StsClient.builder()
- .credentialsProvider(baseCredentialsProvider);
-
- // If we have endpoint config, use its region for STS too
- if (awsEndpointConfig != null && awsEndpointConfig.getSigningRegion() !=
null) {
- stsBuilder.region(Region.of(awsEndpointConfig.getSigningRegion()));
- }
-
- StsClient stsClient = stsBuilder.build();
-
- AssumeRoleRequest.Builder assumeRoleRequestBuilder =
AssumeRoleRequest.builder()
- .roleArn(assumeRoleArn)
- .roleSessionName(roleSessionName)
- .durationSeconds(3600);
-
- if (s3InputSourceConfig.getAssumeRoleExternalId() != null) {
-
assumeRoleRequestBuilder.externalId(s3InputSourceConfig.getAssumeRoleExternalId());
- }
-
- return StsAssumeRoleCredentialsProvider.builder()
- .stsClient(stsClient)
- .refreshRequest(assumeRoleRequestBuilder.build())
- .asyncCredentialUpdateEnabled(true)
- .staleTime(Duration.ofMinutes(3))
- .build();
- }
-
- @Nonnull
- private StaticCredentialsProvider
createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig)
- {
- if (s3InputSourceConfig.getSessionToken() != null) {
- AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create(
- s3InputSourceConfig.getAccessKeyId().getPassword(),
- s3InputSourceConfig.getSecretAccessKey().getPassword(),
- s3InputSourceConfig.getSessionToken().getPassword()
- );
- return StaticCredentialsProvider.create(sessionCredentials);
- } else {
- return StaticCredentialsProvider.create(
- AwsBasicCredentials.create(
- s3InputSourceConfig.getAccessKeyId().getPassword(),
- s3InputSourceConfig.getSecretAccessKey().getPassword()
- )
- );
- }
- }
-
@Nullable
@JsonProperty("properties")
@JsonInclude(JsonInclude.Include.NON_NULL)
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
index 27fc537e34b..a0c7f986655 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
@@ -39,17 +39,9 @@ import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.http.apache.ApacheHttpClient;
-import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
-import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
-import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3ClientBuilder;
-import software.amazon.awssdk.services.s3.S3Configuration;
import javax.annotation.Nullable;
import java.net.URI;
@@ -150,75 +142,15 @@ public class S3StorageDruidModule implements DruidModule
: ""
);
}
-
- final boolean useHttps = S3Utils.useHttps(clientConfig, endpointConfig);
- final URI endpointOverride = buildEndpointOverride(endpointConfig,
useHttps);
- final Region region =
StringUtils.isNotEmpty(endpointConfig.getSigningRegion())
- ? Region.of(endpointConfig.getSigningRegion())
- : null;
-
- final Supplier<S3Client> s3ClientSupplier = () -> {
- // Build HTTP client with proxy configuration
- ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder()
-
.connectionTimeout(Duration.ofMillis(clientConfig.getConnectionTimeoutMillis()))
-
.socketTimeout(Duration.ofMillis(clientConfig.getSocketTimeoutMillis()))
- .maxConnections(clientConfig.getMaxConnections());
-
- ProxyConfiguration proxyConfiguration =
S3Utils.buildProxyConfiguration(proxyConfig);
- if (proxyConfiguration != null) {
- httpClientBuilder.proxyConfiguration(proxyConfiguration);
- }
-
- // Build S3 configuration
- // Note: forcePathStyle is configured on the S3ClientBuilder, not in
S3Configuration
- S3Configuration s3Configuration = S3Configuration.builder()
- .chunkedEncodingEnabled(!clientConfig.isDisableChunkedEncoding())
- .build();
-
- S3ClientBuilder s3ClientBuilder = S3Client.builder()
- .credentialsProvider(provider)
- .httpClientBuilder(httpClientBuilder)
- .serviceConfiguration(s3Configuration)
- .forcePathStyle(clientConfig.isEnablePathStyleAccess())
- .crossRegionAccessEnabled(clientConfig.isCrossRegionAccessEnabled());
-
- if (endpointOverride != null) {
- s3ClientBuilder.endpointOverride(endpointOverride);
- }
-
- if (region != null) {
- s3ClientBuilder.region(region);
- }
-
- return s3ClientBuilder.build();
- };
-
- // Create async client supplier for S3TransferManager
- final AsyncHttpClientType asyncHttpClientType =
-
AsyncHttpClientType.fromString(storageConfig.getS3TransferConfig().getAsyncHttpClientType());
- final Supplier<S3AsyncClient> s3AsyncClientSupplier = () -> {
- S3AsyncClientBuilder s3AsyncClientBuilder = S3AsyncClient.builder()
- .credentialsProvider(provider)
- .httpClientBuilder(asyncHttpClientType.buildBuilder(clientConfig))
- .forcePathStyle(clientConfig.isEnablePathStyleAccess())
- .crossRegionAccessEnabled(clientConfig.isCrossRegionAccessEnabled())
- .multipartEnabled(true);
-
- if (endpointOverride != null) {
- s3AsyncClientBuilder.endpointOverride(endpointOverride);
- }
-
- if (region != null) {
- s3AsyncClientBuilder.region(region);
- }
-
- return s3AsyncClientBuilder.build();
- };
-
- return ServerSideEncryptingAmazonS3.builder()
- .setS3ClientSupplier(s3ClientSupplier)
-
.setS3AsyncClientSupplier(s3AsyncClientSupplier)
- .setS3StorageConfig(storageConfig);
+ return ServerSideEncryptingAmazonS3.builder(
+ provider,
+ storageConfig,
+ proxyConfig,
+ endpointConfig,
+ clientConfig,
+ null,
+ null
+ );
}
public enum AsyncHttpClientType
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java
index b8b76d0cf5a..b5b926ec71c 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java
@@ -19,13 +19,32 @@
package org.apache.druid.storage.s3;
+import com.google.common.base.Strings;
+import org.apache.druid.common.aws.AWSClientConfig;
+import org.apache.druid.common.aws.AWSEndpointConfig;
+import org.apache.druid.common.aws.AWSProxyConfig;
+import org.apache.druid.data.input.s3.S3InputSourceConfig;
+import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.s3.S3StorageDruidModule.AsyncHttpClientType;
+import org.apache.druid.storage.s3.output.S3ExportStorageProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.http.apache.ProxyConfiguration;
+import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import
software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
@@ -52,13 +71,21 @@ import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.Type;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.StsClientBuilder;
+import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.InputStream;
+import java.net.URI;
+import java.time.Duration;
+import java.util.UUID;
import java.util.function.Supplier;
/**
@@ -286,6 +313,161 @@ public class ServerSideEncryptingAmazonS3
}
}
+  /**
+   * Creates a {@link Builder} whose synchronous and asynchronous S3 clients share the same endpoint, region,
+   * S3 addressing options and credentials.
+   *
+   * <p>Credentials are resolved in two steps. First, static keys from {@code s3InputSourceConfig} are used when it has
+   * credentials configured; otherwise {@code awsCredentialsProvider} is used. Second, if an assume-role ARN is supplied
+   * (by the input source config or by the export provider), that provider is wrapped in an STS assume-role provider.
+   *
+   * <p>At most one of {@code s3InputSourceConfig} and {@code s3ExportStorageProvider} may be non-null; they are the
+   * two possible sources of assume-role settings.
+   *
+   * @param awsCredentialsProvider  base credentials
+   * @param s3StorageConfig         storage config handed to the returned builder; its transfer config also selects the
+   *                                async HTTP client type when {@code awsClientConfig} is non-null
+   * @param awsProxyConfig          optional proxy settings, applied to the synchronous HTTP client
+   * @param awsEndpointConfig       optional endpoint URL and signing region, applied to both clients
+   * @param awsClientConfig         optional timeouts, connection limit, chunked-encoding, path-style and cross-region
+   *                                settings
+   * @param s3InputSourceConfig     optional per-input-source static credentials and assume-role settings
+   * @param s3ExportStorageProvider optional export destination carrying assume-role settings
+   * @return a builder whose client suppliers build a new client each time they are invoked
+   * @throws DruidException if both {@code s3InputSourceConfig} and {@code s3ExportStorageProvider} are non-null
+   */
+  public static ServerSideEncryptingAmazonS3.Builder builder(
+      AwsCredentialsProvider awsCredentialsProvider,
+      S3StorageConfig s3StorageConfig,
+      @Nullable AWSProxyConfig awsProxyConfig,
+      @Nullable AWSEndpointConfig awsEndpointConfig,
+      @Nullable AWSClientConfig awsClientConfig,
+      @Nullable S3InputSourceConfig s3InputSourceConfig,
+      @Nullable S3ExportStorageProvider s3ExportStorageProvider
+  )
+  {
+    if (s3InputSourceConfig != null && s3ExportStorageProvider != null) {
+      throw DruidException.defensive("Cannot set both s3InputSourceConfig and s3ExportStorageProvider!");
+    }
+    final String assumeRoleArn;
+    final String assumeRoleExternalId;
+    if (s3InputSourceConfig != null) {
+      assumeRoleArn = s3InputSourceConfig.getAssumeRoleArn();
+      assumeRoleExternalId = s3InputSourceConfig.getAssumeRoleExternalId();
+    } else if (s3ExportStorageProvider != null) {
+      assumeRoleArn = s3ExportStorageProvider.getAssumeRoleArn();
+      assumeRoleExternalId = s3ExportStorageProvider.getAssumeRoleExternalId();
+    } else {
+      assumeRoleArn = null;
+      assumeRoleExternalId = null;
+    }
+
+    final S3ClientBuilder clientBuilder = S3Client.builder();
+    // Multipart is enabled unconditionally so the async client (used for S3TransferManager) behaves the same whether
+    // or not an AWSClientConfig is supplied.
+    final S3AsyncClientBuilder asyncClientBuilder = S3AsyncClient.builder().multipartEnabled(true);
+
+    // Endpoint and region apply to both clients.
+    if (awsEndpointConfig != null) {
+      if (!Strings.isNullOrEmpty(awsEndpointConfig.getUrl())) {
+        String endpointUrl = awsEndpointConfig.getUrl();
+        // Make sure the endpoint URL carries a scheme before it is turned into a URI override.
+        if (!endpointUrl.startsWith("http://") && !endpointUrl.startsWith("https://")) {
+          endpointUrl = S3Utils.ensureEndpointHasScheme(endpointUrl, S3Utils.useHttps(awsClientConfig, awsEndpointConfig));
+        }
+        final URI endpointOverride = URI.create(endpointUrl);
+        clientBuilder.endpointOverride(endpointOverride);
+        asyncClientBuilder.endpointOverride(endpointOverride);
+      }
+      if (!Strings.isNullOrEmpty(awsEndpointConfig.getSigningRegion())) {
+        final Region region = Region.of(awsEndpointConfig.getSigningRegion());
+        clientBuilder.region(region);
+        asyncClientBuilder.region(region);
+      }
+    }
+
+    final ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder();
+
+    // Client tuning; when awsClientConfig is null the builders keep their defaults.
+    if (awsClientConfig != null) {
+      httpClientBuilder.connectionTimeout(Duration.ofMillis(awsClientConfig.getConnectionTimeoutMillis()))
+                       .socketTimeout(Duration.ofMillis(awsClientConfig.getSocketTimeoutMillis()))
+                       .maxConnections(awsClientConfig.getMaxConnections());
+      final S3Configuration s3Config = S3Configuration.builder()
+          .chunkedEncodingEnabled(!awsClientConfig.isDisableChunkedEncoding())
+          .build();
+      clientBuilder.serviceConfiguration(s3Config)
+                   .forcePathStyle(awsClientConfig.isEnablePathStyleAccess())
+                   .crossRegionAccessEnabled(awsClientConfig.isCrossRegionAccessEnabled());
+      final AsyncHttpClientType asyncHttpClientType =
+          AsyncHttpClientType.fromString(s3StorageConfig.getS3TransferConfig().getAsyncHttpClientType());
+      asyncClientBuilder.forcePathStyle(awsClientConfig.isEnablePathStyleAccess())
+                        .crossRegionAccessEnabled(awsClientConfig.isCrossRegionAccessEnabled())
+                        .httpClientBuilder(asyncHttpClientType.buildBuilder(awsClientConfig));
+    }
+
+    // Proxy settings are applied to the synchronous HTTP client only.
+    if (awsProxyConfig != null) {
+      final ProxyConfiguration proxyConfig = S3Utils.buildProxyConfiguration(awsProxyConfig);
+      if (proxyConfig != null) {
+        httpClientBuilder.proxyConfiguration(proxyConfig);
+      }
+    }
+    clientBuilder.httpClientBuilder(httpClientBuilder);
+
+    // Static per-input-source credentials take precedence over the injected provider ...
+    AwsCredentialsProvider credentialsProvider;
+    if (s3InputSourceConfig != null && s3InputSourceConfig.isCredentialsConfigured()) {
+      credentialsProvider = createStaticCredentialsProvider(s3InputSourceConfig);
+    } else {
+      credentialsProvider = awsCredentialsProvider;
+    }
+    // ... and, when a role ARN is configured, are used to assume that role.
+    if (!Strings.isNullOrEmpty(assumeRoleArn)) {
+      credentialsProvider = createAssumeRoleCredentialsProvider(
+          assumeRoleArn,
+          assumeRoleExternalId,
+          awsEndpointConfig,
+          credentialsProvider
+      );
+    }
+    clientBuilder.credentialsProvider(credentialsProvider);
+    asyncClientBuilder.credentialsProvider(credentialsProvider);
+
+    // Each supplier invocation builds a fresh client from the configured SDK builder.
+    return ServerSideEncryptingAmazonS3.builder()
+                                       .setS3ClientSupplier(clientBuilder::build)
+                                       .setS3AsyncClientSupplier(asyncClientBuilder::build)
+                                       .setS3StorageConfig(s3StorageConfig);
+  }
+
+  /**
+   * Builds a static credentials provider from the access key, secret key and, when present, the session token held
+   * by {@code s3InputSourceConfig}.
+   *
+   * <p>The caller in this class only invokes it after {@code isCredentialsConfigured()} returned true.
+   *
+   * @param s3InputSourceConfig input-source config carrying the credentials
+   * @return basic credentials when no session token is set, otherwise session credentials
+   */
+  @Nonnull
+  private static StaticCredentialsProvider createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig)
+  {
+    final String accessKeyId = s3InputSourceConfig.getAccessKeyId().getPassword();
+    final String secretAccessKey = s3InputSourceConfig.getSecretAccessKey().getPassword();
+
+    if (s3InputSourceConfig.getSessionToken() == null) {
+      return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
+    }
+
+    final String sessionToken = s3InputSourceConfig.getSessionToken().getPassword();
+    return StaticCredentialsProvider.create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
+  }
+
+  /**
+   * Wraps {@code baseCredentialsProvider} in an STS assume-role provider for {@code assumeRoleArn}.
+   *
+   * <p>Each call uses a fresh role session name ({@code druid-s3-<uuid>}) and requests 3600-second sessions. The
+   * returned provider refreshes credentials asynchronously, with a stale time of three minutes.
+   *
+   * @param assumeRoleArn           ARN of the role to assume
+   * @param assumeRoleExternalId    optional external ID sent with the AssumeRole request; null or empty means none
+   * @param awsEndpointConfig       optional endpoint config; a non-empty signing region is also used as the STS region
+   * @param baseCredentialsProvider credentials used to call STS
+   * @return a credentials provider that assumes the given role
+   */
+  public static AwsCredentialsProvider createAssumeRoleCredentialsProvider(
+      String assumeRoleArn,
+      @Nullable String assumeRoleExternalId,
+      @Nullable AWSEndpointConfig awsEndpointConfig,
+      AwsCredentialsProvider baseCredentialsProvider
+  )
+  {
+    final String roleSessionName = StringUtils.format("druid-s3-%s", UUID.randomUUID().toString());
+
+    final StsClientBuilder stsBuilder = StsClient.builder().credentialsProvider(baseCredentialsProvider);
+    // Reuse the S3 signing region for STS. Use the same isNullOrEmpty check as builder(...):
+    // Region.of rejects blank region names, so an empty signing region must be skipped, not passed through.
+    if (awsEndpointConfig != null && !Strings.isNullOrEmpty(awsEndpointConfig.getSigningRegion())) {
+      stsBuilder.region(Region.of(awsEndpointConfig.getSigningRegion()));
+    }
+
+    final AssumeRoleRequest.Builder assumeRoleRequestBuilder = AssumeRoleRequest.builder()
+        .roleArn(assumeRoleArn)
+        .roleSessionName(roleSessionName)
+        .durationSeconds(3600);
+    // Treat an empty external ID as absent instead of sending an empty value to STS.
+    if (!Strings.isNullOrEmpty(assumeRoleExternalId)) {
+      assumeRoleRequestBuilder.externalId(assumeRoleExternalId);
+    }
+
+    // NOTE: the STS client built here is handed to the returned provider; this method does not close it.
+    return StsAssumeRoleCredentialsProvider.builder()
+        .stsClient(stsBuilder.build())
+        .refreshRequest(assumeRoleRequestBuilder.build())
+        .asyncCredentialUpdateEnabled(true)
+        .staleTime(Duration.ofMinutes(3))
+        .build();
+  }
+
public static class Builder
{
private Supplier<S3Client> s3ClientSupplier;
@@ -316,6 +498,19 @@ public class ServerSideEncryptingAmazonS3
return this.s3StorageConfig;
}
+ /**
+ * Builds a new {@link ServerSideEncryptingAmazonS3} instance.
+ *
+ * <p><b>Resource leak warning:</b> Each instance created by this method
holds internal resources such as thread
+ * pools and connection pools. {@link ServerSideEncryptingAmazonS3} is not
{@link java.io.Closeable}, so there is
+ * currently no way for callers to release these resources when the
instance is no longer needed. Avoid calling
+ * this method repeatedly (e.g., once per file or per task) when a single
shared instance would suffice. Consider
+ * memoizing the result, as {@link
org.apache.druid.data.input.s3.S3InputSource} does, to ensure the client is
+ * created at most once per configuration.
+ *
+ * <p>The long-term fix is to make {@link ServerSideEncryptingAmazonS3}
implement {@link java.io.Closeable} and
+ * arrange for {@code close()} to be called appropriately, but that is a
larger change deferred for the future.
+ */
public ServerSideEncryptingAmazonS3 build()
{
if (s3ClientSupplier == null) {
diff --git
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java
index 129622dfbd7..69f4687282e 100644
---
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java
+++
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java
@@ -22,19 +22,29 @@ package org.apache.druid.storage.s3.output;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import org.apache.druid.common.aws.AWSClientConfig;
+import org.apache.druid.common.aws.AWSEndpointConfig;
+import org.apache.druid.common.aws.AWSProxyConfig;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.s3.S3InputSource;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.storage.StorageConnector;
+import org.apache.druid.storage.s3.S3StorageConfig;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.File;
import java.net.URI;
@@ -51,10 +61,30 @@ public class S3ExportStorageProvider implements
ExportStorageProvider
@JsonProperty
private final String prefix;
+ @Nullable
+ @JsonProperty
+ private final String assumeRoleArn;
+ @Nullable
+ @JsonProperty
+ private final String assumeRoleExternalId;
+
+ private final Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier;
+
@JacksonInject
S3ExportConfig s3ExportConfig;
@JacksonInject
- ServerSideEncryptingAmazonS3 s3;
+ ServerSideEncryptingAmazonS3 s3Client;
+
+ @JacksonInject
+ S3StorageConfig s3StorageConfig;
+ @JacksonInject
+ AWSProxyConfig awsProxyConfig;
+ @JacksonInject
+ AWSEndpointConfig awsEndpointConfig;
+ @JacksonInject
+ AWSClientConfig awsClientConfig;
+ @JacksonInject
+ AwsCredentialsProvider baseCredentialsProvider;
@JacksonInject
S3UploadManager s3UploadManager;
@@ -62,14 +92,32 @@ public class S3ExportStorageProvider implements
ExportStorageProvider
@JsonCreator
public S3ExportStorageProvider(
@JsonProperty(value = "bucket", required = true) String bucket,
- @JsonProperty(value = "prefix", required = true) String prefix
+      @JsonProperty(value = "prefix", required = true) String prefix,
+      @Nullable @JsonProperty(value = "assumeRoleArn") String assumeRoleArn,
+      @Nullable @JsonProperty(value = "assumeRoleExternalId") String assumeRoleExternalId
)
{
this.bucket = bucket;
this.prefix = prefix;
+    this.assumeRoleArn = assumeRoleArn;
+    this.assumeRoleExternalId = assumeRoleExternalId;
+    // Resolve the client lazily, at most once: the @JacksonInject fields it reads are field-injected, i.e. they
+    // are not available yet while this constructor runs.
+    this.s3ClientSupplier = Suppliers.memoize(this::resolveS3Client);
}
+
+  /**
+   * Picks the client this export writes with.
+   *
+   * <p>Returns the injected {@code s3Client} when no role ARN is configured. Otherwise builds a new client that assumes
+   * {@code assumeRoleArn} using the injected base credentials and AWS configs.
+   */
+  private ServerSideEncryptingAmazonS3 resolveS3Client()
+  {
+    if (Strings.isNullOrEmpty(assumeRoleArn)) {
+      return s3Client;
+    }
+    final ServerSideEncryptingAmazonS3.Builder roleBuilder = ServerSideEncryptingAmazonS3.builder(
+        baseCredentialsProvider,
+        s3StorageConfig,
+        awsProxyConfig,
+        awsEndpointConfig,
+        awsClientConfig,
+        null,
+        this
+    );
+    return roleBuilder.build();
+  }
+
-
@Override
public StorageConnector createStorageConnector(File taskTempDir)
{
@@ -93,7 +141,7 @@ public class S3ExportStorageProvider implements
ExportStorageProvider
s3ExportConfig.getChunkSize(),
s3ExportConfig.getMaxRetry()
);
- return new S3StorageConnector(s3OutputConfig, s3, s3UploadManager);
+ return new S3StorageConnector(s3OutputConfig, s3ClientSupplier.get(),
s3UploadManager);
}
@VisibleForTesting
@@ -136,6 +184,22 @@ public class S3ExportStorageProvider implements
ExportStorageProvider
return prefix;
}
+  /**
+   * Returns the ARN of the role to assume for this export.
+   *
+   * <p>When null or empty, the injected S3 client is used as-is.
+   */
+  @Nullable
+  @JsonProperty("assumeRoleArn")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getAssumeRoleArn()
+  {
+    return this.assumeRoleArn;
+  }
+
+  /**
+   * Returns the external ID sent when assuming {@link #getAssumeRoleArn()}, or null if none was configured.
+   */
+  @Nullable
+  @JsonProperty("assumeRoleExternalId")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public String getAssumeRoleExternalId()
+  {
+    return this.assumeRoleExternalId;
+  }
+
@Override
@JsonIgnore
public String getResourceType()
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index 22c2c767355..168216affdc 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -59,6 +59,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.S3InputDataConfig;
+import org.apache.druid.storage.s3.S3StorageConfig;
import org.apache.druid.storage.s3.S3TransferConfig;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -110,7 +111,10 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
public static final S3Client S3_CLIENT = EasyMock.createMock(S3Client.class);
public static final ServerSideEncryptingAmazonS3.Builder
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER =
EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class);
- public static final S3ClientBuilder S3_CLIENT_BUILDER = S3Client.builder();
+ public static final S3StorageConfig S3_STORAGE_CONFIG = new S3StorageConfig(
+ new NoopServerSideEncryption(),
+ new S3TransferConfig()
+ );
public static final ServerSideEncryptingAmazonS3 SERVICE = new
ServerSideEncryptingAmazonS3(
S3_CLIENT,
null,
@@ -346,10 +350,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
public void testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws
Exception
{
EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
-
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.setS3ClientSupplier(EasyMock.anyObject()))
- .andReturn(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
- EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
- .andReturn(SERVICE);
+
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getS3StorageConfig()).andStubReturn(S3_STORAGE_CONFIG);
EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
final S3InputSource withPrefixes = new S3InputSource(
SERVICE,
@@ -376,10 +377,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
public void testSerdeWithCloudConfigPropertiesWithSessionToken() throws
Exception
{
EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
-
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.setS3ClientSupplier(EasyMock.anyObject()))
- .andReturn(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
- EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
- .andReturn(SERVICE);
+
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getS3StorageConfig()).andStubReturn(S3_STORAGE_CONFIG);
EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
final S3InputSource withSessionToken = new S3InputSource(
SERVICE,
@@ -450,73 +448,6 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
Assert.assertNull(inputSourceWithoutSessionToken.getS3InputSourceConfig().getSessionToken());
}
- @Test
- public void testSessionCredentialsUsedWhenSessionTokenProvided() throws
IOException
- {
- // This test verifies that when session token is provided, the
S3InputSource
- // correctly uses BasicSessionCredentials instead of BasicAWSCredentials
- EasyMock.reset(S3_CLIENT);
- expectListObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)),
CONTENT);
- expectGetObject(EXPECTED_URIS.get(0));
- EasyMock.replay(S3_CLIENT);
-
- EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
-
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.setS3ClientSupplier(EasyMock.anyObject()))
- .andReturn(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
- EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
- .andReturn(SERVICE);
- EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
-
- // Create S3InputSource with session token
- S3InputSource inputSource = new S3InputSource(
- SERVICE,
- SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
- INPUT_DATA_CONFIG,
- null,
- ImmutableList.of(PREFIXES.get(0)),
- null,
- null,
- CLOUD_CONFIG_PROPERTIES_WITH_SESSION_TOKEN,
- null,
- null,
- null
- );
-
- // Verify session token is set
- Assert.assertNotNull(inputSource.getS3InputSourceConfig());
-
Assert.assertNotNull(inputSource.getS3InputSourceConfig().getSessionToken());
- Assert.assertEquals(
- "mySessionToken",
- inputSource.getS3InputSourceConfig().getSessionToken().getPassword()
- );
-
- // Create a reader which will trigger the s3ClientSupplier and use the
session credentials
- InputRowSchema someSchema = new InputRowSchema(
- new TimestampSpec("time", "auto", null),
- new
DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1",
"dim2"))),
- ColumnsFilter.all()
- );
-
- InputSourceReader reader = inputSource.reader(
- someSchema,
- new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|",
false, null, 0, null),
- temporaryFolder.newFolder()
- );
-
- // Read data - this exercises the session credentials path
- CloseableIterator<InputRow> iterator = reader.read();
-
- while (iterator.hasNext()) {
- InputRow nextRow = iterator.next();
- Assert.assertEquals(NOW, nextRow.getTimestamp());
- Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
- Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
- }
-
- EasyMock.verify(S3_CLIENT);
- EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
- }
-
@Test
public void testGetTypes()
{
@@ -556,6 +487,9 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
EasyMock.expect(mockAwsClientConfig.isEnablePathStyleAccess()).andStubReturn(false);
EasyMock.expect(mockAwsClientConfig.isCrossRegionAccessEnabled()).andStubReturn(true);
EasyMock.expect(mockAwsClientConfig.getProtocol()).andStubReturn("http");
+
EasyMock.expect(mockAwsClientConfig.getConnectionTimeoutMillis()).andStubReturn(10_000);
+
EasyMock.expect(mockAwsClientConfig.getSocketTimeoutMillis()).andStubReturn(50_000);
+ EasyMock.expect(mockAwsClientConfig.getMaxConnections()).andStubReturn(50);
EasyMock.expect(mockAwsProxyConfig.getHost()).andStubReturn("");
EasyMock.expect(mockAwsProxyConfig.getPort()).andStubReturn(-1);
@@ -563,6 +497,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
EasyMock.expect(mockAwsProxyConfig.getPassword()).andStubReturn("");
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null);
+
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleExternalId()).andStubReturn(null);
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured())
.andStubReturn(false);
EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret);
@@ -571,10 +506,7 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
EasyMock.replay(mockAwsProxyConfig);
EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
-
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.setS3ClientSupplier(EasyMock.anyObject()))
- .andReturn(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
- EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
- .andReturn(SERVICE);
+
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getS3StorageConfig()).andStubReturn(S3_STORAGE_CONFIG);
EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
final S3InputSource withPrefixes = new S3InputSource(
SERVICE,
@@ -604,14 +536,18 @@ public class S3InputSourceTest extends
InitializedNullHandlingTest
S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret =
EasyMock.createMock(S3InputSourceConfig.class);
EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret);
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleArn()).andStubReturn(null);
+
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.getAssumeRoleExternalId()).andStubReturn(null);
EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured())
.andStubReturn(false);
EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret);
+
+ S3StorageConfig mockS3StorageConfig =
EasyMock.createMock(S3StorageConfig.class);
+ EasyMock.reset(mockS3StorageConfig);
+
EasyMock.expect(mockS3StorageConfig.getS3TransferConfig()).andStubReturn(new
S3TransferConfig());
+
EasyMock.expect(mockS3StorageConfig.getServerSideEncryption()).andStubReturn(new
NoopServerSideEncryption());
+ EasyMock.replay(mockS3StorageConfig);
EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
-
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.setS3ClientSupplier(EasyMock.anyObject()))
- .andReturn(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
- EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
- .andReturn(SERVICE);
+
EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getS3StorageConfig()).andStubReturn(mockS3StorageConfig);
EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
final S3InputSource withPrefixes = new S3InputSource(
SERVICE,
diff --git
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java
index 0a87c3a4c01..9cb311cec8a 100644
---
a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java
+++
b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3ExportStorageProviderTest.java
@@ -63,7 +63,7 @@ public class S3ExportStorageProviderTest
{
Assert.assertEquals(
"s3://export-bucket/export/table/file1",
- new S3ExportStorageProvider("export-bucket",
"export/table").getFilePathForManifest("file1")
+ new S3ExportStorageProvider("export-bucket", "export/table", null,
null).getFilePathForManifest("file1")
);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]