This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new d3479e1b2 [CELEBORN-2003] Add retry mechanism when completing S3 
multipart upload
d3479e1b2 is described below

commit d3479e1b21df208a474e45f7ff8963fdc9a17175
Author: [email protected] <[email protected]>
AuthorDate: Fri Jun 6 10:15:26 2025 +0800

    [CELEBORN-2003] Add retry mechanism when completing S3 multipart upload
    
    ### What changes were proposed in this pull request?
    
    Add a retry mechanism when completing S3 multipart upload to ensure that 
completeMultipartUpload is retried when facing a retryable exception such as the 
SlowDown one
    
    ### Why are the changes needed?
    
    While running a “simple” Spark job creating 10TiB of shuffle data 
(repartitioning from 100k partitions to 20), the job was constantly failing when 
all files should be committed, relying on SOFT 
`celeborn.client.shuffle.partitionSplit.mode`
    
    Despite increasing `celeborn.storage.s3.mpu.maxRetries` up to `200`, the 
job was still failing due to a SlowDown exception
    After adding some debug logs on the retry policy from the AWS S3 SDK, I 
observed that the policy is never invoked for the completeMultipartUpload 
action, while it is correctly invoked for other actions. See 
https://issues.apache.org/jira/browse/CELEBORN-2003
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Created a cluster on a kubernetes server relying on S3 storage.
    Launched a 10TiB shuffle from 100000 partitions to 200 partitions with SOFT 
`celeborn.client.shuffle.partitionSplit.mode`
    The job succeeded and displayed some warn logs indicating that the 
`completeMultipartUpload` call is retried due to SlowDown:
    ```
    bucket ******* key poc/spark-2c86663c948243d19c127e90f704a3d5/0/35-39-0 
uploadId 
Pbaq.pp1qyLvtGbfZrMwA8RgLJ4QYanAMhmv0DvKUk0m6.GlCKdC3ICGngn7Q7iIa0Dw1h3wEn78EoogMlYgFD6.tDqiatOTbFprsNkk0qzLu9KY8YCC48pqaINcvgi8c1gQKKhsf1zZ.5Et5j40wQ--
 upload failed to complete, will retry (1/10)
    com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your 
request rate. (Service: null; Status Code: 0; Error Code: SlowDown; Request ID: 
RAV5MXX3B9Z3ZHTG; S3 Extended Request ID: 
9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S;
 Proxy: null), S3 Extended Request ID: 
9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S
        at com.amazonaws.services.s3.model.transform.XmlResponses [...]
    ```
    
    Closes #3293 from ashangit/nfraison/CELEBORN-2003.
    
    Authored-by: [email protected] <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit 061cdc38207c73ca183555600ba7b7c1f12b9e19)
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 18 +++++++
 docs/configuration/worker.md                       |  2 +
 .../apache/celeborn/S3MultipartUploadHandler.java  | 59 ++++++++++++++++++++--
 .../deploy/worker/storage/TierWriterHelper.java    | 11 +++-
 .../service/deploy/worker/storage/TierWriter.scala |  4 +-
 5 files changed, 86 insertions(+), 8 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 0ec7de075..248297dd3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1194,6 +1194,8 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("")
 
   def s3MultiplePartUploadMaxRetries: Int = get(S3_MPU_MAX_RETRIES)
+  def s3MultiplePartUploadBaseDelay: Int = get(S3_MPU_BASE_DELAY).toInt
+  def s3MultiplePartUploadMaxBackoff: Int = get(S3_MPU_MAX_BACKOFF).toInt
 
   def s3Dir: String = {
     get(S3_DIR).map {
@@ -3198,6 +3200,22 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(5)
 
+  val S3_MPU_BASE_DELAY: ConfigEntry[Long] =
+    buildConf("celeborn.storage.s3.mpu.baseDelay")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("S3 MPU base sleep time (milliseconds) for retryable exceptions.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("100ms")
+
+  val S3_MPU_MAX_BACKOFF: ConfigEntry[Long] =
+    buildConf("celeborn.storage.s3.mpu.maxBackoff")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("S3 MPU max sleep time (milliseconds) for retryable exceptions.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("20s")
+
   val OSS_ENDPOINT: OptionalConfigEntry[String] =
     buildConf("celeborn.storage.oss.endpoint")
       .categories("worker", "master", "client")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index b601a59a2..8e9af6608 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -53,6 +53,8 @@ license: |
 | celeborn.storage.oss.secret.key | &lt;undefined&gt; | false | OSS secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 
endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.mpu.baseDelay | 100ms | false | S3 MPU base sleep time 
(milliseconds) for retryable exceptions. | 0.6.0 |  | 
+| celeborn.storage.s3.mpu.maxBackoff | 20s | false | S3 MPU max sleep time 
(milliseconds) for retryable exceptions. | 0.6.0 |  | 
 | celeborn.storage.s3.mpu.maxRetries | 5 | false | S3 MPU upload max retries. 
| 0.6.0 |  | 
 | celeborn.worker.activeConnection.max | &lt;undefined&gt; | false | If the 
number of active connections on a worker exceeds this configuration value, the 
worker will be marked as high-load in the heartbeat report, and the master will 
not include that node in the response of RequestSlots. | 0.3.1 |  | 
 | celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size 
of the application registry on Workers. | 0.5.0 |  | 
diff --git 
a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
 
b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
index f22217a2c..11951e36e 100644
--- 
a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
+++ 
b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -24,10 +24,13 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.event.ProgressListener;
+import com.amazonaws.retry.PredefinedBackoffStrategies;
 import com.amazonaws.retry.PredefinedRetryPolicies;
+import com.amazonaws.retry.RetryPolicy;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
@@ -65,12 +68,21 @@ public class S3MultipartUploadHandler implements 
MultipartUploadHandler {
   private String bucketName;
 
   private Integer s3MultiplePartUploadMaxRetries;
+  private Integer baseDelay;
+  private Integer maxBackoff;
 
   public S3MultipartUploadHandler(
-      FileSystem hadoopFs, String bucketName, String key, Integer 
s3MultiplePartUploadMaxRetries)
+      FileSystem hadoopFs,
+      String bucketName,
+      String key,
+      Integer s3MultiplePartUploadMaxRetries,
+      Integer baseDelay,
+      Integer maxBackoff)
       throws IOException, URISyntaxException {
     this.bucketName = bucketName;
     this.s3MultiplePartUploadMaxRetries = s3MultiplePartUploadMaxRetries;
+    this.baseDelay = baseDelay;
+    this.maxBackoff = maxBackoff;
 
     Configuration conf = hadoopFs.getConf();
     AWSCredentialProviderList providers = new AWSCredentialProviderList();
@@ -80,11 +92,16 @@ public class S3MultipartUploadHandler implements 
MultipartUploadHandler {
     providers.add(new EnvironmentVariableCredentialsProvider());
     providers.add(new IAMInstanceCredentialsProvider());
 
+    RetryPolicy retryPolicy =
+        new RetryPolicy(
+            PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION,
+            new PredefinedBackoffStrategies.SDKDefaultBackoffStrategy(
+                baseDelay, baseDelay, maxBackoff),
+            s3MultiplePartUploadMaxRetries,
+            false);
     ClientConfiguration clientConfig =
         new ClientConfiguration()
-            .withRetryPolicy(
-                
PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(
-                    s3MultiplePartUploadMaxRetries))
+            .withRetryPolicy(retryPolicy)
             .withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
     this.s3Client =
         AmazonS3ClientBuilder.standard()
@@ -175,7 +192,39 @@ public class S3MultipartUploadHandler implements 
MultipartUploadHandler {
     CompleteMultipartUploadRequest compRequest =
         new CompleteMultipartUploadRequest(bucketName, key, uploadId, 
partETags)
             .withGeneralProgressListener(progressListener);
-    CompleteMultipartUploadResult compResult = 
s3Client.completeMultipartUpload(compRequest);
+    CompleteMultipartUploadResult compResult = null;
+    for (int attempt = 1; attempt <= this.s3MultiplePartUploadMaxRetries; 
attempt++) {
+      try {
+        compResult = s3Client.completeMultipartUpload(compRequest);
+        break;
+      } catch (AmazonClientException e) {
+        if (attempt == this.s3MultiplePartUploadMaxRetries
+            || 
!PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION.shouldRetry(null, e, attempt)) 
{
+          logger.error(
+              "bucket {} key {} uploadId {} upload failed to complete, will 
not retry",
+              bucketName,
+              key,
+              uploadId,
+              e);
+          throw e;
+        }
+
+        long backoffTime = Math.min(maxBackoff, baseDelay * (long) Math.pow(2, 
attempt - 1));
+        try {
+          logger.warn(
+              "bucket {} key {} uploadId {} upload failed to complete, will 
retry ({}/{})",
+              bucketName,
+              key,
+              uploadId,
+              attempt,
+              this.s3MultiplePartUploadMaxRetries,
+              e);
+          Thread.sleep(backoffTime);
+        } catch (InterruptedException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    }
     logger.debug(
         "bucket {} key {} uploadId {} upload completed location is in {} ",
         bucketName,
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
index 62088e5d7..8c0103912 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
@@ -24,7 +24,12 @@ import 
org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
 
 public class TierWriterHelper {
   public static MultipartUploadHandler getS3MultipartUploadHandler(
-      FileSystem hadoopFs, String bucketName, String key, int maxRetryies) {
+      FileSystem hadoopFs,
+      String bucketName,
+      String key,
+      int maxRetryies,
+      int baseDelay,
+      int maxBackoff) {
     return (MultipartUploadHandler)
         DynConstructors.builder()
             .impl(
@@ -32,9 +37,11 @@ public class TierWriterHelper {
                 FileSystem.class,
                 String.class,
                 String.class,
+                Integer.class,
+                Integer.class,
                 Integer.class)
             .build()
-            .newInstance(hadoopFs, bucketName, key, maxRetryies);
+            .newInstance(hadoopFs, bucketName, key, maxRetryies, baseDelay, 
maxBackoff);
   }
 
   public static MultipartUploadHandler getOssMultipartUploadHandler(
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index fda56d5d8..278b6f945 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -542,7 +542,9 @@ class DfsTierWriter(
         hadoopFs,
         bucketName,
         key,
-        conf.s3MultiplePartUploadMaxRetries)
+        conf.s3MultiplePartUploadMaxRetries,
+        conf.s3MultiplePartUploadBaseDelay,
+        conf.s3MultiplePartUploadMaxBackoff)
       s3MultipartUploadHandler.startUpload()
     } else if (hdfsFileInfo.isOSS) {
       val configuration = hadoopFs.getConf

Reply via email to