eolivelli commented on code in PR #3604:
URL: https://github.com/apache/celeborn/pull/3604#discussion_r2845216482


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -436,6 +437,38 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       isSegmentGranularityVisible = false)
   }
 
+  def ensureS3MultipartUploaderSharedState(): Unit = this.synchronized {
+    if (s3MultipartUploadHandlerSharedState != null)
+      return
+
+    val s3HadoopFs: FileSystem = hadoopFs.get(StorageInfo.Type.S3)
+    if (s3HadoopFs == null)
+      throw new IllegalStateException("S3 is not configured")
+
+    val uri = s3HadoopFs.getUri
+    val bucketName = uri.getHost
+    logInfo(s"Creating S3 client for $uri, bucketName is $bucketName")
+    s3MultipartUploadHandlerSharedState = 
TierWriterHelper.getS3MultipartUploadHandlerSharedState(
+      s3HadoopFs,
+      bucketName,
+      conf.s3MultiplePartUploadMaxRetries,
+      conf.s3MultiplePartUploadBaseDelay,
+      conf.s3MultiplePartUploadMaxBackoff)
+  }
+
+  def ensureS3DirectoryForShuffleKey(appId: String, shuffleId: Int): Unit = 
this.synchronized {

Review Comment:
   Good point, @RexXiong.
   I have removed `synchronized` and added a comment to the code. Please take a
   look at commit 045eb9c646d0cc4e8df5ba7ac1a61698dafe89c0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to