This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cb6ea77eb11 Fix AWS SDK credential provider thread leak in S3PinotFS
(#17869)
cb6ea77eb11 is described below
commit cb6ea77eb1133137359270a4ef0274b7d1a769fc
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Thu Mar 12 22:26:31 2026 -0700
Fix AWS SDK credential provider thread leak in S3PinotFS (#17869)
StsClient and StsAssumeRoleCredentialsProvider were created as local
variables in initOrRefreshS3Client(), never stored as fields, and never
closed on refresh or in close(). Each leaked provider spawned sdk-cache-*
background threads that accumulated indefinitely (observed: 91 threads
over 4.6 days on a production controller).
- Store StsClient and StsAssumeRoleCredentialsProvider as instance fields
- Close old credential resources after building new client (not before,
to avoid a window where concurrent S3 operations hit a closed provider)
- Synchronize initOrRefreshS3Client() and close() on _clientLock to
prevent races between credential refresh and shutdown
- Add closeQuietly() helper for safe resource cleanup
---
.../apache/pinot/plugin/filesystem/S3PinotFS.java | 153 +++++++++++++--------
1 file changed, 95 insertions(+), 58 deletions(-)
diff --git
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index f5e29a36e06..2700ea46e3f 100644
---
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -113,6 +113,19 @@ public class S3PinotFS extends BasePinotFS {
private long _minObjectSizeToUploadInParts;
private long _multiPartUploadPartSize;
private @Nullable StorageClass _storageClass;
+ private StsClient _stsClient;
+ private StsAssumeRoleCredentialsProvider _stsCredentialsProvider;
+ private final Object _clientLock = new Object();
+
+ private static void closeQuietly(AutoCloseable closeable, String name) {
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error closing {}", name, e);
+ }
+ }
+ }
@Override
public void init(PinotConfiguration config) {
@@ -121,74 +134,89 @@ public class S3PinotFS extends BasePinotFS {
}
public void initOrRefreshS3Client() {
- Preconditions.checkArgument(StringUtils.isNotEmpty(_s3Config.getRegion()),
"Region can't be null or empty");
+ synchronized (_clientLock) {
+
Preconditions.checkArgument(StringUtils.isNotEmpty(_s3Config.getRegion()),
"Region can't be null or empty");
- _disableAcl = _s3Config.getDisableAcl();
- setServerSideEncryption(_s3Config.getServerSideEncryption(), _s3Config);
+ _disableAcl = _s3Config.getDisableAcl();
+ setServerSideEncryption(_s3Config.getServerSideEncryption(), _s3Config);
- AwsCredentialsProvider awsCredentialsProvider;
- try {
- if (StringUtils.isNotEmpty(_s3Config.getAccessKey()) &&
StringUtils.isNotEmpty(_s3Config.getSecretKey())) {
- AwsBasicCredentials awsBasicCredentials =
- AwsBasicCredentials.create(_s3Config.getAccessKey(),
_s3Config.getSecretKey());
- awsCredentialsProvider =
StaticCredentialsProvider.create(awsBasicCredentials);
- } else if (_s3Config.isAnonymousCredentialsProvider()) {
- awsCredentialsProvider = AnonymousCredentialsProvider.create();
- } else {
- awsCredentialsProvider = DefaultCredentialsProvider.builder().build();
- }
+ // Save old resources to close after the new client is live
+ S3Client oldS3Client = _s3Client;
+ StsAssumeRoleCredentialsProvider oldStsCredentialsProvider =
_stsCredentialsProvider;
+ StsClient oldStsClient = _stsClient;
- // IAM Role based access
- if (_s3Config.isIamRoleBasedAccess()) {
- AssumeRoleRequest.Builder assumeRoleRequestBuilder =
-
AssumeRoleRequest.builder().roleArn(_s3Config.getRoleArn()).roleSessionName(_s3Config.getRoleSessionName())
- .durationSeconds(_s3Config.getSessionDurationSeconds());
- AssumeRoleRequest assumeRoleRequest;
- if (StringUtils.isNotEmpty(_s3Config.getExternalId())) {
- assumeRoleRequest =
assumeRoleRequestBuilder.externalId(_s3Config.getExternalId()).build();
+ AwsCredentialsProvider awsCredentialsProvider;
+ try {
+ if (StringUtils.isNotEmpty(_s3Config.getAccessKey()) &&
StringUtils.isNotEmpty(_s3Config.getSecretKey())) {
+ AwsBasicCredentials awsBasicCredentials =
+ AwsBasicCredentials.create(_s3Config.getAccessKey(),
_s3Config.getSecretKey());
+ awsCredentialsProvider =
StaticCredentialsProvider.create(awsBasicCredentials);
+ } else if (_s3Config.isAnonymousCredentialsProvider()) {
+ awsCredentialsProvider = AnonymousCredentialsProvider.create();
} else {
- assumeRoleRequest = assumeRoleRequestBuilder.build();
+ awsCredentialsProvider =
DefaultCredentialsProvider.builder().build();
}
- StsClient stsClient =
-
StsClient.builder().region(Region.of(_s3Config.getRegion())).credentialsProvider(awsCredentialsProvider)
- .build();
- awsCredentialsProvider =
-
StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient).refreshRequest(assumeRoleRequest)
-
.asyncCredentialUpdateEnabled(_s3Config.isAsyncSessionUpdateEnabled()).build();
- }
- S3ClientBuilder s3ClientBuilder =
S3Client.builder().forcePathStyle(true).region(Region.of(_s3Config.getRegion()))
-
.credentialsProvider(awsCredentialsProvider).crossRegionAccessEnabled(_s3Config.isCrossRegionAccessEnabled());
- if (StringUtils.isNotEmpty(_s3Config.getEndpoint())) {
- try {
- s3ClientBuilder.endpointOverride(new URI(_s3Config.getEndpoint()));
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
+ // IAM Role based access
+ if (_s3Config.isIamRoleBasedAccess()) {
+ AssumeRoleRequest.Builder assumeRoleRequestBuilder =
+ AssumeRoleRequest.builder().roleArn(_s3Config.getRoleArn())
+ .roleSessionName(_s3Config.getRoleSessionName())
+ .durationSeconds(_s3Config.getSessionDurationSeconds());
+ AssumeRoleRequest assumeRoleRequest;
+ if (StringUtils.isNotEmpty(_s3Config.getExternalId())) {
+ assumeRoleRequest =
assumeRoleRequestBuilder.externalId(_s3Config.getExternalId()).build();
+ } else {
+ assumeRoleRequest = assumeRoleRequestBuilder.build();
+ }
+ _stsClient =
StsClient.builder().region(Region.of(_s3Config.getRegion()))
+ .credentialsProvider(awsCredentialsProvider).build();
+ _stsCredentialsProvider =
StsAssumeRoleCredentialsProvider.builder().stsClient(_stsClient)
+ .refreshRequest(assumeRoleRequest)
+
.asyncCredentialUpdateEnabled(_s3Config.isAsyncSessionUpdateEnabled()).build();
+ awsCredentialsProvider = _stsCredentialsProvider;
}
- }
- if (_s3Config.getHttpClientBuilder() != null) {
- s3ClientBuilder.httpClientBuilder(_s3Config.getHttpClientBuilder());
- }
- if (_s3Config.getStorageClass() != null) {
- _storageClass = StorageClass.fromValue(_s3Config.getStorageClass());
- assert (_storageClass != StorageClass.UNKNOWN_TO_SDK_VERSION);
- }
+ S3ClientBuilder s3ClientBuilder =
+
S3Client.builder().forcePathStyle(true).region(Region.of(_s3Config.getRegion()))
+ .credentialsProvider(awsCredentialsProvider)
+
.crossRegionAccessEnabled(_s3Config.isCrossRegionAccessEnabled());
+ if (StringUtils.isNotEmpty(_s3Config.getEndpoint())) {
+ try {
+ s3ClientBuilder.endpointOverride(new URI(_s3Config.getEndpoint()));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (_s3Config.getHttpClientBuilder() != null) {
+ s3ClientBuilder.httpClientBuilder(_s3Config.getHttpClientBuilder());
+ }
- if (_s3Config.getRequestChecksumCalculationWhenRequired() ==
RequestChecksumCalculation.WHEN_REQUIRED) {
-
s3ClientBuilder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED);
- }
- if (_s3Config.getResponseChecksumValidationWhenRequired() ==
ResponseChecksumValidation.WHEN_REQUIRED) {
-
s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED);
- }
- if (_s3Config.useLegacyMd5Plugin()) {
- s3ClientBuilder.addPlugin(LegacyMd5Plugin.create());
+ if (_s3Config.getStorageClass() != null) {
+ _storageClass = StorageClass.fromValue(_s3Config.getStorageClass());
+ assert (_storageClass != StorageClass.UNKNOWN_TO_SDK_VERSION);
+ }
+
+ if (_s3Config.getRequestChecksumCalculationWhenRequired() ==
RequestChecksumCalculation.WHEN_REQUIRED) {
+
s3ClientBuilder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED);
+ }
+ if (_s3Config.getResponseChecksumValidationWhenRequired() ==
ResponseChecksumValidation.WHEN_REQUIRED) {
+
s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED);
+ }
+ if (_s3Config.useLegacyMd5Plugin()) {
+ s3ClientBuilder.addPlugin(LegacyMd5Plugin.create());
+ }
+
+ _s3Client = s3ClientBuilder.build();
+ setMultiPartUploadConfigs(_s3Config);
+ } catch (S3Exception e) {
+ throw new RuntimeException("Could not initialize S3PinotFS", e);
}
- _s3Client = s3ClientBuilder.build();
- setMultiPartUploadConfigs(_s3Config);
- } catch (S3Exception e) {
- throw new RuntimeException("Could not initialize S3PinotFS", e);
+ // Close old resources after new client is live (order: provider → STS
client → S3 client)
+ closeQuietly(oldStsCredentialsProvider, "oldStsCredentialsProvider");
+ closeQuietly(oldStsClient, "oldStsClient");
+ closeQuietly(oldS3Client, "oldS3Client");
}
}
@@ -974,7 +1002,16 @@ public class S3PinotFS extends BasePinotFS {
@Override
public void close()
throws IOException {
- _s3Client.close();
+ synchronized (_clientLock) {
+ closeQuietly(_stsCredentialsProvider, "STS credentials provider");
+ _stsCredentialsProvider = null;
+
+ closeQuietly(_stsClient, "STS client");
+ _stsClient = null;
+
+ closeQuietly(_s3Client, "S3 client");
+ _s3Client = null;
+ }
super.close();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]