This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new a2d397231 [CELEBORN-1530] support MPU for S3
a2d397231 is described below
commit a2d39723180468fb9bdd7c36c0344c982292b302
Author: zhaohehuhu <[email protected]>
AuthorDate: Fri Nov 22 15:03:53 2024 +0800
[CELEBORN-1530] support MPU for S3
### What changes were proposed in this pull request?
as title
### Why are the changes needed?
AWS S3 doesn't support append, so Celeborn had to copy the historical data
from S3 to the worker and write it back to S3 again, which significantly slows
down writes. This PR implements a better solution via MPU (multipart upload)
to avoid the copy-and-write pattern.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?

I conducted an experiment with a 1GB input dataset to compare the
performance of Celeborn using only S3 storage versus using SSD storage. The
results showed that Celeborn with SSD storage was approximately three times
faster than with only S3 storage.
<img width="1728" alt="Screenshot 2024-11-16 at 13 02 10"
src="https://github.com/user-attachments/assets/8f879c47-c01a-4004-9eae-1c266c1f3ef2">
The above screenshot is from the second test that I ran, with 5000 mappers
and reducers.
Closes #2830 from zhaohehuhu/dev-1021.
Lead-authored-by: zhaohehuhu <[email protected]>
Co-authored-by: He Zhao <[email protected]>
Signed-off-by: mingji <[email protected]>
---
build/make-distribution.sh | 4 +-
common/pom.xml | 20 ---
.../org/apache/celeborn/common/CelebornConf.scala | 18 ++-
.../celeborn/common/util/CelebornHadoopUtils.scala | 6 +-
dev/deps/dependencies-server | 3 +
docs/configuration/client.md | 2 +-
docs/configuration/master.md | 2 +-
docs/configuration/worker.md | 5 +-
master/pom.xml | 25 ++--
multipart-uploader/pom.xml | 57 +++++++
.../apache/celeborn/S3MultipartUploadHandler.java | 164 +++++++++++++++++++++
pom.xml | 22 +--
project/CelebornBuild.scala | 36 +++--
.../common/service/mpu/MultipartUploadHandler.java | 34 +++++
worker/pom.xml | 13 ++
.../deploy/worker/storage/PartitionDataWriter.java | 71 ++++++++-
.../service/deploy/worker/storage/FlushTask.scala | 34 ++---
17 files changed, 417 insertions(+), 99 deletions(-)
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index 55b419d15..1c9bbe6e5 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -264,8 +264,8 @@ function sbt_build_service {
echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
echo "Build flags: $@" >> "$DIST_DIR/RELEASE"
- if [[ $@ == *"hadoop-aws"* ]]; then
- SBT_MAVEN_PROFILES="hadoop-aws"
+ if [[ $@ == *"aws"* ]]; then
+ export SBT_MAVEN_PROFILES="aws"
fi
BUILD_COMMAND=("$SBT" clean package)
diff --git a/common/pom.xml b/common/pom.xml
index 3c34be923..0f4da3e7a 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -209,25 +209,5 @@
</dependency>
</dependencies>
</profile>
- <profile>
- <id>hadoop-aws</id>
- <activation>
- <property>
- <name>hadoop-aws-deps</name>
- </property>
- </activation>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-aws</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-bundle</artifactId>
- <version>${aws.version}</version>
- </dependency>
- </dependencies>
- </profile>
</profiles>
</project>
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 9bf3ea4d4..3e48d9388 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1147,7 +1147,9 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("")
- def s3Endpoint: String = get(S3_ENDPOINT).getOrElse("")
+ def s3EndpointRegion: String = get(S3_ENDPOINT_REGION).getOrElse("")
+
+ def s3MultiplePartUploadMaxRetries: Int = get(S3_MPU_MAX_RETRIES)
def s3Dir: String = {
get(S3_DIR).map {
@@ -3062,14 +3064,22 @@ object CelebornConf extends Logging {
.stringConf
.createOptional
- val S3_ENDPOINT: OptionalConfigEntry[String] =
- buildConf("celeborn.storage.s3.endpoint")
+ val S3_ENDPOINT_REGION: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.endpoint.region")
.categories("worker", "master", "client")
.version("0.6.0")
.doc("S3 endpoint for Celeborn to store shuffle data.")
.stringConf
.createOptional
+ val S3_MPU_MAX_RETRIES: ConfigEntry[Int] =
+ buildConf("celeborn.storage.s3.mpu.maxRetries")
+ .categories("worker")
+ .version("0.6.0")
+ .doc("S3 MPU upload max retries.")
+ .intConf
+ .createWithDefault(5)
+
val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.storage.disk.reserve.size")
.withAlternative("celeborn.worker.disk.reserve.size")
@@ -3552,7 +3562,7 @@ object CelebornConf extends Logging {
.version("0.6.0")
.doc("Size of buffer used by a S3 flusher.")
.bytesConf(ByteUnit.BYTE)
- .createWithDefaultString("4m")
+ .createWithDefaultString("6m")
val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.writer.close.timeout")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 356711d75..b703da07c 100644
---
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -49,9 +49,9 @@ object CelebornHadoopUtils extends Logging {
}
if (conf.s3Dir.nonEmpty) {
- if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty ||
conf.s3Endpoint.isEmpty) {
+ if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty ||
conf.s3EndpointRegion.isEmpty) {
throw new CelebornException(
- "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3Endpoint
is not set")
+ "S3 storage is enabled but s3AccessKey, s3SecretKey, or
s3EndpointRegion is not set")
}
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set(
@@ -59,7 +59,7 @@ object CelebornHadoopUtils extends Logging {
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
- hadoopConf.set("fs.s3a.endpoint", conf.s3Endpoint)
+ hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion)
}
appendSparkHadoopConfigs(conf, hadoopConf)
hadoopConf
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index 74e71214a..0edc78b70 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -19,6 +19,7 @@ HikariCP/4.0.3//HikariCP-4.0.3.jar
RoaringBitmap/1.0.6//RoaringBitmap-1.0.6.jar
aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
+aws-java-sdk-bundle/1.12.367//aws-java-sdk-bundle-1.12.367.jar
classgraph/4.8.138//classgraph-4.8.138.jar
commons-cli/1.5.0//commons-cli-1.5.0.jar
commons-crypto/1.0.0//commons-crypto-1.0.0.jar
@@ -27,6 +28,7 @@ commons-lang3/3.17.0//commons-lang3-3.17.0.jar
commons-logging/1.1.3//commons-logging-1.1.3.jar
failureaccess/1.0.2//failureaccess-1.0.2.jar
guava/33.1.0-jre//guava-33.1.0-jre.jar
+hadoop-aws/3.3.6//hadoop-aws-3.3.6.jar
hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar
hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
@@ -145,4 +147,5 @@ swagger-integration/2.2.1//swagger-integration-2.2.1.jar
swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar
swagger-models/2.2.1//swagger-models-2.2.1.jar
swagger-ui/4.9.1//swagger-ui-4.9.1.jar
+wildfly-openssl/1.1.3.Final//wildfly-openssl-1.1.3.Final.jar
zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index bec9c56ae..2bb7a08f3 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -132,6 +132,6 @@ license: |
| celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory
for Celeborn to store shuffle data. | 0.2.0 | |
| celeborn.storage.s3.access.key | <undefined> | false | S3 access key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
-| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for
Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key
for Celeborn to store shuffle data. | 0.6.0 | |
<!--end-include-->
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index d3300369f..4359527c0 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -89,6 +89,6 @@ license: |
| celeborn.storage.hdfs.kerberos.principal | <undefined> | false |
Kerberos principal for HDFS storage connection. | 0.3.2 | |
| celeborn.storage.s3.access.key | <undefined> | false | S3 access key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
-| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for
Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key
for Celeborn to store shuffle data. | 0.6.0 | |
<!--end-include-->
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 36e2cca97..97e262718 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -48,7 +48,8 @@ license: |
| celeborn.storage.hdfs.kerberos.principal | <undefined> | false |
Kerberos principal for HDFS storage connection. | 0.3.2 | |
| celeborn.storage.s3.access.key | <undefined> | false | S3 access key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
-| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for
Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.mpu.maxRetries | 5 | false | S3 MPU upload max retries.
| 0.6.0 | |
| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.worker.activeConnection.max | <undefined> | false | If the
number of active connections on a worker exceeds this configuration value, the
worker will be marked as high-load in the heartbeat report, and the master will
not include that node in the response of RequestSlots. | 0.3.1 | |
| celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size
of the application registry on Workers. | 0.5.0 | |
@@ -84,7 +85,7 @@ license: |
| celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per
disk used for write data to HDD disks. | 0.2.0 | |
| celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used
by a HDFS flusher. | 0.3.0 | |
| celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count
used for write data to HDFS. | 0.2.0 | |
-| celeborn.worker.flusher.s3.buffer.size | 4m | false | Size of buffer used by
a S3 flusher. | 0.6.0 | |
+| celeborn.worker.flusher.s3.buffer.size | 6m | false | Size of buffer used by
a S3 flusher. | 0.6.0 | |
| celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used
for write data to S3. | 0.6.0 | |
| celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher
to shutdown. | 0.2.0 | |
| celeborn.worker.flusher.ssd.threads | 16 | false | Flusher's thread count
per disk used for write data to SSD disks. | 0.2.0 | |
diff --git a/master/pom.xml b/master/pom.xml
index 51286b9ef..88f2f9ece 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -164,38 +164,33 @@
</dependencies>
</profile>
<profile>
- <id>hadoop-aws</id>
+ <id>hadoop-2</id>
<activation>
<property>
- <name>hadoop-aws-deps</name>
+ <name>hadoop-2-deps</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-aws</artifactId>
+ <artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-bundle</artifactId>
- <version>${aws.version}</version>
- </dependency>
</dependencies>
</profile>
<profile>
- <id>hadoop-2</id>
- <activation>
- <property>
- <name>hadoop-2-deps</name>
- </property>
- </activation>
+ <id>aws</id>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
+ <artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bundle</artifactId>
+ <version>${aws.version}</version>
+ </dependency>
</dependencies>
</profile>
</profiles>
diff --git a/multipart-uploader/pom.xml b/multipart-uploader/pom.xml
new file mode 100644
index 000000000..cfdbbb4ee
--- /dev/null
+++ b/multipart-uploader/pom.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>celeborn-multipart-uploader_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Multipart Uploader</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bundle</artifactId>
+ <version>${aws.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
new file mode 100644
index 000000000..f1699c884
--- /dev/null
+++
b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.retry.PredefinedRetryPolicies;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListPartsRequest;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PartListing;
+import com.amazonaws.services.s3.model.PartSummary;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+
+import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class S3MultipartUploadHandler implements MultipartUploadHandler {
+
+ private static final Logger logger =
LoggerFactory.getLogger(S3MultipartUploadHandler.class);
+
+ private String uploadId;
+
+ private AmazonS3 s3Client;
+
+ private String key;
+
+ private String bucketName;
+
+ private String s3AccessKey;
+
+ private String s3SecretKey;
+
+ private String s3EndpointRegion;
+
+ private Integer s3MultiplePartUploadMaxRetries;
+
+ public S3MultipartUploadHandler(String bucketName, String s3AccessKey,
String s3SecretKey, String s3EndpointRegion, String key, Integer
s3MultiplePartUploadMaxRetries) {
+ this.bucketName = bucketName;
+ this.s3AccessKey = s3AccessKey;
+ this.s3SecretKey = s3SecretKey;
+ this.s3EndpointRegion = s3EndpointRegion;
+ this.s3MultiplePartUploadMaxRetries = s3MultiplePartUploadMaxRetries;
+ BasicAWSCredentials basicAWSCredentials =
+ new BasicAWSCredentials(s3AccessKey, s3SecretKey);
+ ClientConfiguration clientConfig = new ClientConfiguration()
+
.withRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(s3MultiplePartUploadMaxRetries))
+ .withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
+ this.s3Client =
+ AmazonS3ClientBuilder.standard()
+ .withCredentials(new
AWSStaticCredentialsProvider(basicAWSCredentials))
+ .withRegion(s3EndpointRegion)
+ .withClientConfiguration(clientConfig)
+ .build();
+ this.key = key;
+ }
+
+ @Override
+ public void startUpload() {
+ InitiateMultipartUploadRequest initRequest =
+ new InitiateMultipartUploadRequest(bucketName, key);
+ InitiateMultipartUploadResult initResponse =
s3Client.initiateMultipartUpload(initRequest);
+ this.uploadId = initResponse.getUploadId();
+ }
+
+ @Override
+ public void putPart(InputStream inputStream, Integer partNumber, Boolean
finalFlush) throws IOException {
+ try (InputStream inStream = inputStream) {
+ int partSize = inStream.available();
+ if (partSize == 0) {
+ logger.debug("key {} uploadId {} part size is 0 for part number {}
finalFlush {}", key, uploadId, partNumber, finalFlush);
+ return;
+ }
+ UploadPartRequest uploadRequest =
+ new UploadPartRequest()
+ .withBucketName(bucketName)
+ .withKey(key)
+ .withUploadId(uploadId)
+ .withPartNumber(partNumber)
+ .withInputStream(inStream)
+ .withPartSize(partSize)
+ .withLastPart(finalFlush);
+ s3Client.uploadPart(uploadRequest);
+ logger.debug("key {} uploadId {} part number {} uploaded with size {}
finalFlush {}", key, uploadId, partNumber, partSize, finalFlush);
+ } catch (RuntimeException | IOException e) {
+ logger.error("Failed to upload part", e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void complete() {
+ List<PartETag> partETags = new ArrayList<>();
+ ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key,
uploadId);
+ PartListing partListing;
+ do {
+ partListing = s3Client.listParts(listPartsRequest);
+ for (PartSummary part : partListing.getParts()) {
+ partETags.add(new PartETag(part.getPartNumber(), part.getETag()));
+ }
+
listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
+ } while (partListing.isTruncated());
+ if (partETags.size() == 0){
+ logger.debug("bucket {} key {} uploadId {} has no parts uploaded,
aborting upload", bucketName, key, uploadId);
+ abort();
+ logger.debug("bucket {} key {} upload completed with size {}",
bucketName, key, 0);
+ return;
+ }
+ ProgressListener progressListener = progressEvent -> {
+ logger.debug("key {} uploadId {} progress event type {} transferred {}
bytes", key, uploadId, progressEvent.getEventType(),
progressEvent.getBytesTransferred());
+ };
+
+ CompleteMultipartUploadRequest compRequest =
+ new CompleteMultipartUploadRequest(
+ bucketName, key, uploadId, partETags)
+ .withGeneralProgressListener(progressListener);
+ CompleteMultipartUploadResult compResult =
s3Client.completeMultipartUpload(compRequest);
+ logger.debug("bucket {} key {} uploadId {} upload completed location is in
{} ", bucketName, key, uploadId, compResult.getLocation());
+ }
+
+ @Override
+ public void abort() {
+ AbortMultipartUploadRequest abortMultipartUploadRequest =
+ new AbortMultipartUploadRequest(bucketName, key, uploadId);
+ s3Client.abortMultipartUpload(abortMultipartUploadRequest);
+ }
+
+ @Override
+ public void close() {
+ if (s3Client != null) {
+ s3Client.shutdown();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 1da158f70..a8549b5fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
<module>service</module>
<module>master</module>
<module>worker</module>
+ <module>web</module>
<module>cli</module>
</modules>
@@ -71,6 +72,7 @@
<!-- use hadoop-3 as default -->
<hadoop.version>3.3.6</hadoop.version>
+ <aws.version>1.12.532</aws.version>
<!--
If you change codahale.metrics.version, you also need to change
the link to metrics.dropwizard.io in docs/monitoring.md.
@@ -1342,23 +1344,13 @@
</dependencies>
</profile>
<profile>
- <id>hadoop-aws</id>
+ <id>aws</id>
+ <modules>
+ <module>multipart-uploader</module>
+ </modules>
<properties>
- <hadoop-aws-deps>true</hadoop-aws-deps>
- <aws.version>1.12.367</aws.version>
+ <aws-deps>true</aws-deps>
</properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-aws</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-bundle</artifactId>
- <version>${aws.version}</version>
- </dependency>
- </dependencies>
</profile>
<profile>
<id>spark-2.4</id>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 561afc9ac..dc2965344 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -248,7 +248,7 @@ object CelebornCommonSettings {
"Build-Revision" -> gitHeadCommit.value.getOrElse("N/A"),
"Build-Branch" -> gitCurrentBranch.value,
"Build-Time" ->
java.time.ZonedDateTime.now().format(java.time.format.DateTimeFormatter.ISO_DATE_TIME)),
-
+
// -target cannot be passed as a parameter to javadoc. See
https://github.com/sbt/sbt/issues/355
Compile / compile / javacOptions ++= Seq("-target", "1.8"),
@@ -370,7 +370,8 @@ object CelebornBuild extends sbt.internal.BuildDef {
CelebornService.service,
CelebornWorker.worker,
CelebornMaster.master,
- CelebornCli.cli) ++ maybeSparkClientModules ++ maybeFlinkClientModules
++ maybeMRClientModules ++ maybeWebModules
+ CelebornCli.cli,
+ CeleborMPU.celeborMPU) ++ maybeSparkClientModules ++
maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules
}
// ThisBuild / parallelExecution := false
@@ -494,13 +495,23 @@ object CelebornSpi {
)
}
+object CeleborMPU {
+
+ lazy val hadoopAwsDependencies = Seq(Dependencies.hadoopAws,
Dependencies.awsClient)
+
+ lazy val celeborMPU = Project("celeborn-multipart-uploader",
file("multipart-uploader"))
+ .dependsOn(CelebornService.service % "test->test;compile->compile")
+ .settings (
+ commonSettings,
+ libraryDependencies ++= Seq(
+ Dependencies.log4j12Api,
+ Dependencies.log4jSlf4jImpl,
+ ) ++ hadoopAwsDependencies
+ )
+}
+
object CelebornCommon {
- lazy val hadoopAwsDependencies =
if(profiles.exists(_.startsWith("hadoop-aws"))){
- Seq(Dependencies.hadoopAws, Dependencies.awsClient)
- } else {
- Seq.empty
- }
lazy val common = Project("celeborn-common", file("common"))
.dependsOn(CelebornSpi.spi)
@@ -538,7 +549,7 @@ object CelebornCommon {
// SSL support
Dependencies.bouncycastleBcprovJdk18on,
Dependencies.bouncycastleBcpkixJdk18on
- ) ++ commonUnitTestDependencies ++ hadoopAwsDependencies,
+ ) ++ commonUnitTestDependencies,
Compile / sourceGenerators += Def.task {
val file = (Compile / sourceManaged).value / "org" / "apache" /
"celeborn" / "package.scala"
@@ -645,13 +656,18 @@ object CelebornMaster {
}
object CelebornWorker {
- lazy val worker = Project("celeborn-worker", file("worker"))
+ var worker = Project("celeborn-worker", file("worker"))
.dependsOn(CelebornService.service)
.dependsOn(CelebornCommon.common % "test->test;compile->compile")
.dependsOn(CelebornService.service % "test->test;compile->compile")
.dependsOn(CelebornClient.client % "test->compile")
.dependsOn(CelebornMaster.master % "test->compile")
- .settings (
+
+ if (profiles.exists(_.startsWith("aws"))) {
+ worker = worker.dependsOn(CeleborMPU.celeborMPU)
+ }
+
+ worker = worker.settings(
commonSettings,
libraryDependencies ++= Seq(
Dependencies.apLoader,
diff --git
a/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java
b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java
new file mode 100644
index 000000000..0df0e1a2e
--- /dev/null
+++
b/service/src/main/java/org/apache/celeborn/server/common/service/mpu/MultipartUploadHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.server.common.service.mpu;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface MultipartUploadHandler {
+
+ void startUpload();
+
+ void putPart(InputStream inputStream, Integer partNumber, Boolean
finalFlush) throws IOException;
+
+ void complete();
+
+ void abort();
+
+ void close();
+}
diff --git a/worker/pom.xml b/worker/pom.xml
index e770bc8cd..12aed185b 100644
--- a/worker/pom.xml
+++ b/worker/pom.xml
@@ -141,4 +141,17 @@
</plugin>
</plugins>
</build>
+
+ <profiles>
+ <profile>
+ <id>aws</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+
<artifactId>celeborn-multipart-uploader_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index af93b132a..9032bfe91 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.worker.storage;
import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
@@ -49,6 +51,8 @@ import org.apache.celeborn.common.protocol.PartitionSplitMode;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.FileChannelUtils;
+import org.apache.celeborn.reflect.DynConstructors;
+import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
import org.apache.celeborn.service.deploy.worker.congestcontrol.UserBufferInfo;
@@ -113,6 +117,12 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
private UserCongestionControlContext userCongestionControlContext = null;
+ protected MultipartUploadHandler s3MultipartUploadHandler;
+
+ protected int partNumber = 1;
+
+ private final int s3MultiplePartUploadMaxRetries;
+
public PartitionDataWriter(
StorageManager storageManager,
AbstractSource workerSource,
@@ -137,6 +147,7 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
this.s3FlusherBufferSize = conf.workerS3FlusherBufferSize();
this.metricsCollectCriticalEnabled = conf.metricsCollectCriticalEnabled();
this.chunkSize = conf.shuffleChunkSize();
+ this.s3MultiplePartUploadMaxRetries =
conf.s3MultiplePartUploadMaxRetries();
Tuple4<MemoryFileInfo, Flusher, DiskFileInfo, File> createFileResult =
storageManager.createFile(writerContext, supportInMemory);
@@ -187,6 +198,38 @@ public abstract class PartitionDataWriter implements
DeviceObserver {
// If we reuse DFS output stream, we will exhaust the memory soon.
try {
hadoopFs.create(this.diskFileInfo.getDfsPath(), true).close();
+ if (diskFileInfo.isS3()) {
+ Configuration configuration = hadoopFs.getConf();
+ String s3AccessKey = configuration.get("fs.s3a.access.key");
+ String s3SecretKey = configuration.get("fs.s3a.secret.key");
+ String s3EndpointRegion =
configuration.get("fs.s3a.endpoint.region");
+
+ URI uri = hadoopFs.getUri();
+ String bucketName = uri.getHost();
+ int index = diskFileInfo.getFilePath().indexOf(bucketName);
+ String key = diskFileInfo.getFilePath().substring(index +
bucketName.length() + 1);
+
+ this.s3MultipartUploadHandler =
+ (MultipartUploadHandler)
+ DynConstructors.builder()
+ .impl(
+ "org.apache.celeborn.S3MultipartUploadHandler",
+ String.class,
+ String.class,
+ String.class,
+ String.class,
+ String.class,
+ Integer.class)
+ .build()
+ .newInstance(
+ bucketName,
+ s3AccessKey,
+ s3SecretKey,
+ s3EndpointRegion,
+ key,
+ s3MultiplePartUploadMaxRetries);
+ s3MultipartUploadHandler.startUpload();
+ }
} catch (IOException e) {
try {
// If create file failed, wait 10 ms and retry
@@ -236,7 +279,14 @@ public abstract class PartitionDataWriter implements DeviceObserver {
} else if (diskFileInfo.isHdfs()) {
task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, false);
} else if (diskFileInfo.isS3()) {
- task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, false);
+ task =
+ new S3FlushTask(
+ flushBuffer,
+ notifier,
+ false,
+ s3MultipartUploadHandler,
+ partNumber++,
+ finalFlush);
}
MemoryManager.instance().releaseMemoryFileStorage(numBytes);
MemoryManager.instance().incrementDiskBuffer(numBytes);
@@ -264,7 +314,14 @@ public abstract class PartitionDataWriter implements DeviceObserver {
} else if (diskFileInfo.isHdfs()) {
task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, true);
} else if (diskFileInfo.isS3()) {
- task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, true);
+ task =
+ new S3FlushTask(
+ flushBuffer,
+ notifier,
+ true,
+ s3MultipartUploadHandler,
+ partNumber++,
+ finalFlush);
}
}
}
@@ -303,6 +360,11 @@ public abstract class PartitionDataWriter implements DeviceObserver {
}
if (notifier.hasException()) {
+ if (s3MultipartUploadHandler != null) {
+ logger.warn("Abort s3 multipart upload for {}",
diskFileInfo.getFilePath());
+ s3MultipartUploadHandler.complete();
+ s3MultipartUploadHandler.close();
+ }
return;
}
@@ -461,7 +523,10 @@ public abstract class PartitionDataWriter implements DeviceObserver {
}
finalClose.run();
-
+ if (s3MultipartUploadHandler != null) {
+ s3MultipartUploadHandler.complete();
+ s3MultipartUploadHandler.close();
+ }
// unregister from DeviceMonitor
if (diskFileInfo != null && !this.diskFileInfo.isDFS()) {
logger.debug("file info {} unregister from device monitor",
diskFileInfo);
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 0c4fa4105..897e28733 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -17,13 +17,14 @@
package org.apache.celeborn.service.deploy.worker.storage
+import java.io.ByteArrayInputStream
import java.nio.channels.FileChannel
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.IOUtils
import org.apache.celeborn.common.protocol.StorageInfo.Type
+import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler
abstract private[worker] class FlushTask(
val buffer: CompositeByteBuf,
@@ -64,29 +65,16 @@ private[worker] class HdfsFlushTask(
private[worker] class S3FlushTask(
buffer: CompositeByteBuf,
- val path: Path,
notifier: FlushNotifier,
- keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) {
+ keepBuffer: Boolean,
+ s3MultipartUploader: MultipartUploadHandler,
+ partNumber: Int,
+ finalFlush: Boolean = false)
+ extends FlushTask(buffer, notifier, keepBuffer) {
+
override def flush(): Unit = {
- val hadoopFs = StorageManager.hadoopFs.get(Type.S3)
- if (hadoopFs.exists(path)) {
- val conf = hadoopFs.getConf
- val tempPath = new Path(path.getParent, path.getName + ".tmp")
- val outputStream = hadoopFs.create(tempPath, true, 256 * 1024)
- val inputStream = hadoopFs.open(path)
- try {
- IOUtils.copyBytes(inputStream, outputStream, conf, false)
- } finally {
- inputStream.close()
- }
- outputStream.write(ByteBufUtil.getBytes(buffer))
- outputStream.close()
- hadoopFs.delete(path, false)
- hadoopFs.rename(tempPath, path)
- } else {
- val s3Stream = hadoopFs.create(path, true, 256 * 1024)
- s3Stream.write(ByteBufUtil.getBytes(buffer))
- s3Stream.close()
- }
+ val bytes = ByteBufUtil.getBytes(buffer)
+ val inputStream = new ByteArrayInputStream(bytes)
+ s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
}
}