This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new d3479e1b2 [CELEBORN-2003] Add retry mechanism when completing S3 multipart upload
d3479e1b2 is described below
commit d3479e1b21df208a474e45f7ff8963fdc9a17175
Author: [email protected] <[email protected]>
AuthorDate: Fri Jun 6 10:15:26 2025 +0800
[CELEBORN-2003] Add retry mechanism when completing S3 multipart upload
### What changes were proposed in this pull request?
Add a retry mechanism when completing an S3 multipart upload, to ensure that
`completeMultipartUpload` is retried when facing a retryable exception such as
SlowDown.
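The added logic follows the standard retry-with-capped-exponential-backoff pattern. A minimal standalone sketch of that pattern (class, method, and parameter names here are illustrative, not the actual Celeborn code):

```java
// Minimal sketch of retry with capped exponential backoff, the pattern this
// patch applies to completeMultipartUpload. All names are illustrative.
class BackoffRetrySketch {
  // base * 2^(attempt-1), capped at maxBackoff (all values in milliseconds)
  static long backoffMillis(int attempt, long baseDelay, long maxBackoff) {
    return Math.min(maxBackoff, baseDelay * (long) Math.pow(2, attempt - 1));
  }

  static <T> T retry(
      int maxRetries, long baseDelay, long maxBackoff,
      java.util.concurrent.Callable<T> action) throws Exception {
    for (int attempt = 1; ; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        if (attempt == maxRetries) {
          throw e; // retries exhausted: rethrow the last failure
        }
        Thread.sleep(backoffMillis(attempt, baseDelay, maxBackoff));
      }
    }
  }
}
```

In the actual patch the catch block additionally consults the SDK's `PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION`, so only retryable errors (such as SlowDown) are retried.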
### Why are the changes needed?
While running a “simple” Spark job creating 10TiB of shuffle data (a
repartition from 100k partitions to 20), relying on the SOFT
`celeborn.client.shuffle.partitionSplit.mode`, the job was constantly failing
at the point where all files should be committed.
Despite increasing `celeborn.storage.s3.mpu.maxRetries` up to `200`, the job
was still failing due to SlowDown exceptions.
After adding some debug logs on the retry policy from the AWS S3 SDK, I saw
that the policy is never invoked for the completeMultipartUpload action, while
it is properly invoked for other actions. See
https://issues.apache.org/jira/browse/CELEBORN-2003
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Created a cluster on a Kubernetes server relying on S3 storage.
Launched a 10TiB shuffle from 100000 partitions to 200 partitions with the
SOFT `celeborn.client.shuffle.partitionSplit.mode`.
The job succeeded and displayed warning logs indicating that
`completeMultipartUpload` is retried due to SlowDown:
```
bucket ******* key poc/spark-2c86663c948243d19c127e90f704a3d5/0/35-39-0
uploadId
Pbaq.pp1qyLvtGbfZrMwA8RgLJ4QYanAMhmv0DvKUk0m6.GlCKdC3ICGngn7Q7iIa0Dw1h3wEn78EoogMlYgFD6.tDqiatOTbFprsNkk0qzLu9KY8YCC48pqaINcvgi8c1gQKKhsf1zZ.5Et5j40wQ--
upload failed to complete, will retry (1/10)
com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your
request rate. (Service: null; Status Code: 0; Error Code: SlowDown; Request ID:
RAV5MXX3B9Z3ZHTG; S3 Extended Request ID:
9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S;
Proxy: null), S3 Extended Request ID:
9Qqm3vfJVLFNY1Y3yKAobJHv7JkHQP2+v8hGSW2HYIOputAtiPdkqkY5MfD66lEzAl45m71aiPVB0f1TxTUD+upUo0NxXp6S
at com.amazonaws.services.s3.model.transform.XmlResponses [...]
```
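For operators hitting similar throttling, the new knobs can be tuned alongside the existing retry limit. A hypothetical example for `celeborn-defaults.conf` (the values shown are illustrative only, not recommendations from this patch):

```
celeborn.storage.s3.mpu.maxRetries   10
celeborn.storage.s3.mpu.baseDelay    200ms
celeborn.storage.s3.mpu.maxBackoff   30s
```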
Closes #3293 from ashangit/nfraison/CELEBORN-2003.
Authored-by: [email protected] <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 061cdc38207c73ca183555600ba7b7c1f12b9e19)
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 18 +++++++
docs/configuration/worker.md | 2 +
.../apache/celeborn/S3MultipartUploadHandler.java | 59 ++++++++++++++++++++--
.../deploy/worker/storage/TierWriterHelper.java | 11 +++-
.../service/deploy/worker/storage/TierWriter.scala | 4 +-
5 files changed, 86 insertions(+), 8 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 0ec7de075..248297dd3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1194,6 +1194,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("")
def s3MultiplePartUploadMaxRetries: Int = get(S3_MPU_MAX_RETRIES)
+ def s3MultiplePartUploadBaseDelay: Int = get(S3_MPU_BASE_DELAY).toInt
+ def s3MultiplePartUploadMaxBackoff: Int = get(S3_MPU_MAX_BACKOFF).toInt
def s3Dir: String = {
get(S3_DIR).map {
@@ -3198,6 +3200,22 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(5)
+ val S3_MPU_BASE_DELAY: ConfigEntry[Long] =
+ buildConf("celeborn.storage.s3.mpu.baseDelay")
+ .categories("worker")
+ .version("0.6.0")
+ .doc("S3 MPU base sleep time (milliseconds) for retryable exceptions.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("100ms")
+
+ val S3_MPU_MAX_BACKOFF: ConfigEntry[Long] =
+ buildConf("celeborn.storage.s3.mpu.maxBackoff")
+ .categories("worker")
+ .version("0.6.0")
+ .doc("S3 MPU max sleep time (milliseconds) for retryable exceptions.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("20s")
+
val OSS_ENDPOINT: OptionalConfigEntry[String] =
buildConf("celeborn.storage.oss.endpoint")
.categories("worker", "master", "client")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index b601a59a2..8e9af6608 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -53,6 +53,8 @@ license: |
| celeborn.storage.oss.secret.key | &lt;undefined&gt; | false | OSS secret key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.mpu.baseDelay | 100ms | false | S3 MPU base sleep time (milliseconds) for retryable exceptions. | 0.6.0 | |
+| celeborn.storage.s3.mpu.maxBackoff | 20s | false | S3 MPU max sleep time (milliseconds) for retryable exceptions. | 0.6.0 | |
| celeborn.storage.s3.mpu.maxRetries | 5 | false | S3 MPU upload max retries. | 0.6.0 | |
| celeborn.worker.activeConnection.max | &lt;undefined&gt; | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | |
| celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 | |
diff --git a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
index f22217a2c..11951e36e 100644
--- a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
+++ b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -24,10 +24,13 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
+import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.event.ProgressListener;
+import com.amazonaws.retry.PredefinedBackoffStrategies;
import com.amazonaws.retry.PredefinedRetryPolicies;
+import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
@@ -65,12 +68,21 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
private String bucketName;
private Integer s3MultiplePartUploadMaxRetries;
+ private Integer baseDelay;
+ private Integer maxBackoff;
public S3MultipartUploadHandler(
- FileSystem hadoopFs, String bucketName, String key, Integer s3MultiplePartUploadMaxRetries)
+ FileSystem hadoopFs,
+ String bucketName,
+ String key,
+ Integer s3MultiplePartUploadMaxRetries,
+ Integer baseDelay,
+ Integer maxBackoff)
throws IOException, URISyntaxException {
this.bucketName = bucketName;
this.s3MultiplePartUploadMaxRetries = s3MultiplePartUploadMaxRetries;
+ this.baseDelay = baseDelay;
+ this.maxBackoff = maxBackoff;
Configuration conf = hadoopFs.getConf();
AWSCredentialProviderList providers = new AWSCredentialProviderList();
@@ -80,11 +92,16 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
providers.add(new EnvironmentVariableCredentialsProvider());
providers.add(new IAMInstanceCredentialsProvider());
+ RetryPolicy retryPolicy =
+ new RetryPolicy(
+ PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION,
+ new PredefinedBackoffStrategies.SDKDefaultBackoffStrategy(
+ baseDelay, baseDelay, maxBackoff),
+ s3MultiplePartUploadMaxRetries,
+ false);
ClientConfiguration clientConfig =
new ClientConfiguration()
- .withRetryPolicy(
- PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(
- s3MultiplePartUploadMaxRetries))
+ .withRetryPolicy(retryPolicy)
.withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
this.s3Client =
AmazonS3ClientBuilder.standard()
@@ -175,7 +192,39 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
CompleteMultipartUploadRequest compRequest =
new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags)
.withGeneralProgressListener(progressListener);
- CompleteMultipartUploadResult compResult = s3Client.completeMultipartUpload(compRequest);
+ CompleteMultipartUploadResult compResult = null;
+ for (int attempt = 1; attempt <= this.s3MultiplePartUploadMaxRetries; attempt++) {
+ try {
+ compResult = s3Client.completeMultipartUpload(compRequest);
+ break;
+ } catch (AmazonClientException e) {
+ if (attempt == this.s3MultiplePartUploadMaxRetries
+ || !PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION.shouldRetry(null, e, attempt)) {
+ logger.error(
+ "bucket {} key {} uploadId {} upload failed to complete, will not retry",
+ bucketName,
+ key,
+ uploadId,
+ e);
+ throw e;
+ }
+
+ long backoffTime = Math.min(maxBackoff, baseDelay * (long) Math.pow(2, attempt - 1));
+ try {
+ logger.warn(
+ "bucket {} key {} uploadId {} upload failed to complete, will retry ({}/{})",
+ bucketName,
+ key,
+ uploadId,
+ attempt,
+ this.s3MultiplePartUploadMaxRetries,
+ e);
+ Thread.sleep(backoffTime);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
logger.debug(
"bucket {} key {} uploadId {} upload completed location is in {} ",
bucketName,
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
index 62088e5d7..8c0103912 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
@@ -24,7 +24,12 @@ import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
public class TierWriterHelper {
public static MultipartUploadHandler getS3MultipartUploadHandler(
- FileSystem hadoopFs, String bucketName, String key, int maxRetryies) {
+ FileSystem hadoopFs,
+ String bucketName,
+ String key,
+ int maxRetryies,
+ int baseDelay,
+ int maxBackoff) {
return (MultipartUploadHandler)
DynConstructors.builder()
.impl(
@@ -32,9 +37,11 @@ public class TierWriterHelper {
FileSystem.class,
String.class,
String.class,
+ Integer.class,
+ Integer.class,
Integer.class)
.build()
- .newInstance(hadoopFs, bucketName, key, maxRetryies);
+ .newInstance(hadoopFs, bucketName, key, maxRetryies, baseDelay, maxBackoff);
}
public static MultipartUploadHandler getOssMultipartUploadHandler(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index fda56d5d8..278b6f945 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -542,7 +542,9 @@ class DfsTierWriter(
hadoopFs,
bucketName,
key,
- conf.s3MultiplePartUploadMaxRetries)
+ conf.s3MultiplePartUploadMaxRetries,
+ conf.s3MultiplePartUploadBaseDelay,
+ conf.s3MultiplePartUploadMaxBackoff)
s3MultipartUploadHandler.startUpload()
} else if (hdfsFileInfo.isOSS) {
val configuration = hadoopFs.getConf