This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ec38fcce3 [CELEBORN-2254] Fix support for S3 and add a simple integration test
ec38fcce3 is described below
commit ec38fcce383cd14252fe0e7074deddafc91e5f3f
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Feb 5 13:39:12 2026 +0800
[CELEBORN-2254] Fix support for S3 and add a simple integration test
### What changes were proposed in this pull request?
* Fix creating files to S3 (and other DFS)
* Add integration test for Spark and S3 (using Minio)
* In CI, some jobs now run with the AWS profile enabled, which activates the new integration test (it needs the S3 client dependencies); a sketch of the equivalent local invocation follows this list
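A minimal sketch of that local invocation, assuming a Spark 3.5 build and narrowing the CI module list down to just the integration-test module (the full list is in the workflow diff below):
```
# Sketch only: the aws profile activates the S3 integration test and its
# S3 client dependencies; spark-3.5 is an assumed example version.
PROFILES="-Pgoogle-mirror,aws,spark-3.5"
build/mvn $PROFILES -pl tests/spark-it -am clean install -DskipTests
build/mvn $PROFILES -pl tests/spark-it test
```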
### Why are the changes needed?
See https://issues.apache.org/jira/browse/CELEBORN-2254
### Does this PR resolve a correctness bug?
No
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
* I have added an integration test
* I have this patch running on our internal fork, where it makes Celeborn run on k8s with S3
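For context, a condensed sketch of the MinIO scaffolding the new test builds on; MinIOContainer, getS3URL, getUserName and getPassword are the Testcontainers APIs the test itself calls, while the surrounding wiring comments are illustrative only:
```
// Condensed sketch of the Testcontainers MinIO setup from the new test.
import org.testcontainers.containers.MinIOContainer

val minio = new MinIOContainer("minio/minio:RELEASE.2023-09-04T19-57-37Z")
minio.start()
try {
  val endpoint = minio.getS3URL      // -> celeborn.hadoop.fs.s3a.endpoint
  val accessKey = minio.getUserName  // -> celeborn.hadoop.fs.s3a.access.key
  val secretKey = minio.getPassword  // -> celeborn.hadoop.fs.s3a.secret.key
  // point Celeborn's S3 tiered storage at the MinIO endpoint here
} finally {
  minio.stop()
}
```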
Closes #3592 from eolivelli/CELEBORN-2254-apache.
Authored-by: Enrico Olivelli <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.github/workflows/maven.yml | 6 +-
.../org/apache/celeborn/client/ShuffleClient.java | 2 +
.../apache/celeborn/client/ShuffleClientImpl.java | 8 ++
.../celeborn/common/util/CelebornHadoopUtils.scala | 1 +
.../apache/celeborn/S3MultipartUploadHandler.java | 33 +++--
pom.xml | 7 ++
project/CelebornBuild.scala | 8 +-
tests/spark-it/pom.xml | 5 +
.../spark/s3/BasicEndToEndTieredStorageTest.scala | 133 +++++++++++++++++++++
.../deploy/worker/storage/StorageManager.scala | 9 +-
.../deploy/worker/storage/StoragePolicy.scala | 2 +
11 files changed, 198 insertions(+), 16 deletions(-)
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 7f357d9f0..513e3a891 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -133,7 +133,8 @@ jobs:
run: |
SPARK_BINARY_VERSION=${{ matrix.spark }}
SPARK_MAJOR_VERSION=${SPARK_BINARY_VERSION%%.*}
- PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
+ # enable AWS profile to run end-to-end tests with S3 storage
+ PROFILES="-Pgoogle-mirror,aws,spark-${{ matrix.spark }}"
TEST_MODULES="client-spark/common,client-spark/spark-${SPARK_MAJOR_VERSION},client-spark/spark-${SPARK_MAJOR_VERSION}-columnar-common,client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test
@@ -170,7 +171,8 @@ jobs:
run: |
SPARK_BINARY_VERSION=${{ matrix.spark }}
SPARK_MAJOR_VERSION=${SPARK_BINARY_VERSION%%.*}
- PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
+ # enable AWS profile to run end-to-end tests with S3 storage
+ PROFILES="-Pgoogle-mirror,aws,spark-${{ matrix.spark }}"
TEST_MODULES="client-spark/common,client-spark/spark-3,client-spark/spark-3-columnar-common,client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index e4c473a9b..7a89b051d 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -19,6 +19,7 @@ package org.apache.celeborn.client;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
@@ -122,6 +123,7 @@ public abstract class ShuffleClient {
hadoopFs = CelebornHadoopUtils.getHadoopFS(conf);
} catch (Exception e) {
logger.error("Celeborn initialize DFS failed.", e);
+ hadoopFs = Collections.emptyMap();
}
}
}
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index b55420386..be2bdf87d 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -36,6 +36,7 @@ import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -231,6 +232,13 @@ public class ShuffleClientImpl extends ShuffleClient {
reviveManager = new ReviveManager(this, conf);
+ if (conf.hasS3Storage()) {
+ Map<StorageInfo.Type, FileSystem> hadoopFs = getHadoopFs(conf);
+ FileSystem s3client = hadoopFs.get(StorageInfo.Type.S3);
+ logger.info("S3 client: {}", s3client);
+ if (s3client == null)
+ throw new IllegalStateException("S3 type is required but the S3 client was not created");
+ }
logger.info("Created ShuffleClientImpl, appUniqueId: {}", appUniqueId);
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 2940afcd2..4bb394a6c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -99,6 +99,7 @@ object CelebornHadoopUtils extends Logging {
dirs.foreach {
case (storageType, dir) => {
val path = new Path(dir)
+ logInfo(s"Creating HadoopFS for type $storageType at path $path");
hadoopFs.put(storageType, path.getFileSystem(hadoopConf))
}
})
diff --git a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
index 11951e36e..ab1d8c9a0 100644
--- a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
+++ b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -27,12 +27,14 @@ import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.retry.PredefinedBackoffStrategies;
import com.amazonaws.retry.PredefinedRetryPolicies;
import com.amazonaws.retry.RetryPolicy;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.*;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -61,15 +63,15 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
private String uploadId;
- private AmazonS3 s3Client;
+ private final AmazonS3 s3Client;
- private String key;
+ private final String key;
- private String bucketName;
+ private final String bucketName;
- private Integer s3MultiplePartUploadMaxRetries;
- private Integer baseDelay;
- private Integer maxBackoff;
+ private final Integer s3MultiplePartUploadMaxRetries;
+ private final Integer baseDelay;
+ private final Integer maxBackoff;
public S3MultipartUploadHandler(
FileSystem hadoopFs,
@@ -103,12 +105,23 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
new ClientConfiguration()
.withRetryPolicy(retryPolicy)
.withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
- this.s3Client =
+ AmazonS3ClientBuilder builder =
AmazonS3ClientBuilder.standard()
.withCredentials(providers)
- .withRegion(conf.get(Constants.AWS_REGION))
- .withClientConfiguration(clientConfig)
- .build();
+ .withClientConfiguration(clientConfig);
+ // for MinIO
+ String endpoint = conf.get("fs.s3a.endpoint");
+ if (endpoint != null && !endpoint.isEmpty()) {
+ builder =
+ builder
+ .withEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(
+ endpoint, conf.get(Constants.AWS_REGION)))
+ .withPathStyleAccessEnabled(conf.getBoolean("fs.s3a.path.style.access", false));
+ } else {
+ builder = builder.withRegion(conf.get(Constants.AWS_REGION));
+ }
+ this.s3Client = builder.build();
this.key = key;
}
diff --git a/pom.xml b/pom.xml
index 0bd41fbff..371f9b2b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,6 +155,7 @@
<maven.plugin.silencer.version>1.7.19</maven.plugin.silencer.version>
<maven.plugin.resources.version>3.3.1</maven.plugin.resources.version>
<openapi.generator.version>7.8.0</openapi.generator.version>
+ <testcontainers-minio.version>1.21.4</testcontainers-minio.version>
<!-- Allow modules to enable / disable certain build plugins easily. -->
<testJarPhase>prepare-package</testJarPhase>
@@ -525,6 +526,12 @@
<version>${jakarta.ws.rs-api.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>minio</artifactId>
+ <version>${testcontainers-minio.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.openapitools</groupId>
<artifactId>jackson-databind-nullable</artifactId>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index d303a3439..107edabac 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -90,6 +90,9 @@ object Dependencies {
val picocliVersion = "4.7.6"
val jmhVersion = "1.37"
+ // S3 integration tests with Minio
+ val testContainerMinioVersion = "1.21.4"
+
// For SSL support
val bouncycastleVersion = "1.77"
@@ -213,6 +216,8 @@ object Dependencies {
val jakartaAnnotationApi = "jakarta.annotation" % "jakarta.annotation-api" % jakartaAnnotationApiVersion
val jakartaWsRsApi = "jakarta.ws.rs" % "jakarta.ws.rs-api" % jakartaWsRsApiVersion
+ val testContainerMinio = "org.testcontainers" % "minio" % testContainerMinioVersion
+
// Test dependencies
// https://www.scala-sbt.org/1.x/docs/Testing.html
val junitInterface = "com.github.sbt" % "junit-interface" % junitInterfaceVersion
@@ -425,7 +430,8 @@ object CelebornCommonSettings {
Dependencies.scalatest % "test",
Dependencies.junit % "test",
// https://www.scala-sbt.org/1.x/docs/Testing.html
- Dependencies.junitInterface % "test")
+ Dependencies.junitInterface % "test",
+ Dependencies.testContainerMinio % "test")
}
object CelebornBuild extends sbt.internal.BuildDef {
diff --git a/tests/spark-it/pom.xml b/tests/spark-it/pom.xml
index c0a37ea6c..a87594f5c 100644
--- a/tests/spark-it/pom.xml
+++ b/tests/spark-it/pom.xml
@@ -182,6 +182,11 @@
<artifactId>jakarta.servlet-api</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>minio</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/s3/BasicEndToEndTieredStorageTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/s3/BasicEndToEndTieredStorageTest.scala
new file mode 100644
index 000000000..f65b00c4f
--- /dev/null
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/s3/BasicEndToEndTieredStorageTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.spark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+import org.testcontainers.containers.MinIOContainer
+
+import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
+
+class BasicEndToEndTieredStorageTest extends AnyFunSuite
+ with SparkTestBase
+ with BeforeAndAfterEach {
+
+ var container: MinIOContainer = null
+ val skipAWSTest = !isClassPresent("org.apache.hadoop.fs.s3a.S3AFileSystem")
+
+ def isClassPresent(className: String): Boolean = {
+ try {
+ Class.forName(className)
+ true
+ } catch {
+ case _: ClassNotFoundException => false
+ }
+ }
+
+ override def beforeAll(): Unit = {
+
+ if (skipAWSTest)
+ return
+
+ container = new MinIOContainer("minio/minio:RELEASE.2023-09-04T19-57-37Z")
+ container.start()
+
+ // create bucket using Minio command line tool
+ container.execInContainer(
+ "mc",
+ "alias",
+ "set",
+ "dockerminio",
+ "http://minio:9000",
+ container.getUserName,
+ container.getPassword)
+ container.execInContainer("mc", "mb", "dockerminio/sample-bucket")
+
+ System.setProperty("aws.accessKeyId", container.getUserName)
+ System.setProperty("aws.secretKey", container.getPassword)
+
+ val s3url = container.getS3URL
+ val augmentedConfiguration = Map(
+ CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,S3",
+ CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,S3",
+ CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY|S3",
+ "celeborn.hadoop.fs.s3a.endpoint" -> s"$s3url",
+ "celeborn.hadoop.fs.s3a.aws.credentials.provider" ->
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
+ "celeborn.hadoop.fs.s3a.access.key" -> container.getUserName,
+ "celeborn.hadoop.fs.s3a.secret.key" -> container.getPassword,
+ "celeborn.hadoop.fs.s3a.path.style.access" -> "true",
+ CelebornConf.S3_DIR.key -> "s3://sample-bucket/test/celeborn",
+ CelebornConf.S3_ENDPOINT_REGION.key -> "dummy-region")
+
+ setupMiniClusterWithRandomPorts(
+ masterConf = augmentedConfiguration,
+ workerConf = augmentedConfiguration,
+ workerNum = 1)
+ }
+
+ override def beforeEach(): Unit = {
+ ShuffleClient.reset()
+ }
+
+ override def afterAll(): Unit = {
+ System.clearProperty("aws.accessKeyId")
+ System.clearProperty("aws.secretKey")
+ if (container != null) {
+ container.close()
+ }
+ super.afterAll()
+ }
+
+ override def updateSparkConf(sparkConf: SparkConf, mode: ShuffleMode): SparkConf = {
+ val s3url = container.getS3URL
+ val newConf = sparkConf
+ .set("spark." + CelebornConf.ACTIVE_STORAGE_TYPES.key, "MEMORY,S3")
+ .set("spark." + CelebornConf.S3_DIR.key,
"s3://sample-bucket/test/celeborn")
+ .set("spark." + CelebornConf.S3_ENDPOINT_REGION.key, "dummy-region")
+ .set("spark.celeborn.hadoop.fs.s3a.endpoint", s"$s3url")
+ .set(
+ "spark.celeborn.hadoop.fs.s3a.aws.credentials.provider",
+ "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
+ .set("spark.celeborn.hadoop.fs.s3a.access.key", container.getUserName)
+ .set("spark.celeborn.hadoop.fs.s3a.secret.key", container.getPassword)
+ .set("spark.celeborn.hadoop.fs.s3a.path.style.access", "true")
+
+ super.updateSparkConf(newConf, mode)
+ }
+
+ test("celeborn spark integration test - s3") {
+ assume(
+ !skipAWSTest,
+ "Skipping test because AWS Hadoop client is not in the classpath (enable
with -Paws")
+
+ val s3url = container.getS3URL
+ log.info(s"s3url $s3url");
+ val sparkConf = new
SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
+ val celebornSparkSession = SparkSession.builder()
+ .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+ .getOrCreate()
+ groupBy(celebornSparkSession)
+
+ celebornSparkSession.stop()
+ }
+
+}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index a0b46dd6d..17b725b2d 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -43,6 +43,7 @@ import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, Disk
import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSource}
import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo}
+import org.apache.celeborn.common.protocol.StorageInfo.Type
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, CollectionUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker._
@@ -1081,6 +1082,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
partitionType: PartitionType,
partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
val suggestedMountPoint = location.getStorageInfo.getMountPoint
+ val storageType = location.getStorageInfo.getType
var retryCount = 0
var exception: IOException = null
val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
@@ -1102,7 +1104,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
}
- if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
+ if (storageType == Type.HDFS && location.getStorageInfo.HDFSAvailable()) {
val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
FileSystem.mkdirs(
@@ -1120,9 +1122,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
fileName,
hdfsFileInfo)
return (hdfsFlusher.get, hdfsFileInfo, null)
- } else if (dirs.isEmpty && location.getStorageInfo.S3Available()) {
+ } else if (storageType == Type.S3 &&
location.getStorageInfo.S3Available()) {
val shuffleDir =
new Path(new Path(s3Dir, conf.workerWorkingDir),
s"$appId/$shuffleId")
+ logDebug(s"trying to create S3 file at $shuffleDir");
FileSystem.mkdirs(
StorageManager.hadoopFs.get(StorageInfo.Type.S3),
shuffleDir,
@@ -1138,7 +1141,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
fileName,
s3FileInfo)
return (s3Flusher.get, s3FileInfo, null)
- } else if (dirs.isEmpty && location.getStorageInfo.OSSAvailable()) {
+ } else if (storageType == Type.OSS &&
location.getStorageInfo.OSSAvailable()) {
val shuffleDir =
new Path(new Path(ossDir, conf.workerWorkingDir),
s"$appId/$shuffleId")
FileSystem.mkdirs(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 08a1d0851..028e54bd7 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -160,6 +160,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
storageManager)
}
} else {
+ logError(
+ s"CANNOT create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName} because localOrDfsStorageAvailable is false")
null
}
}