This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cb6ea77eb11 Fix AWS SDK credential provider thread leak in S3PinotFS 
(#17869)
cb6ea77eb11 is described below

commit cb6ea77eb1133137359270a4ef0274b7d1a769fc
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Thu Mar 12 22:26:31 2026 -0700

    Fix AWS SDK credential provider thread leak in S3PinotFS (#17869)
    
    StsClient and StsAssumeRoleCredentialsProvider were created as local
    variables in initOrRefreshS3Client(), never stored as fields, and never
    closed on refresh or in close(). Each leaked provider spawned sdk-cache-*
    background threads that accumulated indefinitely (observed: 91 threads
    over 4.6 days on a production controller).
    
    - Store StsClient and StsAssumeRoleCredentialsProvider as instance fields
    - Close old credential resources after building new client (not before,
      to avoid a window where concurrent S3 operations hit a closed provider)
    - Synchronize initOrRefreshS3Client() and close() on _clientLock to
      prevent races between credential refresh and shutdown
    - Add closeQuietly() helper for safe resource cleanup
---
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  | 153 +++++++++++++--------
 1 file changed, 95 insertions(+), 58 deletions(-)

diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index f5e29a36e06..2700ea46e3f 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -113,6 +113,19 @@ public class S3PinotFS extends BasePinotFS {
   private long _minObjectSizeToUploadInParts;
   private long _multiPartUploadPartSize;
   private @Nullable StorageClass _storageClass;
+  private StsClient _stsClient;
+  private StsAssumeRoleCredentialsProvider _stsCredentialsProvider;
+  private final Object _clientLock = new Object();
+
+  private static void closeQuietly(AutoCloseable closeable, String name) {
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch (Exception e) {
+        LOGGER.warn("Error closing {}", name, e);
+      }
+    }
+  }
 
   @Override
   public void init(PinotConfiguration config) {
@@ -121,74 +134,89 @@ public class S3PinotFS extends BasePinotFS {
   }
 
   public void initOrRefreshS3Client() {
-    Preconditions.checkArgument(StringUtils.isNotEmpty(_s3Config.getRegion()), 
"Region can't be null or empty");
+    synchronized (_clientLock) {
+      
Preconditions.checkArgument(StringUtils.isNotEmpty(_s3Config.getRegion()), 
"Region can't be null or empty");
 
-    _disableAcl = _s3Config.getDisableAcl();
-    setServerSideEncryption(_s3Config.getServerSideEncryption(), _s3Config);
+      _disableAcl = _s3Config.getDisableAcl();
+      setServerSideEncryption(_s3Config.getServerSideEncryption(), _s3Config);
 
-    AwsCredentialsProvider awsCredentialsProvider;
-    try {
-      if (StringUtils.isNotEmpty(_s3Config.getAccessKey()) && 
StringUtils.isNotEmpty(_s3Config.getSecretKey())) {
-        AwsBasicCredentials awsBasicCredentials =
-            AwsBasicCredentials.create(_s3Config.getAccessKey(), 
_s3Config.getSecretKey());
-        awsCredentialsProvider = 
StaticCredentialsProvider.create(awsBasicCredentials);
-      } else if (_s3Config.isAnonymousCredentialsProvider()) {
-        awsCredentialsProvider = AnonymousCredentialsProvider.create();
-      } else {
-        awsCredentialsProvider = DefaultCredentialsProvider.builder().build();
-      }
+      // Save old resources to close after the new client is live
+      S3Client oldS3Client = _s3Client;
+      StsAssumeRoleCredentialsProvider oldStsCredentialsProvider = 
_stsCredentialsProvider;
+      StsClient oldStsClient = _stsClient;
 
-      // IAM Role based access
-      if (_s3Config.isIamRoleBasedAccess()) {
-        AssumeRoleRequest.Builder assumeRoleRequestBuilder =
-            
AssumeRoleRequest.builder().roleArn(_s3Config.getRoleArn()).roleSessionName(_s3Config.getRoleSessionName())
-                .durationSeconds(_s3Config.getSessionDurationSeconds());
-        AssumeRoleRequest assumeRoleRequest;
-        if (StringUtils.isNotEmpty(_s3Config.getExternalId())) {
-          assumeRoleRequest = 
assumeRoleRequestBuilder.externalId(_s3Config.getExternalId()).build();
+      AwsCredentialsProvider awsCredentialsProvider;
+      try {
+        if (StringUtils.isNotEmpty(_s3Config.getAccessKey()) && 
StringUtils.isNotEmpty(_s3Config.getSecretKey())) {
+          AwsBasicCredentials awsBasicCredentials =
+              AwsBasicCredentials.create(_s3Config.getAccessKey(), 
_s3Config.getSecretKey());
+          awsCredentialsProvider = 
StaticCredentialsProvider.create(awsBasicCredentials);
+        } else if (_s3Config.isAnonymousCredentialsProvider()) {
+          awsCredentialsProvider = AnonymousCredentialsProvider.create();
         } else {
-          assumeRoleRequest = assumeRoleRequestBuilder.build();
+          awsCredentialsProvider = 
DefaultCredentialsProvider.builder().build();
         }
-        StsClient stsClient =
-            
StsClient.builder().region(Region.of(_s3Config.getRegion())).credentialsProvider(awsCredentialsProvider)
-                .build();
-        awsCredentialsProvider =
-            
StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient).refreshRequest(assumeRoleRequest)
-                
.asyncCredentialUpdateEnabled(_s3Config.isAsyncSessionUpdateEnabled()).build();
-      }
 
-      S3ClientBuilder s3ClientBuilder = 
S3Client.builder().forcePathStyle(true).region(Region.of(_s3Config.getRegion()))
-          
.credentialsProvider(awsCredentialsProvider).crossRegionAccessEnabled(_s3Config.isCrossRegionAccessEnabled());
-      if (StringUtils.isNotEmpty(_s3Config.getEndpoint())) {
-        try {
-          s3ClientBuilder.endpointOverride(new URI(_s3Config.getEndpoint()));
-        } catch (URISyntaxException e) {
-          throw new RuntimeException(e);
+        // IAM Role based access
+        if (_s3Config.isIamRoleBasedAccess()) {
+          AssumeRoleRequest.Builder assumeRoleRequestBuilder =
+              AssumeRoleRequest.builder().roleArn(_s3Config.getRoleArn())
+                  .roleSessionName(_s3Config.getRoleSessionName())
+                  .durationSeconds(_s3Config.getSessionDurationSeconds());
+          AssumeRoleRequest assumeRoleRequest;
+          if (StringUtils.isNotEmpty(_s3Config.getExternalId())) {
+            assumeRoleRequest = 
assumeRoleRequestBuilder.externalId(_s3Config.getExternalId()).build();
+          } else {
+            assumeRoleRequest = assumeRoleRequestBuilder.build();
+          }
+          _stsClient = 
StsClient.builder().region(Region.of(_s3Config.getRegion()))
+              .credentialsProvider(awsCredentialsProvider).build();
+          _stsCredentialsProvider = 
StsAssumeRoleCredentialsProvider.builder().stsClient(_stsClient)
+              .refreshRequest(assumeRoleRequest)
+              
.asyncCredentialUpdateEnabled(_s3Config.isAsyncSessionUpdateEnabled()).build();
+          awsCredentialsProvider = _stsCredentialsProvider;
         }
-      }
-      if (_s3Config.getHttpClientBuilder() != null) {
-        s3ClientBuilder.httpClientBuilder(_s3Config.getHttpClientBuilder());
-      }
 
-      if (_s3Config.getStorageClass() != null) {
-        _storageClass = StorageClass.fromValue(_s3Config.getStorageClass());
-        assert (_storageClass != StorageClass.UNKNOWN_TO_SDK_VERSION);
-      }
+        S3ClientBuilder s3ClientBuilder =
+            
S3Client.builder().forcePathStyle(true).region(Region.of(_s3Config.getRegion()))
+                .credentialsProvider(awsCredentialsProvider)
+                
.crossRegionAccessEnabled(_s3Config.isCrossRegionAccessEnabled());
+        if (StringUtils.isNotEmpty(_s3Config.getEndpoint())) {
+          try {
+            s3ClientBuilder.endpointOverride(new URI(_s3Config.getEndpoint()));
+          } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        if (_s3Config.getHttpClientBuilder() != null) {
+          s3ClientBuilder.httpClientBuilder(_s3Config.getHttpClientBuilder());
+        }
 
-      if (_s3Config.getRequestChecksumCalculationWhenRequired() == 
RequestChecksumCalculation.WHEN_REQUIRED) {
-        
s3ClientBuilder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED);
-      }
-      if (_s3Config.getResponseChecksumValidationWhenRequired() == 
ResponseChecksumValidation.WHEN_REQUIRED) {
-        
s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED);
-      }
-      if (_s3Config.useLegacyMd5Plugin()) {
-        s3ClientBuilder.addPlugin(LegacyMd5Plugin.create());
+        if (_s3Config.getStorageClass() != null) {
+          _storageClass = StorageClass.fromValue(_s3Config.getStorageClass());
+          assert (_storageClass != StorageClass.UNKNOWN_TO_SDK_VERSION);
+        }
+
+        if (_s3Config.getRequestChecksumCalculationWhenRequired() == 
RequestChecksumCalculation.WHEN_REQUIRED) {
+          
s3ClientBuilder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED);
+        }
+        if (_s3Config.getResponseChecksumValidationWhenRequired() == 
ResponseChecksumValidation.WHEN_REQUIRED) {
+          
s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED);
+        }
+        if (_s3Config.useLegacyMd5Plugin()) {
+          s3ClientBuilder.addPlugin(LegacyMd5Plugin.create());
+        }
+
+        _s3Client = s3ClientBuilder.build();
+        setMultiPartUploadConfigs(_s3Config);
+      } catch (S3Exception e) {
+        throw new RuntimeException("Could not initialize S3PinotFS", e);
       }
 
-      _s3Client = s3ClientBuilder.build();
-      setMultiPartUploadConfigs(_s3Config);
-    } catch (S3Exception e) {
-      throw new RuntimeException("Could not initialize S3PinotFS", e);
+      // Close old resources after new client is live (order: provider → STS 
client → S3 client)
+      closeQuietly(oldStsCredentialsProvider, "oldStsCredentialsProvider");
+      closeQuietly(oldStsClient, "oldStsClient");
+      closeQuietly(oldS3Client, "oldS3Client");
     }
   }
 
@@ -974,7 +1002,16 @@ public class S3PinotFS extends BasePinotFS {
   @Override
   public void close()
       throws IOException {
-    _s3Client.close();
+    synchronized (_clientLock) {
+      closeQuietly(_stsCredentialsProvider, "STS credentials provider");
+      _stsCredentialsProvider = null;
+
+      closeQuietly(_stsClient, "STS client");
+      _stsClient = null;
+
+      closeQuietly(_s3Client, "S3 client");
+      _s3Client = null;
+    }
     super.close();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to