This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a2d397231 [CELEBORN-1530] support MPU for S3
a2d397231 is described below

commit a2d39723180468fb9bdd7c36c0344c982292b302
Author: zhaohehuhu <[email protected]>
AuthorDate: Fri Nov 22 15:03:53 2024 +0800

    [CELEBORN-1530] support MPU for S3
    
    ### What changes were proposed in this pull request?
    
    as title
    
    ### Why are the changes needed?
    AWS S3 doesn't support append, so Celeborn had to copy the historical data
from S3 to the worker and write it back to S3 again, which causes heavy write
amplification. This PR implements a better solution via MPU (multipart upload)
to avoid the copy-and-write.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    
![WechatIMG257](https://github.com/user-attachments/assets/968d9162-e690-4767-8bed-e490e3055753)
    
    I conducted an experiment with a 1GB input dataset to compare the 
performance of Celeborn using only S3 storage versus using SSD storage. The 
results showed that Celeborn with SSD storage was approximately three times 
faster than with only S3 storage.
    
    <img width="1728" alt="Screenshot 2024-11-16 at 13 02 10" 
src="https://github.com/user-attachments/assets/8f879c47-c01a-4004-9eae-1c266c1f3ef2">
    
    The above screenshot shows the second test, with 5000 mappers and reducers, 
that I ran.
    
    Closes #2830 from zhaohehuhu/dev-1021.
    
    Lead-authored-by: zhaohehuhu <[email protected]>
    Co-authored-by: He Zhao <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 build/make-distribution.sh                         |   4 +-
 common/pom.xml                                     |  20 ---
 .../org/apache/celeborn/common/CelebornConf.scala  |  18 ++-
 .../celeborn/common/util/CelebornHadoopUtils.scala |   6 +-
 dev/deps/dependencies-server                       |   3 +
 docs/configuration/client.md                       |   2 +-
 docs/configuration/master.md                       |   2 +-
 docs/configuration/worker.md                       |   5 +-
 master/pom.xml                                     |  25 ++--
 multipart-uploader/pom.xml                         |  57 +++++++
 .../apache/celeborn/S3MultipartUploadHandler.java  | 164 +++++++++++++++++++++
 pom.xml                                            |  22 +--
 project/CelebornBuild.scala                        |  36 +++--
 .../common/service/mpu/MultipartUploadHandler.java |  34 +++++
 worker/pom.xml                                     |  13 ++
 .../deploy/worker/storage/PartitionDataWriter.java |  71 ++++++++-
 .../service/deploy/worker/storage/FlushTask.scala  |  34 ++---
 17 files changed, 417 insertions(+), 99 deletions(-)

diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index 55b419d15..1c9bbe6e5 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -264,8 +264,8 @@ function sbt_build_service {
   echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
   echo "Build flags: $@" >> "$DIST_DIR/RELEASE"
 
-  if [[ $@ == *"hadoop-aws"* ]]; then
-     SBT_MAVEN_PROFILES="hadoop-aws"
+  if [[ $@ == *"aws"* ]]; then
+     export SBT_MAVEN_PROFILES="aws"
   fi
   BUILD_COMMAND=("$SBT" clean package)
 
diff --git a/common/pom.xml b/common/pom.xml
index 3c34be923..0f4da3e7a 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -209,25 +209,5 @@
         </dependency>
       </dependencies>
     </profile>
-    <profile>
-      <id>hadoop-aws</id>
-      <activation>
-        <property>
-          <name>hadoop-aws-deps</name>
-        </property>
-      </activation>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-aws</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>com.amazonaws</groupId>
-          <artifactId>aws-java-sdk-bundle</artifactId>
-          <version>${aws.version}</version>
-        </dependency>
-      </dependencies>
-    </profile>
   </profiles>
 </project>
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 9bf3ea4d4..3e48d9388 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1147,7 +1147,9 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
 
   def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("")
 
-  def s3Endpoint: String = get(S3_ENDPOINT).getOrElse("")
+  def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("")
+
+  def s3MultiplePartUploadMaxRetries: Int = get(S3_MPU_MAX_RETRIES)
 
   def s3Dir: String = {
     get(S3_DIR).map {
@@ -3062,14 +3064,22 @@ object CelebornConf extends Logging {
       .stringConf
       .createOptional
 
-  val S3_ENDPOINT: OptionalConfigEntry[String] =
-    buildConf("celeborn.storage.s3.endpoint")
+  val S3_ENDPOINT_REGION: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.s3.endpoint.region")
       .categories("worker", "master", "client")
       .version("0.6.0")
       .doc("S3 endpoint for Celeborn to store shuffle data.")
       .stringConf
       .createOptional
 
+  val S3_MPU_MAX_RETRIES: ConfigEntry[Int] =
+    buildConf("celeborn.storage.s3.mpu.maxRetries")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("S3 MPU upload max retries.")
+      .intConf
+      .createWithDefault(5)
+
   val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.worker.storage.disk.reserve.size")
       .withAlternative("celeborn.worker.disk.reserve.size")
@@ -3552,7 +3562,7 @@ object CelebornConf extends Logging {
       .version("0.6.0")
       .doc("Size of buffer used by a S3 flusher.")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("4m")
+      .createWithDefaultString("6m")
 
   val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.writer.close.timeout")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 356711d75..b703da07c 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -49,9 +49,9 @@ object CelebornHadoopUtils extends Logging {
     }
 
     if (conf.s3Dir.nonEmpty) {
-      if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || 
conf.s3Endpoint.isEmpty) {
+      if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || 
conf.s3EndpointRegion.isEmpty) {
         throw new CelebornException(
-          "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3Endpoint 
is not set")
+          "S3 storage is enabled but s3AccessKey, s3SecretKey, or 
s3EndpointRegion is not set")
       }
       hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
       hadoopConf.set(
@@ -59,7 +59,7 @@ object CelebornHadoopUtils extends Logging {
         "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
       hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
       hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
-      hadoopConf.set("fs.s3a.endpoint", conf.s3Endpoint)
+      hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion)
     }
     appendSparkHadoopConfigs(conf, hadoopConf)
     hadoopConf
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index 74e71214a..0edc78b70 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -19,6 +19,7 @@ HikariCP/4.0.3//HikariCP-4.0.3.jar
 RoaringBitmap/1.0.6//RoaringBitmap-1.0.6.jar
 aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
 ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
+aws-java-sdk-bundle/1.12.367//aws-java-sdk-bundle-1.12.367.jar
 classgraph/4.8.138//classgraph-4.8.138.jar
 commons-cli/1.5.0//commons-cli-1.5.0.jar
 commons-crypto/1.0.0//commons-crypto-1.0.0.jar
@@ -27,6 +28,7 @@ commons-lang3/3.17.0//commons-lang3-3.17.0.jar
 commons-logging/1.1.3//commons-logging-1.1.3.jar
 failureaccess/1.0.2//failureaccess-1.0.2.jar
 guava/33.1.0-jre//guava-33.1.0-jre.jar
+hadoop-aws/3.3.6//hadoop-aws-3.3.6.jar
 hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar
 hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
 hk2-api/2.6.1//hk2-api-2.6.1.jar
@@ -145,4 +147,5 @@ swagger-integration/2.2.1//swagger-integration-2.2.1.jar
 swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar
 swagger-models/2.2.1//swagger-models-2.2.1.jar
 swagger-ui/4.9.1//swagger-ui-4.9.1.jar
+wildfly-openssl/1.1.3.Final//wildfly-openssl-1.1.3.Final.jar
 zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index bec9c56ae..2bb7a08f3 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -132,6 +132,6 @@ license: |
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory 
for Celeborn to store shuffle data. | 0.2.0 |  | 
 | celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
-| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 
endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 <!--end-include-->
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index d3300369f..4359527c0 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -89,6 +89,6 @@ license: |
 | celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | 
Kerberos principal for HDFS storage connection. | 0.3.2 |  | 
 | celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
-| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 
endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 <!--end-include-->
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 36e2cca97..97e262718 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -48,7 +48,8 @@ license: |
 | celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | 
Kerberos principal for HDFS storage connection. | 0.3.2 |  | 
 | celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
-| celeborn.storage.s3.endpoint | &lt;undefined&gt; | false | S3 endpoint for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 
endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.s3.mpu.maxRetries | 5 | false | S3 MPU upload max retries. 
| 0.6.0 |  | 
 | celeborn.storage.s3.secret.key | &lt;undefined&gt; | false | S3 secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.worker.activeConnection.max | &lt;undefined&gt; | false | If the 
number of active connections on a worker exceeds this configuration value, the 
worker will be marked as high-load in the heartbeat report, and the master will 
not include that node in the response of RequestSlots. | 0.3.1 |  | 
 | celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size 
of the application registry on Workers. | 0.5.0 |  | 
@@ -84,7 +85,7 @@ license: |
 | celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per 
disk used for write data to HDD disks. | 0.2.0 |  | 
 | celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used 
by a HDFS flusher. | 0.3.0 |  | 
 | celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count 
used for write data to HDFS. | 0.2.0 |  | 
-| celeborn.worker.flusher.s3.buffer.size | 4m | false | Size of buffer used by 
a S3 flusher. | 0.6.0 |  | 
+| celeborn.worker.flusher.s3.buffer.size | 6m | false | Size of buffer used by 
a S3 flusher. | 0.6.0 |  | 
 | celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used 
for write data to S3. | 0.6.0 |  | 
 | celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher 
to shutdown. | 0.2.0 |  | 
 | celeborn.worker.flusher.ssd.threads | 16 | false | Flusher's thread count 
per disk used for write data to SSD disks. | 0.2.0 |  | 
diff --git a/master/pom.xml b/master/pom.xml
index 51286b9ef..88f2f9ece 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -164,38 +164,33 @@
       </dependencies>
     </profile>
     <profile>
-      <id>hadoop-aws</id>
+      <id>hadoop-2</id>
       <activation>
         <property>
-          <name>hadoop-aws-deps</name>
+          <name>hadoop-2-deps</name>
         </property>
       </activation>
       <dependencies>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-aws</artifactId>
+          <artifactId>hadoop-client</artifactId>
           <version>${hadoop.version}</version>
         </dependency>
-        <dependency>
-          <groupId>com.amazonaws</groupId>
-          <artifactId>aws-java-sdk-bundle</artifactId>
-          <version>${aws.version}</version>
-        </dependency>
       </dependencies>
     </profile>
     <profile>
-      <id>hadoop-2</id>
-      <activation>
-        <property>
-          <name>hadoop-2-deps</name>
-        </property>
-      </activation>
+      <id>aws</id>
       <dependencies>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
+          <artifactId>hadoop-aws</artifactId>
           <version>${hadoop.version}</version>
         </dependency>
+        <dependency>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-bundle</artifactId>
+          <version>${aws.version}</version>
+        </dependency>
       </dependencies>
     </profile>
   </profiles>
diff --git a/multipart-uploader/pom.xml b/multipart-uploader/pom.xml
new file mode 100644
index 000000000..cfdbbb4ee
--- /dev/null
+++ b/multipart-uploader/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>celeborn-multipart-uploader_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Multipart Uploader</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-aws</artifactId>
+        <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>com.amazonaws</groupId>
+        <artifactId>aws-java-sdk-bundle</artifactId>
+        <version>${aws.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
 
b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
new file mode 100644
index 000000000..f1699c884
--- /dev/null
+++ 
b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.retry.PredefinedRetryPolicies;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListPartsRequest;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PartListing;
+import com.amazonaws.services.s3.model.PartSummary;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+
+import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class S3MultipartUploadHandler implements MultipartUploadHandler {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(S3MultipartUploadHandler.class);
+
+  private String uploadId;
+
+  private AmazonS3 s3Client;
+
+  private String key;
+
+  private String bucketName;
+
+  private String s3AccessKey;
+
+  private String s3SecretKey;
+
+  private String s3EndpointRegion;
+
+  private Integer s3MultiplePartUploadMaxRetries;
+
+  public S3MultipartUploadHandler(String bucketName, String s3AccessKey, 
String s3SecretKey, String s3EndpointRegion, String key, Integer 
s3MultiplePartUploadMaxRetries) {
+    this.bucketName = bucketName;
+    this.s3AccessKey = s3AccessKey;
+    this.s3SecretKey = s3SecretKey;
+    this.s3EndpointRegion = s3EndpointRegion;
+    this.s3MultiplePartUploadMaxRetries = s3MultiplePartUploadMaxRetries;
+    BasicAWSCredentials basicAWSCredentials =
+        new BasicAWSCredentials(s3AccessKey, s3SecretKey);
+    ClientConfiguration clientConfig = new ClientConfiguration()
+            
.withRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(s3MultiplePartUploadMaxRetries))
+            .withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
+    this.s3Client =
+        AmazonS3ClientBuilder.standard()
+            .withCredentials(new 
AWSStaticCredentialsProvider(basicAWSCredentials))
+            .withRegion(s3EndpointRegion)
+            .withClientConfiguration(clientConfig)
+            .build();
+    this.key = key;
+  }
+
+  @Override
+  public void startUpload() {
+    InitiateMultipartUploadRequest initRequest =
+        new InitiateMultipartUploadRequest(bucketName, key);
+    InitiateMultipartUploadResult initResponse = 
s3Client.initiateMultipartUpload(initRequest);
+    this.uploadId = initResponse.getUploadId();
+  }
+
+  @Override
+  public void putPart(InputStream inputStream, Integer partNumber, Boolean 
finalFlush) throws IOException {
+    try (InputStream inStream = inputStream) {
+      int partSize = inStream.available();
+      if (partSize == 0) {
+        logger.debug("key {} uploadId {} part size is 0 for part number {} 
finalFlush {}", key, uploadId, partNumber, finalFlush);
+        return;
+      }
+      UploadPartRequest uploadRequest =
+              new UploadPartRequest()
+                      .withBucketName(bucketName)
+                      .withKey(key)
+                      .withUploadId(uploadId)
+                      .withPartNumber(partNumber)
+                      .withInputStream(inStream)
+                      .withPartSize(partSize)
+                      .withLastPart(finalFlush);
+      s3Client.uploadPart(uploadRequest);
+      logger.debug("key {} uploadId {} part number {} uploaded with size {} 
finalFlush {}", key, uploadId, partNumber, partSize, finalFlush);
+    } catch (RuntimeException | IOException e) {
+      logger.error("Failed to upload part", e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void complete() {
+    List<PartETag> partETags = new ArrayList<>();
+    ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, 
uploadId);
+    PartListing partListing;
+    do {
+      partListing = s3Client.listParts(listPartsRequest);
+      for (PartSummary part : partListing.getParts()) {
+        partETags.add(new PartETag(part.getPartNumber(), part.getETag()));
+      }
+      
listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
+    } while (partListing.isTruncated());
+    if (partETags.size() == 0){
+      logger.debug("bucket {} key {} uploadId {} has no parts uploaded, 
aborting upload", bucketName, key, uploadId);
+      abort();
+      logger.debug("bucket {} key {} upload completed with size {}", 
bucketName, key, 0);
+      return;
+    }
+    ProgressListener progressListener = progressEvent -> {
+      logger.debug("key {} uploadId {} progress event type {} transferred {} 
bytes", key, uploadId, progressEvent.getEventType(), 
progressEvent.getBytesTransferred());
+    };
+
+    CompleteMultipartUploadRequest compRequest =
+              new CompleteMultipartUploadRequest(
+                      bucketName, key, uploadId, partETags)
+                      .withGeneralProgressListener(progressListener);
+    CompleteMultipartUploadResult compResult = 
s3Client.completeMultipartUpload(compRequest);
+    logger.debug("bucket {} key {} uploadId {} upload completed location is in 
{} ", bucketName, key, uploadId, compResult.getLocation());
+  }
+
+  @Override
+  public void abort() {
+    AbortMultipartUploadRequest abortMultipartUploadRequest =
+        new AbortMultipartUploadRequest(bucketName, key, uploadId);
+    s3Client.abortMultipartUpload(abortMultipartUploadRequest);
+  }
+
+  @Override
+  public void close() {
+    if (s3Client != null) {
+      s3Client.shutdown();
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 1da158f70..a8549b5fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
     <module>service</module>
     <module>master</module>
     <module>worker</module>
+    <module>web</module>
     <module>cli</module>
   </modules>
 
@@ -71,6 +72,7 @@
 
     <!-- use hadoop-3 as default  -->
     <hadoop.version>3.3.6</hadoop.version>
+    <aws.version>1.12.532</aws.version>
     <!--
     If you change codahale.metrics.version, you also need to change
     the link to metrics.dropwizard.io in docs/monitoring.md.
@@ -1342,23 +1344,13 @@
       </dependencies>
     </profile>
     <profile>
-      <id>hadoop-aws</id>
+      <id>aws</id>
+      <modules>
+        <module>multipart-uploader</module>
+      </modules>
       <properties>
-        <hadoop-aws-deps>true</hadoop-aws-deps>
-        <aws.version>1.12.367</aws.version>
+        <aws-deps>true</aws-deps>
       </properties>
-      <dependencies>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-aws</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>com.amazonaws</groupId>
-          <artifactId>aws-java-sdk-bundle</artifactId>
-          <version>${aws.version}</version>
-        </dependency>
-      </dependencies>
     </profile>
     <profile>
       <id>spark-2.4</id>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 561afc9ac..dc2965344 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -248,7 +248,7 @@ object CelebornCommonSettings {
       "Build-Revision" -> gitHeadCommit.value.getOrElse("N/A"),
       "Build-Branch" -> gitCurrentBranch.value,
       "Build-Time" -> 
java.time.ZonedDateTime.now().format(java.time.format.DateTimeFormatter.ISO_DATE_TIME)),
-  
+
     // -target cannot be passed as a parameter to javadoc. See 
https://github.com/sbt/sbt/issues/355
     Compile / compile / javacOptions ++= Seq("-target", "1.8"),
 
@@ -370,7 +370,8 @@ object CelebornBuild extends sbt.internal.BuildDef {
       CelebornService.service,
       CelebornWorker.worker,
       CelebornMaster.master,
-      CelebornCli.cli) ++ maybeSparkClientModules ++ maybeFlinkClientModules 
++ maybeMRClientModules ++ maybeWebModules
+      CelebornCli.cli,
+      CeleborMPU.celeborMPU) ++ maybeSparkClientModules ++ 
maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules
   }
 
   // ThisBuild / parallelExecution := false
@@ -494,13 +495,23 @@ object CelebornSpi {
     )
 }
 
+object CeleborMPU {
+
+  lazy val hadoopAwsDependencies = Seq(Dependencies.hadoopAws, 
Dependencies.awsClient)
+
+  lazy val celeborMPU = Project("celeborn-multipart-uploader", 
file("multipart-uploader"))
+    .dependsOn(CelebornService.service % "test->test;compile->compile")
+    .settings (
+      commonSettings,
+      libraryDependencies ++= Seq(
+        Dependencies.log4j12Api,
+        Dependencies.log4jSlf4jImpl,
+      ) ++ hadoopAwsDependencies
+    )
+}
+
 object CelebornCommon {
 
-  lazy val hadoopAwsDependencies = 
if(profiles.exists(_.startsWith("hadoop-aws"))){
-    Seq(Dependencies.hadoopAws, Dependencies.awsClient)
-  } else {
-    Seq.empty
-  }
 
   lazy val common = Project("celeborn-common", file("common"))
     .dependsOn(CelebornSpi.spi)
@@ -538,7 +549,7 @@ object CelebornCommon {
         // SSL support
         Dependencies.bouncycastleBcprovJdk18on,
         Dependencies.bouncycastleBcpkixJdk18on
-      ) ++ commonUnitTestDependencies ++ hadoopAwsDependencies,
+      ) ++ commonUnitTestDependencies,
 
       Compile / sourceGenerators += Def.task {
         val file = (Compile / sourceManaged).value / "org" / "apache" / 
"celeborn" / "package.scala"
@@ -645,13 +656,18 @@ object CelebornMaster {
 }
 
 object CelebornWorker {
-  lazy val worker = Project("celeborn-worker", file("worker"))
+   var worker = Project("celeborn-worker", file("worker"))
     .dependsOn(CelebornService.service)
     .dependsOn(CelebornCommon.common % "test->test;compile->compile")
     .dependsOn(CelebornService.service % "test->test;compile->compile")
     .dependsOn(CelebornClient.client % "test->compile")
     .dependsOn(CelebornMaster.master % "test->compile")
-    .settings (
+
+  if (profiles.exists(_.startsWith("aws"))) {
+    worker = worker.dependsOn(CeleborMPU.celeborMPU)
+  }
+
+  worker = worker.settings(
       commonSettings,
       libraryDependencies ++= Seq(
         Dependencies.apLoader,
diff --git 
a/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java
 
b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java
new file mode 100644
index 000000000..0df0e1a2e
--- /dev/null
+++ 
b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.service.mpu;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface MultipartUploadHandler {
+
+  void startUpload();
+
+  void putPart(InputStream inputStream, Integer partNumber, Boolean 
finalFlush) throws IOException;
+
+  void complete();
+
+  void abort();
+
+  void close();
+}
diff --git a/worker/pom.xml b/worker/pom.xml
index e770bc8cd..12aed185b 100644
--- a/worker/pom.xml
+++ b/worker/pom.xml
@@ -141,4 +141,17 @@
       </plugin>
     </plugins>
   </build>
+
+  <profiles>
+    <profile>
+      <id>aws</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.celeborn</groupId>
+          
<artifactId>celeborn-multipart-uploader_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
 </project>
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index af93b132a..9032bfe91 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.worker.storage;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import com.google.common.base.Preconditions;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
@@ -49,6 +51,8 @@ import org.apache.celeborn.common.protocol.PartitionSplitMode;
 import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.common.unsafe.Platform;
 import org.apache.celeborn.common.util.FileChannelUtils;
+import org.apache.celeborn.reflect.DynConstructors;
+import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
 import org.apache.celeborn.service.deploy.worker.WorkerSource;
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
 import org.apache.celeborn.service.deploy.worker.congestcontrol.UserBufferInfo;
@@ -113,6 +117,12 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
 
   private UserCongestionControlContext userCongestionControlContext = null;
 
+  protected MultipartUploadHandler s3MultipartUploadHandler;
+
+  protected int partNumber = 1;
+
+  private final int s3MultiplePartUploadMaxRetries;
+
   public PartitionDataWriter(
       StorageManager storageManager,
       AbstractSource workerSource,
@@ -137,6 +147,7 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     this.s3FlusherBufferSize = conf.workerS3FlusherBufferSize();
     this.metricsCollectCriticalEnabled = conf.metricsCollectCriticalEnabled();
     this.chunkSize = conf.shuffleChunkSize();
+    this.s3MultiplePartUploadMaxRetries = 
conf.s3MultiplePartUploadMaxRetries();
 
     Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =
         storageManager.createFile(writerContext, supportInMemory);
@@ -187,6 +198,38 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
       // If we reuse DFS output stream, we will exhaust the memory soon.
       try {
         hadoopFs.create(this.diskFileInfo.getDfsPath(), true).close();
+        if (diskFileInfo.isS3()) {
+          Configuration configuration = hadoopFs.getConf();
+          String s3AccessKey = configuration.get("fs.s3a.access.key");
+          String s3SecretKey = configuration.get("fs.s3a.secret.key");
+          String s3EndpointRegion = 
configuration.get("fs.s3a.endpoint.region");
+
+          URI uri = hadoopFs.getUri();
+          String bucketName = uri.getHost();
+          int index = diskFileInfo.getFilePath().indexOf(bucketName);
+          String key = diskFileInfo.getFilePath().substring(index + 
bucketName.length() + 1);
+
+          this.s3MultipartUploadHandler =
+              (MultipartUploadHandler)
+                  DynConstructors.builder()
+                      .impl(
+                          "org.apache.celeborn.S3MultipartUploadHandler",
+                          String.class,
+                          String.class,
+                          String.class,
+                          String.class,
+                          String.class,
+                          Integer.class)
+                      .build()
+                      .newInstance(
+                          bucketName,
+                          s3AccessKey,
+                          s3SecretKey,
+                          s3EndpointRegion,
+                          key,
+                          s3MultiplePartUploadMaxRetries);
+          s3MultipartUploadHandler.startUpload();
+        }
       } catch (IOException e) {
         try {
           // If create file failed, wait 10 ms and retry
@@ -236,7 +279,14 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
           } else if (diskFileInfo.isHdfs()) {
             task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), 
notifier, false);
           } else if (diskFileInfo.isS3()) {
-            task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), 
notifier, false);
+            task =
+                new S3FlushTask(
+                    flushBuffer,
+                    notifier,
+                    false,
+                    s3MultipartUploadHandler,
+                    partNumber++,
+                    finalFlush);
           }
           MemoryManager.instance().releaseMemoryFileStorage(numBytes);
           MemoryManager.instance().incrementDiskBuffer(numBytes);
@@ -264,7 +314,14 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
             } else if (diskFileInfo.isHdfs()) {
               task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), 
notifier, true);
             } else if (diskFileInfo.isS3()) {
-              task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), 
notifier, true);
+              task =
+                  new S3FlushTask(
+                      flushBuffer,
+                      notifier,
+                      true,
+                      s3MultipartUploadHandler,
+                      partNumber++,
+                      finalFlush);
             }
           }
         }
@@ -303,6 +360,11 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
     }
 
     if (notifier.hasException()) {
+      if (s3MultipartUploadHandler != null) {
+        logger.warn("Abort s3 multipart upload for {}", 
diskFileInfo.getFilePath());
+        s3MultipartUploadHandler.abort();
+        s3MultipartUploadHandler.close();
+      }
       return;
     }
 
@@ -461,7 +523,10 @@ public abstract class PartitionDataWriter implements 
DeviceObserver {
       }
 
       finalClose.run();
-
+      if (s3MultipartUploadHandler != null) {
+        s3MultipartUploadHandler.complete();
+        s3MultipartUploadHandler.close();
+      }
       // unregister from DeviceMonitor
       if (diskFileInfo != null && !this.diskFileInfo.isDFS()) {
         logger.debug("file info {} unregister from device monitor", 
diskFileInfo);
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 0c4fa4105..897e28733 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -17,13 +17,14 @@
 
 package org.apache.celeborn.service.deploy.worker.storage
 
+import java.io.ByteArrayInputStream
 import java.nio.channels.FileChannel
 
 import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.IOUtils
 
 import org.apache.celeborn.common.protocol.StorageInfo.Type
+import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler
 
 abstract private[worker] class FlushTask(
     val buffer: CompositeByteBuf,
@@ -64,29 +65,16 @@ private[worker] class HdfsFlushTask(
 
 private[worker] class S3FlushTask(
     buffer: CompositeByteBuf,
-    val path: Path,
     notifier: FlushNotifier,
-    keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) {
+    keepBuffer: Boolean,
+    s3MultipartUploader: MultipartUploadHandler,
+    partNumber: Int,
+    finalFlush: Boolean = false)
+  extends FlushTask(buffer, notifier, keepBuffer) {
+
   override def flush(): Unit = {
-    val hadoopFs = StorageManager.hadoopFs.get(Type.S3)
-    if (hadoopFs.exists(path)) {
-      val conf = hadoopFs.getConf
-      val tempPath = new Path(path.getParent, path.getName + ".tmp")
-      val outputStream = hadoopFs.create(tempPath, true, 256 * 1024)
-      val inputStream = hadoopFs.open(path)
-      try {
-        IOUtils.copyBytes(inputStream, outputStream, conf, false)
-      } finally {
-        inputStream.close()
-      }
-      outputStream.write(ByteBufUtil.getBytes(buffer))
-      outputStream.close()
-      hadoopFs.delete(path, false)
-      hadoopFs.rename(tempPath, path)
-    } else {
-      val s3Stream = hadoopFs.create(path, true, 256 * 1024)
-      s3Stream.write(ByteBufUtil.getBytes(buffer))
-      s3Stream.close()
-    }
+    val bytes = ByteBufUtil.getBytes(buffer)
+    val inputStream = new ByteArrayInputStream(bytes)
+    s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
   }
 }


Reply via email to