This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit b659439edc10f6b4f1768c4810222cd8e909151a
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Feb 5 13:39:12 2026 +0800

    [CELEBORN-2254] Fix support for S3 and add a simple integration test

    ### What changes were proposed in this pull request?

    * Fix creating files on S3 (and other DFS implementations)
    * Add an integration test for Spark and S3 (using MinIO)
    * Run some CI jobs with the AWS profile, which activates the new
      integration test (it needs the S3 client dependencies)

    ### Why are the changes needed?

    See https://issues.apache.org/jira/browse/CELEBORN-2254.

    ### Does this PR resolve a correctness bug?

    No

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    * I added an integration test
    * I ran this patch on our internal fork, to make Celeborn run on k8s with S3

    Closes #3592 from eolivelli/CELEBORN-2254-apache.

    Authored-by: Enrico Olivelli <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .github/workflows/maven.yml                        |   6 +-
 .../org/apache/celeborn/client/ShuffleClient.java  |   2 +
 .../apache/celeborn/client/ShuffleClientImpl.java  |   8 ++
 .../celeborn/common/util/CelebornHadoopUtils.scala |   1 +
 .../apache/celeborn/S3MultipartUploadHandler.java  |  33 +++--
 pom.xml                                            |   7 ++
 project/CelebornBuild.scala                        |   8 +-
 tests/spark-it/pom.xml                             |   5 +
 .../spark/s3/BasicEndToEndTieredStorageTest.scala  | 133 +++++++++++++++++++++
 .../deploy/worker/storage/StorageManager.scala     |   9 +-
 .../deploy/worker/storage/StoragePolicy.scala      |   2 +
 11 files changed, 198 insertions(+), 16 deletions(-)
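For context before the patch itself, the S3 storage tier this change exercises is configured with settings like the following (an illustrative sketch: bucket, endpoint, and region values are placeholders, and the celeborn.storage.* key names are assumed from the CelebornConf constants referenced in the new test below):

    celeborn.storage.availableTypes             MEMORY,S3
    celeborn.storage.s3.dir                     s3://sample-bucket/test/celeborn
    celeborn.storage.s3.endpoint.region         us-east-1
    celeborn.hadoop.fs.s3a.endpoint             http://localhost:9000
    celeborn.hadoop.fs.s3a.path.style.access    true
    celeborn.hadoop.fs.s3a.access.key           minioadmin
    celeborn.hadoop.fs.s3a.secret.key           minioadmin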
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 7f357d9f0..513e3a891 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -133,7 +133,8 @@ jobs:
         run: |
           SPARK_BINARY_VERSION=${{ matrix.spark }}
           SPARK_MAJOR_VERSION=${SPARK_BINARY_VERSION%%.*}
-          PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
+          # enable AWS profile to run end-to-end tests with S3 storage
+          PROFILES="-Pgoogle-mirror,aws,spark-${{ matrix.spark }}"
           TEST_MODULES="client-spark/common,client-spark/spark-${SPARK_MAJOR_VERSION},client-spark/spark-${SPARK_MAJOR_VERSION}-columnar-common,client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
           build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
           build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test
@@ -170,7 +171,8 @@ jobs:
         run: |
           SPARK_BINARY_VERSION=${{ matrix.spark }}
           SPARK_MAJOR_VERSION=${SPARK_BINARY_VERSION%%.*}
-          PROFILES="-Pgoogle-mirror,spark-${{ matrix.spark }}"
+          # enable AWS profile to run end-to-end tests with S3 storage
+          PROFILES="-Pgoogle-mirror,aws,spark-${{ matrix.spark }}"
           TEST_MODULES="client-spark/common,client-spark/spark-3,client-spark/spark-3-columnar-common,client-spark/spark-${SPARK_MAJOR_VERSION}-shaded,tests/spark-it"
           build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
           build/mvn $PROFILES -pl $TEST_MODULES -Dspark.shuffle.sort.io.plugin.class=${{ matrix.shuffle-plugin-class }} test
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index e4c473a9b..7a89b051d 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -19,6 +19,7 @@ package org.apache.celeborn.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -122,6 +123,7 @@ public abstract class ShuffleClient {
         hadoopFs = CelebornHadoopUtils.getHadoopFS(conf);
       } catch (Exception e) {
         logger.error("Celeborn initialize DFS failed.", e);
+        hadoopFs = Collections.emptyMap();
       }
     }
   }
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index b55420386..be2bdf87d 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -36,6 +36,7 @@ import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -231,6 +232,13 @@ public class ShuffleClientImpl extends ShuffleClient {
 
     reviveManager = new ReviveManager(this, conf);
 
+    if (conf.hasS3Storage()) {
+      Map<StorageInfo.Type, FileSystem> hadoopFs = getHadoopFs(conf);
+      FileSystem s3client = hadoopFs.get(StorageInfo.Type.S3);
+      logger.info("S3 client: {}", s3client);
+      if (s3client == null)
+        throw new IllegalStateException("S3 type is required but the S3 client was not created");
+    }
     logger.info("Created ShuffleClientImpl, appUniqueId: {}", appUniqueId);
   }
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index 2940afcd2..4bb394a6c 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -99,6 +99,7 @@ object CelebornHadoopUtils extends Logging {
     dirs.foreach {
       case (storageType, dir) => {
        val path = new Path(dir)
+       logInfo(s"Creating HadoopFS for type $storageType at path $path")
        hadoopFs.put(storageType, path.getFileSystem(hadoopConf))
      }
    })
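As background for the fs.s3a.* settings used throughout this patch (an illustrative sketch of assumed behavior, not the actual CelebornHadoopUtils code): Celeborn forwards configuration entries prefixed with celeborn.hadoop. into the Hadoop Configuration it builds, which is how those options reach the S3A filesystem. Roughly:

    // Sketch of the assumed passthrough behavior, simplified.
    import org.apache.hadoop.conf.Configuration

    def toHadoopConf(celebornSettings: Map[String, String]): Configuration = {
      val hadoopConf = new Configuration()
      celebornSettings.foreach {
        case (key, value) if key.startsWith("celeborn.hadoop.") =>
          // "celeborn.hadoop.fs.s3a.endpoint" becomes "fs.s3a.endpoint"
          hadoopConf.set(key.stripPrefix("celeborn.hadoop."), value)
        case _ => // not a Hadoop passthrough entry
      }
      hadoopConf
    }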
diff --git a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
index 11951e36e..ab1d8c9a0 100644
--- a/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
+++ b/multipart-uploader/multipart-uploader-s3/src/main/java/org/apache/celeborn/S3MultipartUploadHandler.java
@@ -27,12 +27,14 @@ import java.util.List;
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.retry.PredefinedBackoffStrategies;
 import com.amazonaws.retry.PredefinedRetryPolicies;
 import com.amazonaws.retry.RetryPolicy;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.*;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -61,15 +63,15 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
 
   private String uploadId;
 
-  private AmazonS3 s3Client;
+  private final AmazonS3 s3Client;
 
-  private String key;
+  private final String key;
 
-  private String bucketName;
+  private final String bucketName;
 
-  private Integer s3MultiplePartUploadMaxRetries;
-  private Integer baseDelay;
-  private Integer maxBackoff;
+  private final Integer s3MultiplePartUploadMaxRetries;
+  private final Integer baseDelay;
+  private final Integer maxBackoff;
 
   public S3MultipartUploadHandler(
       FileSystem hadoopFs,
@@ -103,12 +105,23 @@ public class S3MultipartUploadHandler implements MultipartUploadHandler {
         new ClientConfiguration()
             .withRetryPolicy(retryPolicy)
             .withMaxErrorRetry(s3MultiplePartUploadMaxRetries);
-    this.s3Client =
+    AmazonS3ClientBuilder builder =
         AmazonS3ClientBuilder.standard()
             .withCredentials(providers)
-            .withRegion(conf.get(Constants.AWS_REGION))
-            .withClientConfiguration(clientConfig)
-            .build();
+            .withClientConfiguration(clientConfig);
+    // for MinIO
+    String endpoint = conf.get("fs.s3a.endpoint");
+    if (endpoint != null && !endpoint.isEmpty()) {
+      builder =
+          builder
+              .withEndpointConfiguration(
+                  new AwsClientBuilder.EndpointConfiguration(
+                      endpoint, conf.get(Constants.AWS_REGION)))
+              .withPathStyleAccessEnabled(conf.getBoolean("fs.s3a.path.style.access", false));
+    } else {
+      builder = builder.withRegion(conf.get(Constants.AWS_REGION));
+    }
+    this.s3Client = builder.build();
     this.key = key;
   }
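The builder change above is the heart of the fix for custom S3 endpoints: the AWS SDK v1 refuses a builder that sets both a region and an endpoint, so the region has to travel inside the EndpointConfiguration. A standalone sketch of the same pattern (the endpoint, region, and credentials are placeholders, not values from this patch):

    import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
    import com.amazonaws.client.builder.AwsClientBuilder
    import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

    def buildS3Client(endpoint: Option[String], region: String): AmazonS3 = {
      val base = AmazonS3ClientBuilder.standard()
        .withCredentials(new AWSStaticCredentialsProvider(
          new BasicAWSCredentials("minioadmin", "minioadmin")))
      val configured = endpoint match {
        case Some(url) =>
          // Custom endpoint (e.g. MinIO): the region goes into the
          // EndpointConfiguration, and path-style access is enabled because
          // MinIO does not serve virtual-hosted-style bucket URLs by default.
          base
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(url, region))
            .withPathStyleAccessEnabled(true)
        case None =>
          base.withRegion(region) // plain AWS: let the SDK derive the endpoint
      }
      configured.build()
    }

    // e.g. buildS3Client(Some("http://localhost:9000"), "us-east-1")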
diff --git a/pom.xml b/pom.xml
index 0bd41fbff..371f9b2b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,6 +155,7 @@
     <maven.plugin.silencer.version>1.7.19</maven.plugin.silencer.version>
     <maven.plugin.resources.version>3.3.1</maven.plugin.resources.version>
     <openapi.generator.version>7.8.0</openapi.generator.version>
+    <testcontainers-minio.version>1.21.4</testcontainers-minio.version>
 
     <!-- Allow modules to enable / disable certain build plugins easily. -->
     <testJarPhase>prepare-package</testJarPhase>
@@ -525,6 +526,12 @@
         <version>${jakarta.ws.rs-api.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.testcontainers</groupId>
+        <artifactId>minio</artifactId>
+        <version>${testcontainers-minio.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.openapitools</groupId>
         <artifactId>jackson-databind-nullable</artifactId>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index d303a3439..107edabac 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -90,6 +90,9 @@ object Dependencies {
   val picocliVersion = "4.7.6"
   val jmhVersion = "1.37"
 
+  // S3 integration tests with Minio
+  val testContainerMinioVersion = "1.21.4"
+
   // For SSL support
   val bouncycastleVersion = "1.77"
 
@@ -213,6 +216,8 @@ object Dependencies {
   val jakartaAnnotationApi = "jakarta.annotation" % "jakarta.annotation-api" % jakartaAnnotationApiVersion
   val jakartaWsRsApi = "jakarta.ws.rs" % "jakarta.ws.rs-api" % jakartaWsRsApiVersion
 
+  val testContainerMinio = "org.testcontainers" % "minio" % testContainerMinioVersion
+
   // Test dependencies
   // https://www.scala-sbt.org/1.x/docs/Testing.html
   val junitInterface = "com.github.sbt" % "junit-interface" % junitInterfaceVersion
@@ -425,7 +430,8 @@ object CelebornCommonSettings {
       Dependencies.scalatest % "test",
       Dependencies.junit % "test",
       // https://www.scala-sbt.org/1.x/docs/Testing.html
-      Dependencies.junitInterface % "test")
+      Dependencies.junitInterface % "test",
+      Dependencies.testContainerMinio % "test")
 }
 
 object CelebornBuild extends sbt.internal.BuildDef {
diff --git a/tests/spark-it/pom.xml b/tests/spark-it/pom.xml
index c0a37ea6c..a87594f5c 100644
--- a/tests/spark-it/pom.xml
+++ b/tests/spark-it/pom.xml
@@ -182,6 +182,11 @@
       <artifactId>jakarta.servlet-api</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>minio</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>
     <profile>
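Since the Testcontainers MinIO module is new to the build, here is the smallest useful demonstration of it (a sketch assuming Docker is available; the image tag matches the one the new test uses, and getS3URL/getUserName/getPassword are the same accessors the test calls):

    import org.testcontainers.containers.MinIOContainer

    val container = new MinIOContainer("minio/minio:RELEASE.2023-09-04T19-57-37Z")
    container.start()
    try {
      // Testcontainers maps MinIO's S3 port to a random free host port.
      val endpoint = container.getS3URL      // e.g. http://localhost:32768
      val accessKey = container.getUserName  // default credentials: minioadmin
      val secretKey = container.getPassword
      println(s"MinIO S3 endpoint: $endpoint ($accessKey/$secretKey)")
    } finally {
      container.stop()
    }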
diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/s3/BasicEndToEndTieredStorageTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/s3/BasicEndToEndTieredStorageTest.scala
new file mode 100644
index 000000000..f65b00c4f
--- /dev/null
+++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/s3/BasicEndToEndTieredStorageTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.tests.spark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+import org.testcontainers.containers.MinIOContainer
+
+import org.apache.celeborn.client.ShuffleClient
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.protocol.ShuffleMode
+
+class BasicEndToEndTieredStorageTest extends AnyFunSuite
+  with SparkTestBase
+  with BeforeAndAfterEach {
+
+  var container: MinIOContainer = null
+  val skipAWSTest = !isClassPresent("org.apache.hadoop.fs.s3a.S3AFileSystem")
+
+  def isClassPresent(className: String): Boolean = {
+    try {
+      Class.forName(className)
+      true
+    } catch {
+      case _: ClassNotFoundException => false
+    }
+  }
+
+  override def beforeAll(): Unit = {
+
+    if (skipAWSTest)
+      return
+
+    container = new MinIOContainer("minio/minio:RELEASE.2023-09-04T19-57-37Z")
+    container.start()
+
+    // create bucket using the MinIO command line tool
+    container.execInContainer(
+      "mc",
+      "alias",
+      "set",
+      "dockerminio",
+      "http://minio:9000",
+      container.getUserName,
+      container.getPassword)
+    container.execInContainer("mc", "mb", "dockerminio/sample-bucket")
+
+    System.setProperty("aws.accessKeyId", container.getUserName)
+    System.setProperty("aws.secretKey", container.getPassword)
+
+    val s3url = container.getS3URL
+    val augmentedConfiguration = Map(
+      CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,S3",
+      CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,S3",
+      CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY|S3",
+      "celeborn.hadoop.fs.s3a.endpoint" -> s"$s3url",
+      "celeborn.hadoop.fs.s3a.aws.credentials.provider" -> "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
+      "celeborn.hadoop.fs.s3a.access.key" -> container.getUserName,
+      "celeborn.hadoop.fs.s3a.secret.key" -> container.getPassword,
+      "celeborn.hadoop.fs.s3a.path.style.access" -> "true",
+      CelebornConf.S3_DIR.key -> "s3://sample-bucket/test/celeborn",
+      CelebornConf.S3_ENDPOINT_REGION.key -> "dummy-region")
+
+    setupMiniClusterWithRandomPorts(
+      masterConf = augmentedConfiguration,
+      workerConf = augmentedConfiguration,
+      workerNum = 1)
+  }
+
+  override def beforeEach(): Unit = {
+    ShuffleClient.reset()
+  }
+
+  override def afterAll(): Unit = {
+    System.clearProperty("aws.accessKeyId")
+    System.clearProperty("aws.secretKey")
+    if (container != null) {
+      container.close()
+    }
+    super.afterAll()
+  }
+
+  override def updateSparkConf(sparkConf: SparkConf, mode: ShuffleMode): SparkConf = {
+    val s3url = container.getS3URL
+    val newConf = sparkConf
+      .set("spark." + CelebornConf.ACTIVE_STORAGE_TYPES.key, "MEMORY,S3")
+      .set("spark." + CelebornConf.S3_DIR.key, "s3://sample-bucket/test/celeborn")
+      .set("spark." + CelebornConf.S3_ENDPOINT_REGION.key, "dummy-region")
+      .set("spark.celeborn.hadoop.fs.s3a.endpoint", s"$s3url")
+      .set(
+        "spark.celeborn.hadoop.fs.s3a.aws.credentials.provider",
+        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
+      .set("spark.celeborn.hadoop.fs.s3a.access.key", container.getUserName)
+      .set("spark.celeborn.hadoop.fs.s3a.secret.key", container.getPassword)
+      .set("spark.celeborn.hadoop.fs.s3a.path.style.access", "true")
+
+    super.updateSparkConf(newConf, mode)
+  }
+
+  test("celeborn spark integration test - s3") {
+    assume(
+      !skipAWSTest,
+      "Skipping test because AWS Hadoop client is not in the classpath (enable with -Paws)")
+
+    val s3url = container.getS3URL
+    log.info(s"s3url $s3url")
+    val sparkConf = new SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
+    val celebornSparkSession = SparkSession.builder()
+      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
+      .getOrCreate()
+    groupBy(celebornSparkSession)
+
+    celebornSparkSession.stop()
+  }
+
+}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index a0b46dd6d..17b725b2d 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -43,6 +43,7 @@ import org.apache.celeborn.common.meta.{DeviceInfo, DiskFileInfo, DiskInfo, Disk
 import org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSource}
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMode, PartitionType, StorageInfo}
+import org.apache.celeborn.common.protocol.StorageInfo.Type
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, CollectionUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
@@ -1081,6 +1082,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       partitionType: PartitionType,
       partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
     val suggestedMountPoint = location.getStorageInfo.getMountPoint
+    val storageType = location.getStorageInfo.getType
     var retryCount = 0
     var exception: IOException = null
     val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
@@ -1102,7 +1104,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
           throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
         }
 
-        if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
+        if (storageType == Type.HDFS && location.getStorageInfo.HDFSAvailable()) {
          val shuffleDir =
            new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
          FileSystem.mkdirs(
@@ -1120,9 +1122,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
             fileName,
             hdfsFileInfo)
           return (hdfsFlusher.get, hdfsFileInfo, null)
-        } else if (dirs.isEmpty && location.getStorageInfo.S3Available()) {
+        } else if (storageType == Type.S3 && location.getStorageInfo.S3Available()) {
           val shuffleDir =
             new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId")
+          logDebug(s"trying to create S3 file at $shuffleDir")
           FileSystem.mkdirs(
             StorageManager.hadoopFs.get(StorageInfo.Type.S3),
             shuffleDir,
@@ -1138,7 +1141,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
             fileName,
             s3FileInfo)
           return (s3Flusher.get, s3FileInfo, null)
-        } else if (dirs.isEmpty && location.getStorageInfo.OSSAvailable()) {
+        } else if (storageType == Type.OSS && location.getStorageInfo.OSSAvailable()) {
           val shuffleDir =
             new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId")
           FileSystem.mkdirs(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 08a1d0851..028e54bd7 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -160,6 +160,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
             storageManager)
         }
       } else {
+        logError(
+          s"CANNOT create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName} because localOrDfsStorageAvailable is false")
         null
       }
     }
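Restated as a sketch (with an assumed helper name, not the actual StorageManager method): before this change the DFS branches were reachable only when the worker had no local dirs, so a worker with healthy local disks never created a file on S3 even when the assigned partition location asked for it; after the change the branch follows the storage type carried by the location:

    import java.io.IOException
    import org.apache.celeborn.common.protocol.StorageInfo

    // Simplified dispatch mirroring the fixed file-creation logic.
    def chooseTier(storageType: StorageInfo.Type, hasLocalDirs: Boolean): String =
      storageType match {
        case StorageInfo.Type.HDFS => "hdfs" // no longer gated on dirs.isEmpty
        case StorageInfo.Type.S3   => "s3"
        case StorageInfo.Type.OSS  => "oss"
        case _ if hasLocalDirs     => "local-disk"
        case _ => throw new IOException("No available storage for this location")
      }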
