This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 3bf91929b [CELEBORN-1746] Reduce the size of aws dependencies
3bf91929b is described below
commit 3bf91929b6bd02974b5d15b4d6804c9b2b01cfc0
Author: zhaohehuhu <[email protected]>
AuthorDate: Thu Nov 28 19:45:01 2024 +0800
[CELEBORN-1746] Reduce the size of aws dependencies
### What changes were proposed in this pull request?
The AWS client JARs are very large, so this PR keeps only the AWS S3 SDK module, reducing the AWS dependency size from over 296 MB to around 2.3 MB.
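At its core, the change excludes the aws-java-sdk-bundle that hadoop-aws pulls in transitively and depends on the much smaller aws-java-sdk-s3 directly. A minimal sbt sketch of that pattern, using the versions from this diff:

```scala
// Drop the bundled AWS SDK that hadoop-aws brings in; keep only the S3 module.
val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % "3.3.6" excludeAll (
  ExclusionRule("com.amazonaws", "aws-java-sdk-bundle"))
val awsS3 = "com.amazonaws" % "aws-java-sdk-s3" % "1.12.532"
```

The equivalent Maven change (an `<exclusions>` block on hadoop-aws plus a direct aws-java-sdk-s3 dependency) appears in master/pom.xml and multipart-uploader/pom.xml below.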
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Screenshot (2024-11-25 16:17:52): https://github.com/user-attachments/assets/efebbe7d-73cb-47fb-b7fa-9aae052f744b
Tested in a lab environment, as shown in the screenshot above.
Closes #2944 from zhaohehuhu/dev-1125.
Authored-by: zhaohehuhu <[email protected]>
Signed-off-by: mingji <[email protected]>
---
dev/deps/dependencies-server | 3 -
dev/reformat | 1 +
master/pom.xml | 8 +-
multipart-uploader/pom.xml | 18 +++--
.../apache/celeborn/S3MultipartUploadHandler.java | 87 +++++++++++++++-------
project/CelebornBuild.scala | 21 ++++--
6 files changed, 95 insertions(+), 43 deletions(-)
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index 0edc78b70..74e71214a 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -19,7 +19,6 @@ HikariCP/4.0.3//HikariCP-4.0.3.jar
RoaringBitmap/1.0.6//RoaringBitmap-1.0.6.jar
aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
-aws-java-sdk-bundle/1.12.367//aws-java-sdk-bundle-1.12.367.jar
classgraph/4.8.138//classgraph-4.8.138.jar
commons-cli/1.5.0//commons-cli-1.5.0.jar
commons-crypto/1.0.0//commons-crypto-1.0.0.jar
@@ -28,7 +27,6 @@ commons-lang3/3.17.0//commons-lang3-3.17.0.jar
commons-logging/1.1.3//commons-logging-1.1.3.jar
failureaccess/1.0.2//failureaccess-1.0.2.jar
guava/33.1.0-jre//guava-33.1.0-jre.jar
-hadoop-aws/3.3.6//hadoop-aws-3.3.6.jar
hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar
hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
@@ -147,5 +145,4 @@ swagger-integration/2.2.1//swagger-integration-2.2.1.jar
swagger-jaxrs2/2.2.1//swagger-jaxrs2-2.2.1.jar
swagger-models/2.2.1//swagger-models-2.2.1.jar
swagger-ui/4.9.1//swagger-ui-4.9.1.jar
-wildfly-openssl/1.1.3.Final//wildfly-openssl-1.1.3.Final.jar
zstd-jni/1.5.2-1//zstd-jni-1.5.2-1.jar
diff --git a/dev/reformat b/dev/reformat
index 0784c3c45..17f834ee2 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -32,6 +32,7 @@ else
${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.20
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-2.4
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-3.3
+ ${PROJECT_DIR}/build/mvn spotless:apply -Paws
${PROJECT_DIR}/build/mvn spotless:apply -Pmr
${PROJECT_DIR}/build/mvn spotless:apply -Ptez
fi
diff --git a/master/pom.xml b/master/pom.xml
index 88f2f9ece..6ac7742e8 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -185,10 +185,16 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bundle</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-bundle</artifactId>
+ <artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.version}</version>
</dependency>
</dependencies>
diff --git a/multipart-uploader/pom.xml b/multipart-uploader/pom.xml
index cfdbbb4ee..15ffeb8dc 100644
--- a/multipart-uploader/pom.xml
+++ b/multipart-uploader/pom.xml
@@ -35,14 +35,20 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-aws</artifactId>
- <version>${hadoop.version}</version>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bundle</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-bundle</artifactId>
- <version>${aws.version}</version>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>${aws.version}</version>
</dependency>
</dependencies>
diff --git a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
index f1699c884..993199fb4 100644
--- a/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
+++ b/multipart-uploader/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -39,11 +39,10 @@ import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PartListing;
import com.amazonaws.services.s3.model.PartSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
-
-import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
public class S3MultipartUploadHandler implements MultipartUploadHandler {
@@ -65,16 +64,24 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
private Integer s3MultiplePartUploadMaxRetries;
- public S3MultipartUploadHandler(String bucketName, String s3AccessKey, String s3SecretKey, String s3EndpointRegion, String key, Integer s3MultiplePartUploadMaxRetries) {
+ public S3MultipartUploadHandler(
+ String bucketName,
+ String s3AccessKey,
+ String s3SecretKey,
+ String s3EndpointRegion,
+ String key,
+ Integer s3MultiplePartUploadMaxRetries) {
this.bucketName = bucketName;
this.s3AccessKey = s3AccessKey;
this.s3SecretKey = s3SecretKey;
this.s3EndpointRegion = s3EndpointRegion;
this.s3MultiplePartUploadMaxRetries = s3MultiplePartUploadMaxRetries;
- BasicAWSCredentials basicAWSCredentials =
- new BasicAWSCredentials(s3AccessKey, s3SecretKey);
- ClientConfiguration clientConfig = new ClientConfiguration()
- .withRetryPolicy(PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(s3MultiplePartUploadMaxRetries))
+ BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(s3AccessKey, s3SecretKey);
+ ClientConfiguration clientConfig =
+ new ClientConfiguration()
+ .withRetryPolicy(
+ PredefinedRetryPolicies.getDefaultRetryPolicyWithCustomMaxRetries(
+ s3MultiplePartUploadMaxRetries))
.withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
this.s3Client =
AmazonS3ClientBuilder.standard()
@@ -94,24 +101,36 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
}
@Override
- public void putPart(InputStream inputStream, Integer partNumber, Boolean finalFlush) throws IOException {
+ public void putPart(InputStream inputStream, Integer partNumber, Boolean finalFlush)
+ throws IOException {
try (InputStream inStream = inputStream) {
int partSize = inStream.available();
if (partSize == 0) {
- logger.debug("key {} uploadId {} part size is 0 for part number {}
finalFlush {}", key, uploadId, partNumber, finalFlush);
+ logger.debug(
+ "key {} uploadId {} part size is 0 for part number {} finalFlush
{}",
+ key,
+ uploadId,
+ partNumber,
+ finalFlush);
return;
}
UploadPartRequest uploadRequest =
- new UploadPartRequest()
- .withBucketName(bucketName)
- .withKey(key)
- .withUploadId(uploadId)
- .withPartNumber(partNumber)
- .withInputStream(inStream)
- .withPartSize(partSize)
- .withLastPart(finalFlush);
+ new UploadPartRequest()
+ .withBucketName(bucketName)
+ .withKey(key)
+ .withUploadId(uploadId)
+ .withPartNumber(partNumber)
+ .withInputStream(inStream)
+ .withPartSize(partSize)
+ .withLastPart(finalFlush);
s3Client.uploadPart(uploadRequest);
- logger.debug("key {} uploadId {} part number {} uploaded with size {}
finalFlush {}", key, uploadId, partNumber, partSize, finalFlush);
+ logger.debug(
+ "key {} uploadId {} part number {} uploaded with size {} finalFlush
{}",
+ key,
+ uploadId,
+ partNumber,
+ partSize,
+ finalFlush);
} catch (RuntimeException | IOException e) {
logger.error("Failed to upload part", e);
throw e;
@@ -130,22 +149,36 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
}
listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
} while (partListing.isTruncated());
- if (partETags.size() == 0){
- logger.debug("bucket {} key {} uploadId {} has no parts uploaded,
aborting upload", bucketName, key, uploadId);
+ if (partETags.size() == 0) {
+ logger.debug(
+ "bucket {} key {} uploadId {} has no parts uploaded, aborting
upload",
+ bucketName,
+ key,
+ uploadId);
abort();
logger.debug("bucket {} key {} upload completed with size {}",
bucketName, key, 0);
return;
}
- ProgressListener progressListener = progressEvent -> {
- logger.debug("key {} uploadId {} progress event type {} transferred {}
bytes", key, uploadId, progressEvent.getEventType(),
progressEvent.getBytesTransferred());
- };
+ ProgressListener progressListener =
+ progressEvent -> {
+ logger.debug(
+ "key {} uploadId {} progress event type {} transferred {} bytes",
+ key,
+ uploadId,
+ progressEvent.getEventType(),
+ progressEvent.getBytesTransferred());
+ };
CompleteMultipartUploadRequest compRequest =
- new CompleteMultipartUploadRequest(
- bucketName, key, uploadId, partETags)
- .withGeneralProgressListener(progressListener);
+ new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags)
+ .withGeneralProgressListener(progressListener);
CompleteMultipartUploadResult compResult =
s3Client.completeMultipartUpload(compRequest);
- logger.debug("bucket {} key {} uploadId {} upload completed location is in
{} ", bucketName, key, uploadId, compResult.getLocation());
+ logger.debug(
+ "bucket {} key {} uploadId {} upload completed location is in {} ",
+ bucketName,
+ key,
+ uploadId,
+ compResult.getLocation());
}
@Override
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index dc2965344..501c6ad58 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -50,7 +50,7 @@ object Dependencies {
val findbugsVersion = "1.3.9"
val guavaVersion = "33.1.0-jre"
val hadoopVersion = "3.3.6"
- val awsVersion = "1.12.367"
+ val awsS3Version = "1.12.532"
val junitInterfaceVersion = "0.13.3"
// don't forget update `junitInterfaceVersion` when we upgrade junit
val junitVersion = "4.13.2"
@@ -117,8 +117,9 @@ object Dependencies {
ExclusionRule("jline", "jline"),
ExclusionRule("log4j", "log4j"),
ExclusionRule("org.slf4j", "slf4j-log4j12"))
- val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion
- val awsClient = "com.amazonaws" % "aws-java-sdk-bundle" % awsVersion
+ val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion excludeAll (
+ ExclusionRule("com.amazonaws", "aws-java-sdk-bundle"))
+ val awsS3 = "com.amazonaws" % "aws-java-sdk-s3" % awsS3Version
val ioDropwizardMetricsCore = "io.dropwizard.metrics" % "metrics-core" % metricsVersion
val ioDropwizardMetricsGraphite = "io.dropwizard.metrics" % "metrics-graphite" % metricsVersion excludeAll (
ExclusionRule("com.rabbitmq", "amqp-client"))
@@ -370,8 +371,8 @@ object CelebornBuild extends sbt.internal.BuildDef {
CelebornService.service,
CelebornWorker.worker,
CelebornMaster.master,
- CelebornCli.cli,
- CeleborMPU.celeborMPU) ++ maybeSparkClientModules ++ maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules
+ CelebornCli.cli
+ ) ++ maybeSparkClientModules ++ maybeFlinkClientModules ++ maybeMRClientModules ++ maybeWebModules ++ maybeCelebornMPUModule
}
// ThisBuild / parallelExecution := false
@@ -399,6 +400,14 @@ object Utils {
}
profiles
}
+
+ val celeborMPUProject = profiles.filter(_.startsWith("aws")).headOption match {
+ case Some("aws") => Some(CeleborMPU.celeborMPU)
+ case _ => None
+ }
+
+ lazy val maybeCelebornMPUModule: Seq[Project] = celeborMPUProject.map(Seq(_)).getOrElse(Seq.empty)
+
val SPARK_VERSION = profiles.filter(_.startsWith("spark")).headOption
lazy val sparkClientProjects = SPARK_VERSION match {
@@ -497,7 +506,7 @@ object CelebornSpi {
object CeleborMPU {
- lazy val hadoopAwsDependencies = Seq(Dependencies.hadoopAws, Dependencies.awsClient)
+ lazy val hadoopAwsDependencies = Seq(Dependencies.hadoopAws, Dependencies.awsS3)
lazy val celeborMPU = Project("celeborn-multipart-uploader", file("multipart-uploader"))
.dependsOn(CelebornService.service % "test->test;compile->compile")
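A note on the build change above: the multipart-uploader module is now included only when the `aws` profile is active. A minimal standalone sketch of that gating logic (using `find` instead of `filter(...).headOption`, with plain strings standing in for sbt `Project` values):

```scala
// Standalone sketch of the profile-gated module selection from the Utils hunk.
object ProfileGatingSketch {
  def maybeMPUModule(profiles: Seq[String]): Seq[String] =
    profiles.find(_.startsWith("aws")) match {
      case Some("aws") => Seq("celeborn-multipart-uploader")
      case _ => Seq.empty
    }

  def main(args: Array[String]): Unit = {
    println(maybeMPUModule(Seq("spark-3.3", "aws"))) // List(celeborn-multipart-uploader)
    println(maybeMPUModule(Seq("spark-3.3")))        // List()
  }
}
```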