xiangfu0 commented on code in PR #17869:
URL: https://github.com/apache/pinot/pull/17869#discussion_r2928248421


##########
pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java:
##########
@@ -121,74 +134,89 @@ public void init(PinotConfiguration config) {
   }
 
   public void initOrRefreshS3Client() {
-    Preconditions.checkArgument(StringUtils.isNotEmpty(_s3Config.getRegion()), 
"Region can't be null or empty");
+    synchronized (_clientLock) {
+      
Preconditions.checkArgument(StringUtils.isNotEmpty(_s3Config.getRegion()), 
"Region can't be null or empty");
 
-    _disableAcl = _s3Config.getDisableAcl();
-    setServerSideEncryption(_s3Config.getServerSideEncryption(), _s3Config);
+      _disableAcl = _s3Config.getDisableAcl();
+      setServerSideEncryption(_s3Config.getServerSideEncryption(), _s3Config);
 
-    AwsCredentialsProvider awsCredentialsProvider;
-    try {
-      if (StringUtils.isNotEmpty(_s3Config.getAccessKey()) && 
StringUtils.isNotEmpty(_s3Config.getSecretKey())) {
-        AwsBasicCredentials awsBasicCredentials =
-            AwsBasicCredentials.create(_s3Config.getAccessKey(), 
_s3Config.getSecretKey());
-        awsCredentialsProvider = 
StaticCredentialsProvider.create(awsBasicCredentials);
-      } else if (_s3Config.isAnonymousCredentialsProvider()) {
-        awsCredentialsProvider = AnonymousCredentialsProvider.create();
-      } else {
-        awsCredentialsProvider = DefaultCredentialsProvider.builder().build();
-      }
+      // Save old resources to close after the new client is live
+      S3Client oldS3Client = _s3Client;
+      StsAssumeRoleCredentialsProvider oldStsCredentialsProvider = 
_stsCredentialsProvider;
+      StsClient oldStsClient = _stsClient;
 
-      // IAM Role based access
-      if (_s3Config.isIamRoleBasedAccess()) {
-        AssumeRoleRequest.Builder assumeRoleRequestBuilder =
-            
AssumeRoleRequest.builder().roleArn(_s3Config.getRoleArn()).roleSessionName(_s3Config.getRoleSessionName())
-                .durationSeconds(_s3Config.getSessionDurationSeconds());
-        AssumeRoleRequest assumeRoleRequest;
-        if (StringUtils.isNotEmpty(_s3Config.getExternalId())) {
-          assumeRoleRequest = 
assumeRoleRequestBuilder.externalId(_s3Config.getExternalId()).build();
+      AwsCredentialsProvider awsCredentialsProvider;
+      try {
+        if (StringUtils.isNotEmpty(_s3Config.getAccessKey()) && 
StringUtils.isNotEmpty(_s3Config.getSecretKey())) {
+          AwsBasicCredentials awsBasicCredentials =
+              AwsBasicCredentials.create(_s3Config.getAccessKey(), 
_s3Config.getSecretKey());
+          awsCredentialsProvider = 
StaticCredentialsProvider.create(awsBasicCredentials);
+        } else if (_s3Config.isAnonymousCredentialsProvider()) {
+          awsCredentialsProvider = AnonymousCredentialsProvider.create();
         } else {
-          assumeRoleRequest = assumeRoleRequestBuilder.build();
+          awsCredentialsProvider = 
DefaultCredentialsProvider.builder().build();
         }
-        StsClient stsClient =
-            
StsClient.builder().region(Region.of(_s3Config.getRegion())).credentialsProvider(awsCredentialsProvider)
-                .build();
-        awsCredentialsProvider =
-            
StsAssumeRoleCredentialsProvider.builder().stsClient(stsClient).refreshRequest(assumeRoleRequest)
-                
.asyncCredentialUpdateEnabled(_s3Config.isAsyncSessionUpdateEnabled()).build();
-      }
 
-      S3ClientBuilder s3ClientBuilder = 
S3Client.builder().forcePathStyle(true).region(Region.of(_s3Config.getRegion()))
-          
.credentialsProvider(awsCredentialsProvider).crossRegionAccessEnabled(_s3Config.isCrossRegionAccessEnabled());
-      if (StringUtils.isNotEmpty(_s3Config.getEndpoint())) {
-        try {
-          s3ClientBuilder.endpointOverride(new URI(_s3Config.getEndpoint()));
-        } catch (URISyntaxException e) {
-          throw new RuntimeException(e);
+        // IAM Role based access
+        if (_s3Config.isIamRoleBasedAccess()) {
+          AssumeRoleRequest.Builder assumeRoleRequestBuilder =
+              AssumeRoleRequest.builder().roleArn(_s3Config.getRoleArn())
+                  .roleSessionName(_s3Config.getRoleSessionName())
+                  .durationSeconds(_s3Config.getSessionDurationSeconds());
+          AssumeRoleRequest assumeRoleRequest;
+          if (StringUtils.isNotEmpty(_s3Config.getExternalId())) {
+            assumeRoleRequest = 
assumeRoleRequestBuilder.externalId(_s3Config.getExternalId()).build();
+          } else {
+            assumeRoleRequest = assumeRoleRequestBuilder.build();
+          }
+          _stsClient = 
StsClient.builder().region(Region.of(_s3Config.getRegion()))
+              .credentialsProvider(awsCredentialsProvider).build();
+          _stsCredentialsProvider = 
StsAssumeRoleCredentialsProvider.builder().stsClient(_stsClient)
+              .refreshRequest(assumeRoleRequest)
+              
.asyncCredentialUpdateEnabled(_s3Config.isAsyncSessionUpdateEnabled()).build();
+          awsCredentialsProvider = _stsCredentialsProvider;
         }
-      }
-      if (_s3Config.getHttpClientBuilder() != null) {
-        s3ClientBuilder.httpClientBuilder(_s3Config.getHttpClientBuilder());
-      }
 
-      if (_s3Config.getStorageClass() != null) {
-        _storageClass = StorageClass.fromValue(_s3Config.getStorageClass());
-        assert (_storageClass != StorageClass.UNKNOWN_TO_SDK_VERSION);
-      }
+        S3ClientBuilder s3ClientBuilder =
+            
S3Client.builder().forcePathStyle(true).region(Region.of(_s3Config.getRegion()))
+                .credentialsProvider(awsCredentialsProvider)
+                
.crossRegionAccessEnabled(_s3Config.isCrossRegionAccessEnabled());
+        if (StringUtils.isNotEmpty(_s3Config.getEndpoint())) {
+          try {
+            s3ClientBuilder.endpointOverride(new URI(_s3Config.getEndpoint()));
+          } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        if (_s3Config.getHttpClientBuilder() != null) {
+          s3ClientBuilder.httpClientBuilder(_s3Config.getHttpClientBuilder());
+        }
 
-      if (_s3Config.getRequestChecksumCalculationWhenRequired() == 
RequestChecksumCalculation.WHEN_REQUIRED) {
-        
s3ClientBuilder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED);
-      }
-      if (_s3Config.getResponseChecksumValidationWhenRequired() == 
ResponseChecksumValidation.WHEN_REQUIRED) {
-        
s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED);
-      }
-      if (_s3Config.useLegacyMd5Plugin()) {
-        s3ClientBuilder.addPlugin(LegacyMd5Plugin.create());
+        if (_s3Config.getStorageClass() != null) {
+          _storageClass = StorageClass.fromValue(_s3Config.getStorageClass());
+          assert (_storageClass != StorageClass.UNKNOWN_TO_SDK_VERSION);
+        }
+
+        if (_s3Config.getRequestChecksumCalculationWhenRequired() == 
RequestChecksumCalculation.WHEN_REQUIRED) {
+          
s3ClientBuilder.responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED);
+        }
+        if (_s3Config.getResponseChecksumValidationWhenRequired() == 
ResponseChecksumValidation.WHEN_REQUIRED) {
+          
s3ClientBuilder.requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED);
+        }
+        if (_s3Config.useLegacyMd5Plugin()) {
+          s3ClientBuilder.addPlugin(LegacyMd5Plugin.create());
+        }
+
+        _s3Client = s3ClientBuilder.build();
+        setMultiPartUploadConfigs(_s3Config);
+      } catch (S3Exception e) {
+        throw new RuntimeException("Could not initialize S3PinotFS", e);
       }
 
-      _s3Client = s3ClientBuilder.build();
-      setMultiPartUploadConfigs(_s3Config);
-    } catch (S3Exception e) {
-      throw new RuntimeException("Could not initialize S3PinotFS", e);
+      // Close old resources after new client is live (order: provider → STS 
client → S3 client)

Review Comment:
   good catch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to