This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7d0ba7f9b [CELEBORN-1916] Support Aliyun OSS Based on MPU Extension Interface
7d0ba7f9b is described below
commit 7d0ba7f9b8f10f3e71b9356267ef00d7d7fc9771
Author: veli.yang <[email protected]>
AuthorDate: Tue Apr 8 15:10:33 2025 +0800
[CELEBORN-1916] Support Aliyun OSS Based on MPU Extension Interface
### What changes were proposed in this pull request?
- close [CELEBORN-1916](https://issues.apache.org/jira/browse/CELEBORN-1916)
- This PR extends the Multipart Uploader (MPU) interface to support Aliyun
OSS.
### Why are the changes needed?
- Implemented multipart-uploader-oss module based on the existing MPU
extension interface.
- Added necessary configurations and dependencies for Aliyun OSS
integration.
- Ensured compatibility with the existing multipart-uploader framework.
- This enhancement allows seamless multipart upload functionality for
Aliyun OSS, similar to the existing AWS S3 support.
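The bullet points above can be sketched as a minimal, self-contained Java example. Note this is an illustration only: the interface and method names (`MultipartUploadHandler`, `startUpload`, `putPart`, `complete`) are hypothetical stand-ins for Celeborn's actual MPU extension interface, and the in-memory handler stands in for an OSS-backed implementation:

```java
import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of an MPU-style extension point: each storage backend
// (S3, OSS, ...) plugs its own handler in behind a common interface.
interface MultipartUploadHandler {
    void startUpload();
    void putPart(int partNumber, byte[] data);
    byte[] complete(); // returns the assembled object
}

// Toy in-memory handler standing in for an OSS-backed implementation.
class InMemoryUploadHandler implements MultipartUploadHandler {
    private final Map<Integer, byte[]> parts = new TreeMap<>(); // ordered by part number

    @Override
    public void startUpload() { parts.clear(); }

    @Override
    public void putPart(int partNumber, byte[] data) { parts.put(partNumber, data); }

    @Override
    public byte[] complete() {
        // Parts may arrive out of order; the TreeMap reassembles them by number.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] part : parts.values()) { out.write(part, 0, part.length); }
        return out.toByteArray();
    }
}
```

The point of the pattern is that the caller only sees `MultipartUploadHandler`, so adding OSS support means shipping one new handler module rather than touching the upload framework.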
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Deployment integration testing has been completed in the local environment.
Closes #3157 from shouwangyw/optimize/mpu-oss.
Lead-authored-by: veli.yang <[email protected]>
Co-authored-by: yangwei <[email protected]>
Co-authored-by: mingji <[email protected]>
Signed-off-by: mingji <[email protected]>
---
README.md | 25 ++++
build/make-distribution.sh | 3 +
.../celeborn/client/read/CelebornInputStream.java | 1 +
.../celeborn/client/read/DfsPartitionReader.java | 9 +-
.../apache/celeborn/common/meta/DiskFileInfo.java | 6 +-
.../org/apache/celeborn/common/CelebornConf.scala | 80 +++++++++++-
.../apache/celeborn/common/meta/WorkerInfo.scala | 6 +-
.../celeborn/common/util/CelebornHadoopUtils.scala | 17 +++
.../org/apache/celeborn/common/util/Utils.scala | 7 +-
.../apache/celeborn/common/util/UtilsSuite.scala | 18 ++-
dev/reformat | 1 +
docs/configuration/client.md | 7 +-
docs/configuration/master.md | 7 +-
docs/configuration/worker.md | 9 +-
master/pom.xml | 15 +++
.../service/deploy/master/SlotsAllocator.java | 16 ++-
.../master/clustermeta/AbstractMetaManager.java | 10 +-
.../celeborn/service/deploy/master/Master.scala | 6 +-
.../deploy/master/SlotsAllocatorSuiteJ.java | 53 +++++++-
multipart-uploader/multipart-uploader-oss/pom.xml | 62 +++++++++
.../apache/celeborn/OssMultipartUploadHandler.java | 142 +++++++++++++++++++++
pom.xml | 10 ++
project/CelebornBuild.scala | 33 ++++-
worker/pom.xml | 10 ++
.../deploy/worker/storage/PartitionDataWriter.java | 2 +
.../worker/storage/PartitionFilesSorter.java | 34 +++--
.../deploy/worker/storage/TierWriterHelper.java | 15 +++
.../service/deploy/worker/Controller.scala | 2 +-
.../service/deploy/worker/FetchHandler.scala | 6 +
.../service/deploy/worker/PushDataHandler.scala | 2 +-
.../service/deploy/worker/storage/FlushTask.scala | 16 +++
.../service/deploy/worker/storage/Flusher.scala | 19 +++
.../deploy/worker/storage/StorageManager.scala | 86 +++++++++++--
.../service/deploy/worker/storage/TierWriter.scala | 65 +++++++++-
34 files changed, 753 insertions(+), 47 deletions(-)
diff --git a/README.md b/README.md
index 9ff2fb982..1eb87b279 100644
--- a/README.md
+++ b/README.md
@@ -90,6 +90,11 @@ To compile for Spark 3.5 with Java21, please use the following command
> **_NOTE:_** Celeborn supports automatic builds on linux aarch64 platform via
> `aarch64` profile. `aarch64` profile requires glibc version 3.4.21. There is
> potential problematic frame `C [libc.so.6+0x8412a]` for other glibc version
> like 2.x etc.
+To build Celeborn with Aliyun OSS MPU support, please use the following command
+```shell
+./build/make-distribution.sh --sbt-enabled -Pspark-3.4 -Pjdk-8 -Paliyun
+```
+
### Package Details
Build procedure will create a compressed package.
@@ -256,6 +261,16 @@ Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed
shuffles 0, running applic
WorkerRef: null
```
+Shuffle OSS storage related configurations:
+```properties
+# If you are using Celeborn for shuffle OSS storage, these settings will be needed.
+celeborn.storage.availableTypes OSS
+celeborn.storage.oss.dir oss://<bucket_name>/
+celeborn.storage.oss.access.key <oss_access_key>
+celeborn.storage.oss.secret.key <oss_secret_key>
+celeborn.storage.oss.endpoint oss-cn-<region>[-internal].aliyuncs.com
+```
+
#### Deploy Celeborn on K8S
Please refer to our
[website](https://celeborn.apache.org/docs/latest/deploy_on_k8s/)
@@ -310,6 +325,16 @@ spark.dynamicAllocation.shuffleTracking.enabled false
spark.executor.userClassPathFirst false
```
+Shuffle OSS storage related configurations:
+```properties
+# If you are using Celeborn for shuffle OSS storage, these settings will be needed.
+spark.celeborn.storage.availableTypes OSS
+spark.celeborn.storage.oss.dir oss://<bucket_name>/
+spark.celeborn.storage.oss.access.key <oss_access_key>
+spark.celeborn.storage.oss.secret.key <oss_secret_key>
+spark.celeborn.storage.oss.endpoint oss-cn-<region>[-internal].aliyuncs.com
+```
+
### Deploy Flink client
**Important: Only Flink batch jobs are supported for now. Because the Shuffle
Service in Flink is cluster-granular, if you want to use Celeborn in a session
cluster, you will not be able to submit both streaming and batch jobs to the
same cluster. We plan to remove this restriction for Hybrid Shuffle mode in a
future release.**
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index c19426898..96b0b4aa6 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -285,6 +285,9 @@ function sbt_build_service {
if [[ $@ == *"aws"* ]]; then
export SBT_MAVEN_PROFILES="aws"
fi
+ if [[ $@ == *"aliyun"* ]]; then
+ export SBT_MAVEN_PROFILES="aliyun"
+ fi
BUILD_COMMAND=("$SBT" clean package)
# Actually build the jar
diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index cbcfae938..ffcc50dde 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -609,6 +609,7 @@ public abstract class CelebornInputStream extends InputStream {
checkpointMetadata);
}
case S3:
+ case OSS:
case HDFS:
return new DfsPartitionReader(
conf,
diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 003c7cd12..58fb1aac0 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -101,9 +101,12 @@ public class DfsPartitionReader implements PartitionReader {
this.metricsCallback = metricsCallback;
this.location = location;
- if (location.getStorageInfo() != null
- && location.getStorageInfo().getType() == StorageInfo.Type.S3) {
- this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
+    if (location.getStorageInfo() != null) {
+      if (location.getStorageInfo().getType() == StorageInfo.Type.S3) {
+        this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
+      } else if (location.getStorageInfo().getType() == StorageInfo.Type.OSS) {
+        this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.OSS);
+      }
} else {
this.hadoopFs =
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS);
}
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index bcef84a6e..c7161483b 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -157,7 +157,11 @@ public class DiskFileInfo extends FileInfo {
return Utils.isS3Path(filePath);
}
+ public boolean isOSS() {
+ return Utils.isOssPath(filePath);
+ }
+
public boolean isDFS() {
- return Utils.isS3Path(filePath) || Utils.isHdfsPath(filePath);
+    return Utils.isS3Path(filePath) || Utils.isOssPath(filePath) || Utils.isHdfsPath(filePath);
}
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c1503b3ca..59a085af0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -659,6 +659,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) &&
get(HDFS_DIR).isDefined
def hasS3Storage: Boolean =
get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.S3.name()) &&
get(S3_DIR).isDefined
+  def hasOssStorage: Boolean =
+    get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.OSS.name()) && get(OSS_DIR).isDefined
def masterSlotAssignLoadAwareDiskGroupNum: Int =
get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM)
def masterSlotAssignLoadAwareDiskGroupGradient: Double =
get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT)
@@ -1155,7 +1157,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
(dir, maxCapacity, flushThread, diskType)
}
}.getOrElse {
- if (!hasHDFSStorage && !hasS3Storage) {
+ if (!hasHDFSStorage && !hasS3Storage && !hasOssStorage) {
val prefix = workerStorageBaseDirPrefix
val number = workerStorageBaseDirNumber
val diskType = Type.valueOf(workerStorageBaseDirDiskType)
@@ -1198,6 +1200,22 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
}.getOrElse("")
}
+ def ossDir: String = {
+ get(OSS_DIR).map {
+ ossDir =>
+ if (!Utils.isOssPath(ossDir)) {
+          log.error(s"${OSS_DIR.key} configuration is wrong $ossDir. Disable OSS support.")
+ ""
+ } else {
+ ossDir
+ }
+ }.getOrElse("")
+ }
+ def ossEndpoint: String = get(OSS_ENDPOINT).getOrElse("")
+ def ossAccessKey: String = get(OSS_ACCESS_KEY).getOrElse("")
+ def ossSecretKey: String = get(OSS_SECRET_KEY).getOrElse("")
+ def ossIgnoreCredentials: Boolean = get(OSS_IGNORE_CREDENTIALS)
+
def hdfsDir: String = {
get(HDFS_DIR).map {
hdfsDir =>
@@ -1270,11 +1288,13 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def workerFlusherBufferSize: Long = get(WORKER_FLUSHER_BUFFER_SIZE)
def workerHdfsFlusherBufferSize: Long = get(WORKER_HDFS_FLUSHER_BUFFER_SIZE)
def workerS3FlusherBufferSize: Long = get(WORKER_S3_FLUSHER_BUFFER_SIZE)
+ def workerOssFlusherBufferSize: Long = get(WORKER_OSS_FLUSHER_BUFFER_SIZE)
def workerWriterCloseTimeoutMs: Long = get(WORKER_WRITER_CLOSE_TIMEOUT)
def workerHddFlusherThreads: Int = get(WORKER_FLUSHER_HDD_THREADS)
def workerSsdFlusherThreads: Int = get(WORKER_FLUSHER_SSD_THREADS)
def workerHdfsFlusherThreads: Int = get(WORKER_FLUSHER_HDFS_THREADS)
def workerS3FlusherThreads: Int = get(WORKER_FLUSHER_S3_THREADS)
+ def workerOssFlusherThreads: Int = get(WORKER_FLUSHER_OSS_THREADS)
def workerCreateWriterMaxAttempts: Int =
get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
// //////////////////////////////////////////////////////
@@ -3180,6 +3200,46 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(5)
+ val OSS_ENDPOINT: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.oss.endpoint")
+ .categories("worker", "master", "client")
+ .version("0.6.0")
+ .doc("OSS endpoint for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val OSS_DIR: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.oss.dir")
+ .categories("worker", "master", "client")
+ .version("0.6.0")
+ .doc("OSS base directory for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val OSS_SECRET_KEY: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.oss.secret.key")
+ .categories("worker", "master", "client")
+ .version("0.6.0")
+ .doc("OSS secret key for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val OSS_IGNORE_CREDENTIALS: ConfigEntry[Boolean] =
+ buildConf("celeborn.storage.oss.ignore.credentials")
+ .categories("worker", "master", "client")
+ .version("0.6.0")
+      .doc("Whether to skip OSS credentials; disable this config to support the Jindo SDK.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val OSS_ACCESS_KEY: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.oss.access.key")
+ .categories("worker", "master", "client")
+ .version("0.6.0")
+ .doc("OSS access key for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.storage.disk.reserve.size")
.withAlternative("celeborn.worker.disk.reserve.size")
@@ -3664,6 +3724,14 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("6m")
+ val WORKER_OSS_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
+ buildConf("celeborn.worker.flusher.oss.buffer.size")
+ .categories("worker")
+ .version("0.6.0")
+      .doc("Size of buffer used by an OSS flusher.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("6m")
+
val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.writer.close.timeout")
.categories("worker")
@@ -3712,6 +3780,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(8)
+ val WORKER_FLUSHER_OSS_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.worker.flusher.oss.threads")
+ .categories("worker")
+      .doc("Flusher's thread count used for writing data to OSS.")
+ .version("0.6.0")
+ .intConf
+ .createWithDefault(8)
+
val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.shutdownTimeout")
.categories("worker")
@@ -5744,7 +5820,7 @@ object CelebornConf extends Logging {
.categories("master", "worker", "client")
.version("0.3.0")
.doc(
-        "Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as identical.")
+        "Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3,OSS. Note: HDD and SSD would be treated as identical.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
      .checkValue(p => p.split(",").map(StorageInfo.validate).reduce(_ && _), "")
diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 71401735f..2a8f8d6a4 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -212,13 +212,15 @@ class WorkerInfo(
curDisk.activeSlots = newDisk.activeSlots
curDisk.avgFlushTime = newDisk.avgFlushTime
curDisk.avgFetchTime = newDisk.avgFetchTime
-      if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS && curDisk.storageType != StorageInfo.Type.S3) {
+      if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS
+        && curDisk.storageType != StorageInfo.Type.S3 && curDisk.storageType != StorageInfo.Type.OSS) {
         curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get
         curDisk.availableSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
}
curDisk.setStatus(newDisk.status)
} else {
-      if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS && newDisk.storageType != StorageInfo.Type.S3) {
+      if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS
+        && newDisk.storageType != StorageInfo.Type.S3 && newDisk.storageType != StorageInfo.Type.OSS) {
         newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
         newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
}
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index abd80a081..58fde690b 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf.{OSS_ACCESS_KEY, OSS_SECRET_KEY}
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.StorageInfo
@@ -61,6 +62,18 @@ object CelebornHadoopUtils extends Logging {
hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion)
+    } else if (conf.ossDir.nonEmpty) {
+      if (conf.ossAccessKey.isEmpty || conf.ossSecretKey.isEmpty || conf.ossEndpoint.isEmpty) {
+        throw new CelebornException(
+          "OSS storage is enabled but ossAccessKey, ossSecretKey, or ossEndpoint is not set")
+      }
+      if (conf.ossIgnoreCredentials) {
+        hadoopConf.set("fs.oss.credentials.provider", "")
+      }
+      hadoopConf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
+      hadoopConf.set("fs.oss.accessKeyId", conf.ossAccessKey)
+      hadoopConf.set("fs.oss.accessKeySecret", conf.ossSecretKey)
+      hadoopConf.set("fs.oss.endpoint", conf.ossEndpoint)
}
appendSparkHadoopConfigs(conf, hadoopConf)
hadoopConf
@@ -85,6 +98,10 @@ object CelebornHadoopUtils extends Logging {
val s3Dir = new Path(conf.s3Dir)
hadoopFs.put(StorageInfo.Type.S3, s3Dir.getFileSystem(hadoopConf))
}
+ if (conf.hasOssStorage) {
+ val ossDir = new Path(conf.ossDir)
+ hadoopFs.put(StorageInfo.Type.OSS, ossDir.getFileSystem(hadoopConf))
+ }
hadoopFs
}
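The new branch above boils down to a key translation plus a fail-fast check: Celeborn's `celeborn.storage.oss.*` settings become `fs.oss.*` entries in the Hadoop configuration. A plain-Java sketch of that logic (no Hadoop dependency; a `Map` stands in for Hadoop's `Configuration` and `IllegalArgumentException` stands in for `CelebornException`):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the key translation performed by the new CelebornHadoopUtils branch.
class OssConfSketch {
    static Map<String, String> toHadoopConf(String accessKey, String secretKey,
                                            String endpoint, boolean ignoreCredentials) {
        if (accessKey.isEmpty() || secretKey.isEmpty() || endpoint.isEmpty()) {
            // Mirrors the exception thrown when OSS storage is enabled but any
            // of the three required settings is missing.
            throw new IllegalArgumentException(
                "OSS storage is enabled but ossAccessKey, ossSecretKey, or ossEndpoint is not set");
        }
        Map<String, String> hadoopConf = new HashMap<>();
        if (ignoreCredentials) {
            // Clearing the provider lets an external SDK (e.g. Jindo) supply credentials.
            hadoopConf.put("fs.oss.credentials.provider", "");
        }
        hadoopConf.put("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
        hadoopConf.put("fs.oss.accessKeyId", accessKey);
        hadoopConf.put("fs.oss.accessKeySecret", secretKey);
        hadoopConf.put("fs.oss.endpoint", endpoint);
        return hadoopConf;
    }
}
```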
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 2de6a13ec..2978d824f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1216,8 +1216,9 @@ object Utils extends Logging {
val SORTED_SUFFIX = ".sorted"
val INDEX_SUFFIX = ".index"
val SUFFIX_HDFS_WRITE_SUCCESS = ".success"
- val COMPATIBLE_HDFS_REGEX = "^(?!s3://)(?!s3a://)[a-zA-Z0-9]+://.*"
+ val COMPATIBLE_HDFS_REGEX = "^(?!s3://)(?!s3a://)(?!oss://)[a-zA-Z0-9]+://.*"
val S3_REGEX = "^s3[a]?://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])(/.*)?$"
+ val OSS_REGEX = "^oss?://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])(/.*)?$"
val UNKNOWN_APP_SHUFFLE_ID = -1
@@ -1229,6 +1230,10 @@ object Utils extends Logging {
path.matches(S3_REGEX)
}
+ def isOssPath(path: String): Boolean = {
+ path.matches(OSS_REGEX)
+ }
+
def getSortedFilePath(path: String): String = {
path + SORTED_SUFFIX
}
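The two regexes added here, the new `OSS_REGEX` and the tightened `COMPATIBLE_HDFS_REGEX` (which now rejects `oss://` via a negative lookahead), can be exercised directly; the regex constants below are copied verbatim from the patch, while the tiny wrapper class is just scaffolding for the demo:

```java
// Path-scheme detection regexes, copied from the patch to Utils.scala.
class PathRegexDemo {
    static final String COMPATIBLE_HDFS_REGEX = "^(?!s3://)(?!s3a://)(?!oss://)[a-zA-Z0-9]+://.*";
    static final String OSS_REGEX = "^oss?://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])(/.*)?$";

    static boolean isOssPath(String path) { return path.matches(OSS_REGEX); }
    static boolean isHdfsPath(String path) { return path.matches(COMPATIBLE_HDFS_REGEX); }
}
```

This is exactly why the UtilsSuite assertions below flip from `true` to `false`: an `oss://` path used to fall through to the HDFS-compatible regex, and now it no longer does.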
diff --git a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index 03c6176ed..d75006cf7 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -171,9 +171,9 @@ class UtilsSuite extends CelebornFunSuite {
val ossPath = "oss://xxxx/xx-xx/x-x-x"
val sortedOssPath = "oss://xxxx/xx-xx/x-x-x.sorted"
val indexOssPath = "oss://xxxx/xx-xx/x-x-x.index"
- assert(true == Utils.isHdfsPath(ossPath))
- assert(true == Utils.isHdfsPath(sortedOssPath))
- assert(true == Utils.isHdfsPath(indexOssPath))
+ assert(false == Utils.isHdfsPath(ossPath))
+ assert(false == Utils.isHdfsPath(sortedOssPath))
+ assert(false == Utils.isHdfsPath(indexOssPath))
val localPath = "/xxx/xxx/xx-xx/x-x-x"
assert(false == Utils.isHdfsPath(localPath))
@@ -191,6 +191,18 @@ class UtilsSuite extends CelebornFunSuite {
assert(true == Utils.isS3Path(indexS3Path))
}
+ test("validate oss compatible fs path") {
+ val hdfsPath = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x"
+ val simpleOssPath = "oss://xxxx/xx-xx/x-x-x"
+ val sortedOssPath = "oss://xxx/xxxx/xx-xx/x-x-x.sorted"
+ val indexOssPath = "oss://xxx/xxxx/xx-xx/x-x-x.index"
+ assert(false == Utils.isOssPath(hdfsPath))
+ assert(false == Utils.isHdfsPath(simpleOssPath))
+ assert(true == Utils.isOssPath(simpleOssPath))
+ assert(true == Utils.isOssPath(sortedOssPath))
+ assert(true == Utils.isOssPath(indexOssPath))
+ }
+
test("GetReducerFileGroupResponse class convert with pb") {
val fileGroup = new util.HashMap[Integer, util.Set[PartitionLocation]]
fileGroup.put(0, partitionLocation(0))
diff --git a/dev/reformat b/dev/reformat
index cf3e6f12e..cac0d4dc9 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -34,6 +34,7 @@ else
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-3.5
${PROJECT_DIR}/build/mvn spotless:apply -Pspark-4.0
${PROJECT_DIR}/build/mvn spotless:apply -Paws
+ ${PROJECT_DIR}/build/mvn spotless:apply -Paliyun
${PROJECT_DIR}/build/mvn spotless:apply -Pmr
${PROJECT_DIR}/build/mvn spotless:apply -Ptez
fi
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 5b3a25308..ccb794319 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -133,8 +133,13 @@ license: |
| celeborn.master.endpoints.resolver |
org.apache.celeborn.common.client.StaticMasterEndpointResolver | false |
Resolver class that can be used for discovering and updating the master
endpoints. This allows users to provide a custom master endpoint resolver
implementation. This is useful in environments where the master nodes might
change due to scaling operations or infrastructure updates. Clients need to
ensure that provided resolver class should be present in the classpath. | 0.5.2
| |
| celeborn.quota.enabled | true | false | When Master side sets to true, the
master will enable to check the quota via QuotaManager. When Client side sets
to true, LifecycleManager will request Master side to check whether the current
user has enough quota before registration of shuffle. Fallback to the default
shuffle service when Master side checks that there is no enough quota for
current user. | 0.2.0 | |
| celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable
interrupt shuffle when quota exceeds. | 0.6.0 | |
-| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
+| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3,OSS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
| celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory
for Celeborn to store shuffle data. | 0.2.0 | |
+| celeborn.storage.oss.access.key | <undefined> | false | OSS access key for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.oss.dir | <undefined> | false | OSS base directory for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.oss.endpoint | <undefined> | false | OSS endpoint for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.oss.ignore.credentials | true | false | Whether to skip OSS credentials; disable this config to support the Jindo SDK. | 0.6.0 | |
+| celeborn.storage.oss.secret.key | <undefined> | false | OSS secret key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.access.key | <undefined> | false | S3 access key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 6441fd912..b7e363f15 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -87,10 +87,15 @@ license: |
| celeborn.quota.tenant.enabled | true | false | Whether to enable
tenant-level quota. | 0.6.0 | |
| celeborn.quota.user.enabled | true | false | Whether to enable user-level
quota. | 0.6.0 | |
| celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false |
Regex to decide which Celeborn configuration properties and environment
variables in master and worker environments contain sensitive information. When
this regex matches a property key or value, the value is redacted from the
logging. | 0.5.0 | |
-| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
+| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3,OSS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
| celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory
for Celeborn to store shuffle data. | 0.2.0 | |
| celeborn.storage.hdfs.kerberos.keytab | <undefined> | false | Kerberos
keytab file path for HDFS storage connection. | 0.3.2 | |
| celeborn.storage.hdfs.kerberos.principal | <undefined> | false |
Kerberos principal for HDFS storage connection. | 0.3.2 | |
+| celeborn.storage.oss.access.key | <undefined> | false | OSS access key for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.oss.dir | <undefined> | false | OSS base directory for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.oss.endpoint | <undefined> | false | OSS endpoint for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.oss.ignore.credentials | true | false | Whether to skip OSS credentials; disable this config to support the Jindo SDK. | 0.6.0 | |
+| celeborn.storage.oss.secret.key | <undefined> | false | OSS secret key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.access.key | <undefined> | false | S3 access key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index a69c435da..854b1ae3f 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -42,10 +42,15 @@ license: |
| celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false |
Regex to decide which Celeborn configuration properties and environment
variables in master and worker environments contain sensitive information. When
this regex matches a property key or value, the value is redacted from the
logging. | 0.5.0 | |
| celeborn.shuffle.chunk.size | 8m | false | Max chunk size of reducer's
merged shuffle data. For example, if a reducer's shuffle data is 128M and the
data will need 16 fetch chunk requests to fetch. | 0.2.0 | |
| celeborn.shuffle.sortPartition.block.compactionFactor | 0.25 | false |
Combine sorted shuffle blocks such that size of compacted shuffle block does
not exceed compactionFactor * celeborn.shuffle.chunk.size | 0.4.2 | |
-| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
+| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3,OSS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
| celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory
for Celeborn to store shuffle data. | 0.2.0 | |
| celeborn.storage.hdfs.kerberos.keytab | <undefined> | false | Kerberos
keytab file path for HDFS storage connection. | 0.3.2 | |
| celeborn.storage.hdfs.kerberos.principal | <undefined> | false |
Kerberos principal for HDFS storage connection. | 0.3.2 | |
+| celeborn.storage.oss.access.key | <undefined> | false | OSS access key for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.oss.dir | <undefined> | false | OSS base directory for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.oss.endpoint | <undefined> | false | OSS endpoint for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.oss.ignore.credentials | true | false | Whether to skip OSS credentials; disable this config to support the Jindo SDK. | 0.6.0 | |
+| celeborn.storage.oss.secret.key | <undefined> | false | OSS secret key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.access.key | <undefined> | false | S3 access key
for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for
Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.storage.s3.endpoint.region | <undefined> | false | S3
endpoint for Celeborn to store shuffle data. | 0.6.0 | |
@@ -85,6 +90,8 @@ license: |
| celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per
disk used for write data to HDD disks. | 0.2.0 | |
| celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used
by a HDFS flusher. | 0.3.0 | |
| celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count
used for write data to HDFS. | 0.2.0 | |
+| celeborn.worker.flusher.oss.buffer.size | 6m | false | Size of buffer used by an OSS flusher. | 0.6.0 | |
+| celeborn.worker.flusher.oss.threads | 8 | false | Flusher's thread count used for writing data to OSS. | 0.6.0 | |
| celeborn.worker.flusher.s3.buffer.size | 6m | false | Size of buffer used by
a S3 flusher. | 0.6.0 | |
| celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used
for write data to S3. | 0.6.0 | |
| celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher
to shutdown. | 0.2.0 | |
diff --git a/master/pom.xml b/master/pom.xml
index 6ac7742e8..25f349f4e 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -199,5 +199,20 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>aliyun</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aliyun</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun.oss</groupId>
+ <artifactId>aliyun-sdk-oss</artifactId>
+ <version>${aliyun-sdk-oss.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
</project>
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index aab79cc2f..75e680a3b 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -71,7 +71,8 @@ public class SlotsAllocator {
if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
if (StorageInfo.localDiskAvailable(availableStorageTypes)
               && diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS
-              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3) {
+              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3
+              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.OSS) {
usableDisks.add(
new UsableDiskInfo(
diskInfoEntry.getValue(),
diskInfoEntry.getValue().getAvailableSlots()));
@@ -85,6 +86,11 @@ public class SlotsAllocator {
usableDisks.add(
new UsableDiskInfo(
diskInfoEntry.getValue(),
diskInfoEntry.getValue().getAvailableSlots()));
+          } else if (StorageInfo.OSSAvailable(availableStorageTypes)
+              && diskInfoEntry.getValue().storageType() == StorageInfo.Type.OSS) {
+            usableDisks.add(
+                new UsableDiskInfo(
+                    diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
}
}
}
@@ -141,7 +147,8 @@ public class SlotsAllocator {
if (diskInfo.actualUsableSpace() > 0
&& diskInfo.status().equals(DiskStatus.HEALTHY)
&& diskInfo.storageType() != StorageInfo.Type.HDFS
- && diskInfo.storageType() != StorageInfo.Type.S3) {
+ && diskInfo.storageType() != StorageInfo.Type.S3
+ && diskInfo.storageType() != StorageInfo.Type.OSS) {
usableDisks.add(diskInfo);
}
}));
@@ -201,6 +208,8 @@ public class SlotsAllocator {
         storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes);
       } else if (selectedDiskInfo.storageType() == StorageInfo.Type.S3) {
         storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes);
+      } else if (selectedDiskInfo.storageType() == StorageInfo.Type.OSS) {
+        storageInfo = new StorageInfo("", StorageInfo.Type.OSS, availableStorageTypes);
} else {
storageInfo =
new StorageInfo(
@@ -215,6 +224,7 @@ public class SlotsAllocator {
selectedWorker.diskInfos().values().stream()
.filter(p -> p.storageType() != StorageInfo.Type.HDFS)
.filter(p -> p.storageType() != StorageInfo.Type.S3)
+ .filter(p -> p.storageType() != StorageInfo.Type.OSS)
.collect(Collectors.toList())
.toArray(new DiskInfo[0]);
storageInfo =
@@ -225,6 +235,8 @@ public class SlotsAllocator {
         workerDiskIndex.put(selectedWorker, (diskIndex + 1) % diskInfos.length);
       } else if (StorageInfo.S3Available(availableStorageTypes)) {
         storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes);
+      } else if (StorageInfo.OSSAvailable(availableStorageTypes)) {
+        storageInfo = new StorageInfo("", StorageInfo.Type.OSS, availableStorageTypes);
       } else {
         storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes);
}
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index ab3449e74..2acaea57b 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -282,12 +282,18 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
         disks.values().stream().filter(s -> !s.status().equals(DiskStatus.HEALTHY)).count();
     boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >= unhealthyDiskRatioThreshold;
     if (!excludedWorkers.contains(worker)
-        && (((disks.isEmpty() || exceed) && !conf.hasHDFSStorage() && !conf.hasS3Storage())
+        && (((disks.isEmpty() || exceed)
+                && !conf.hasHDFSStorage()
+                && !conf.hasS3Storage()
+                && !conf.hasOssStorage())
             || highWorkload)) {
       LOG.warn(
           "Worker {} (unhealthy disks num: {}) adds to excluded workers",
           worker, unhealthyDiskNum);
       excludedWorkers.add(worker);
-    } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage() || conf.hasS3Storage())
+    } else if ((availableSlots.get() > 0
+            || conf.hasHDFSStorage()
+            || conf.hasS3Storage()
+            || conf.hasOssStorage())
         && !highWorkload) {
// only unblack if numSlots larger than 0
excludedWorkers.remove(worker);
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 55d3ad579..1400a832e 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -189,6 +189,7 @@ private[celeborn] class Master(
private val dfsExpireDirsTimeoutMS = conf.dfsExpireDirsTimeoutMS
private val hasHDFSStorage = conf.hasHDFSStorage
private val hasS3Storage = conf.hasS3Storage
+ private val hasOssStorage = conf.hasOssStorage
private val quotaManager = new QuotaManager(
statusSystem,
@@ -334,7 +335,7 @@ private[celeborn] class Master(
CheckForWorkerUnavailableInfoTimeout)
}
- if (hasHDFSStorage || hasS3Storage) {
+ if (hasHDFSStorage || hasS3Storage || hasOssStorage) {
       checkForDFSRemnantDirsTimeOutTask =
         scheduleCheckTask(dfsExpireDirsTimeoutMS, CheckForDFSExpiredDirsTimeout)
}
@@ -1064,7 +1065,7 @@ private[celeborn] class Master(
statusSystem.handleAppLost(appId, requestId)
quotaManager.handleAppLost(appId)
logInfo(s"Removed application $appId")
- if (hasHDFSStorage || hasS3Storage) {
+ if (hasHDFSStorage || hasS3Storage || hasOssStorage) {
checkAndCleanExpiredAppDirsOnDFS(appId)
}
if (context != null) {
@@ -1086,6 +1087,7 @@ private[celeborn] class Master(
}
if (hasHDFSStorage) processDir(conf.hdfsDir, expiredDir)
if (hasS3Storage) processDir(conf.s3Dir, expiredDir)
+ if (hasOssStorage) processDir(conf.ossDir, expiredDir)
}
private def processDir(dfsDir: String, expiredDir: String): Unit = {
diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 78f7727bf..95bd3a284 100644
--- a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -331,20 +331,36 @@ public class SlotsAllocatorSuiteJ {
boolean expectSuccess,
boolean roundRobin,
boolean enableS3) {
+    checkSlotsOnDFS(
+        workers, partitionIds, shouldReplicate, expectSuccess, roundRobin, enableS3, false);
+  }
+
+ private void checkSlotsOnDFS(
+ List<WorkerInfo> workers,
+ List<Integer> partitionIds,
+ boolean shouldReplicate,
+ boolean expectSuccess,
+ boolean roundRobin,
+ boolean enableS3,
+ boolean enableOss) {
CelebornConf conf = new CelebornConf();
+ int availableStorageTypes;
if (enableS3) {
conf.set("celeborn.active.storage.levels", "S3");
+ availableStorageTypes = StorageInfo.S3_MASK;
+ } else if (enableOss) {
+ conf.set("celeborn.active.storage.levels", "OSS");
+ availableStorageTypes = StorageInfo.OSS_MASK;
} else {
conf.set("celeborn.active.storage.levels", "HDFS");
+ availableStorageTypes = StorageInfo.HDFS_MASK;
}
     Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots;
     if (roundRobin) {
-      int availableStorageTypes = enableS3 ? StorageInfo.S3_MASK : StorageInfo.HDFS_MASK;
       slots =
           SlotsAllocator.offerSlotsRoundRobin(
               workers, partitionIds, shouldReplicate, false, availableStorageTypes);
     } else {
-      int availableStorageTypes = enableS3 ? StorageInfo.S3_MASK : StorageInfo.HDFS_MASK;
slots =
SlotsAllocator.offerSlotsLoadAware(
workers,
@@ -502,4 +518,37 @@ public class SlotsAllocatorSuiteJ {
checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, true);
checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, false, true);
}
+
+ @Test
+ public void testOssOnly() {
+ final List<WorkerInfo> workers = prepareWorkers(false);
+ final List<Integer> partitionIds = new ArrayList<>();
+ for (int i = 0; i < 3000; i++) {
+ partitionIds.add(i);
+ }
+ final boolean shouldReplicate = true;
+    checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, false, true);
+ }
+
+ @Test
+ public void testLocalDisksAndOss() {
+ final List<WorkerInfo> workers = prepareWorkers(true);
+    DiskInfo ossDiskInfo1 =
+        new DiskInfo(
+            "OSS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, StorageInfo.Type.OSS);
+    DiskInfo ossDiskInfo2 =
+        new DiskInfo(
+            "OSS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, StorageInfo.Type.OSS);
+ ossDiskInfo1.maxSlots_$eq(Long.MAX_VALUE);
+ ossDiskInfo2.maxSlots_$eq(Long.MAX_VALUE);
+ workers.get(0).diskInfos().put("OSS", ossDiskInfo1);
+ workers.get(1).diskInfos().put("OSS", ossDiskInfo2);
+ final List<Integer> partitionIds = new ArrayList<>();
+ for (int i = 0; i < 3000; i++) {
+ partitionIds.add(i);
+ }
+ final boolean shouldReplicate = true;
+    checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, false, true);
+    checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, false, false, true);
+ }
}
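As an aside, the storage-mask selection that the extended `checkSlotsOnDFS` helper performs above is a simple three-way choice. A minimal, self-contained sketch (the mask values here are illustrative placeholders, not Celeborn's actual `StorageInfo` constants):

```java
// Sketch of the enableS3 / enableOss branching in checkSlotsOnDFS.
// HDFS remains the default when neither S3 nor OSS is requested.
class StorageMaskSketch {
    // Illustrative bit-mask values; Celeborn defines the real ones in StorageInfo.
    static final int HDFS_MASK = 0b001;
    static final int S3_MASK = 0b010;
    static final int OSS_MASK = 0b100;

    static int selectMask(boolean enableS3, boolean enableOss) {
        if (enableS3) {
            return S3_MASK;
        } else if (enableOss) {
            return OSS_MASK;
        }
        return HDFS_MASK; // default branch, as in the test helper
    }
}
```

The same ordering as the test helper applies: S3 wins if both flags are set, mirroring the `if / else if / else` chain in the patch.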
diff --git a/multipart-uploader/multipart-uploader-oss/pom.xml b/multipart-uploader/multipart-uploader-oss/pom.xml
new file mode 100644
index 000000000..28a78c138
--- /dev/null
+++ b/multipart-uploader/multipart-uploader-oss/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+  <artifactId>celeborn-multipart-uploader-oss_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Multipart Uploader OSS</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aliyun</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.aliyun.oss</groupId>
+ <artifactId>aliyun-sdk-oss</artifactId>
+ <version>${aliyun-sdk-oss.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.2</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/multipart-uploader/multipart-uploader-oss/src/main/java/org/apache/celeborn/OssMultipartUploadHandler.java b/multipart-uploader/multipart-uploader-oss/src/main/java/org/apache/celeborn/OssMultipartUploadHandler.java
new file mode 100644
index 000000000..01749a3d2
--- /dev/null
+++ b/multipart-uploader/multipart-uploader-oss/src/main/java/org/apache/celeborn/OssMultipartUploadHandler.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.event.ProgressListener;
+import com.aliyun.oss.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
+
+public class OssMultipartUploadHandler implements MultipartUploadHandler {
+
+  private static final Logger log = LoggerFactory.getLogger(OssMultipartUploadHandler.class);
+
+ private final String bucketName;
+ private final String key;
+ private final OSS ossClient;
+
+ private String uploadId;
+
+  public OssMultipartUploadHandler(
+      String endpoint, String bucketName, String accessKey, String secretKey, String key) {
+    this.bucketName = bucketName;
+    this.ossClient = new OSSClientBuilder().build(endpoint, accessKey, secretKey);
+    this.key = key;
+  }
+
+ @Override
+  public void startUpload() {
+    uploadId =
+        ossClient
+            .initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, key))
+            .getUploadId();
+  }
+
+ @Override
+  public void putPart(InputStream inputStream, Integer partNumber, Boolean finalFlush)
+      throws IOException {
+    try (InputStream inStream = inputStream) {
+      int partSize = inStream.available();
+      if (partSize == 0) {
+        log.debug(
+            "key {} uploadId {} part size is 0 for part number {} finalFlush {}",
+            key,
+            uploadId,
+            partNumber,
+            finalFlush);
+        return;
+      }
+      ossClient.uploadPart(
+          new UploadPartRequest(bucketName, key, uploadId, partNumber, inStream, partSize));
+      log.debug(
+          "key {} uploadId {} part number {} uploaded with size {} finalFlush {}",
+          key,
+          uploadId,
+          partNumber,
+          partSize,
+          finalFlush);
+    } catch (RuntimeException | IOException e) {
+      log.error("Failed to upload part", e);
+      throw e;
+    }
+  }
+
+ @Override
+ public void complete() {
+ List<PartETag> partETags = new ArrayList<>();
+    ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, uploadId);
+    PartListing partListing;
+    do {
+      partListing = ossClient.listParts(listPartsRequest);
+      for (PartSummary part : partListing.getParts()) {
+        partETags.add(new PartETag(part.getPartNumber(), part.getETag()));
+      }
+      listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
+    } while (partListing.isTruncated());
+    if (partETags.size() == 0) {
+      log.debug(
+          "bucket {} key {} uploadId {} has no parts uploaded, aborting upload",
+          bucketName,
+          key,
+          uploadId);
+      abort();
+      log.debug("bucket {} key {} upload completed with size {}", bucketName, key, 0);
+      return;
+    }
+    ProgressListener progressListener =
+        event ->
+            log.debug(
+                "key {} uploadId {} progress event type {} transferred {} bytes",
+                key,
+                uploadId,
+                event.getEventType(),
+                event.getBytes());
+
+    CompleteMultipartUploadResult compResult =
+        ossClient.completeMultipartUpload(
+            new CompleteMultipartUploadRequest(bucketName, key, uploadId, partETags)
+                .withProgressListener(progressListener));
+    log.debug(
+        "bucket {} key {} uploadId {} upload completed location is in {} ",
+        bucketName,
+        key,
+        uploadId,
+        compResult.getLocation());
+  }
+
+  @Override
+  public void abort() {
+    ossClient.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, key, uploadId));
+  }
+ }
+
+ @Override
+ public void close() {
+ if (ossClient != null) {
+ ossClient.shutdown();
+ }
+ }
+}
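The `complete()` method above collects part ETags with a marker-based pagination loop before finishing the multipart upload. The loop's shape can be sketched without the OSS SDK against a stubbed page source (all names here are hypothetical stand-ins, not the SDK's types):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the do/while part-listing loop in OssMultipartUploadHandler.complete().
// "pages" stands in for the server's paged ListParts responses; the integer
// marker plays the role of setPartNumberMarker / getNextPartNumberMarker.
class PartListingSketch {
    static List<Integer> listAllParts(List<List<Integer>> pages) {
        List<Integer> collected = new ArrayList<>();
        int marker = 0; // index of the next page to fetch
        boolean truncated;
        do { // the handler also lists at least once, so do/while fits
            List<Integer> page = pages.get(marker); // ossClient.listParts(request)
            collected.addAll(page);                 // partETags.add(...)
            marker++;                               // advance the marker
            truncated = marker < pages.size();      // partListing.isTruncated()
        } while (truncated);
        return collected;
    }
}
```

The real handler additionally aborts the upload when no parts were collected, which the sketch omits.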
diff --git a/pom.xml b/pom.xml
index 18fb2b6ad..959aeb245 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@
<!-- use hadoop-3 as default -->
<hadoop.version>3.3.6</hadoop.version>
<aws.version>1.12.532</aws.version>
+ <aliyun-sdk-oss.version>3.13.0</aliyun-sdk-oss.version>
<!--
If you change codahale.metrics.version, you also need to change
the link to metrics.dropwizard.io in docs/monitoring.md.
@@ -1352,6 +1353,15 @@
<aws-deps>true</aws-deps>
</properties>
</profile>
+ <profile>
+ <id>aliyun</id>
+ <modules>
+ <module>multipart-uploader/multipart-uploader-oss</module>
+ </modules>
+ <properties>
+ <aliyun-deps>true</aliyun-deps>
+ </properties>
+ </profile>
<profile>
<id>spark-2.4</id>
<modules>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 79580a774..dfeff9c30 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -47,10 +47,12 @@ object Dependencies {
val commonsIoVersion = "2.17.0"
val commonsLoggingVersion = "1.1.3"
val commonsLang3Version = "3.17.0"
+ val commonsCollectionsVersion = "3.2.2"
val findbugsVersion = "1.3.9"
val guavaVersion = "33.1.0-jre"
val hadoopVersion = "3.3.6"
val awsS3Version = "1.12.532"
+ val aliyunOssVersion = "3.13.0"
val junitInterfaceVersion = "0.13.3"
// don't forget update `junitInterfaceVersion` when we upgrade junit
val junitVersion = "4.13.2"
@@ -123,6 +125,9 @@ object Dependencies {
  val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion excludeAll (
    ExclusionRule("com.amazonaws", "aws-java-sdk-bundle"))
val awsS3 = "com.amazonaws" % "aws-java-sdk-s3" % awsS3Version
+  val commonsCollections = "commons-collections" % "commons-collections" % commonsCollectionsVersion
+ val hadoopAliyun = "org.apache.hadoop" % "hadoop-aliyun" % hadoopVersion
+ val aliyunOss = "com.aliyun.oss" % "aliyun-sdk-oss" % aliyunOssVersion
  val ioDropwizardMetricsCore = "io.dropwizard.metrics" % "metrics-core" % metricsVersion
  val ioDropwizardMetricsGraphite = "io.dropwizard.metrics" % "metrics-graphite" % metricsVersion excludeAll (
    ExclusionRule("com.rabbitmq", "amqp-client"))
@@ -454,8 +459,9 @@ object Utils {
profiles
}
-  val celeborMPUProject = profiles.filter(_.startsWith("aws")).headOption match {
+  val celeborMPUProject = profiles.find(p => p.startsWith("aws") || p.startsWith("aliyun")) match {
case Some("aws") => Some(CeleborMPU.celeborMPU)
+ case Some("aliyun") => Some(CeleborMPU.celeborMPUOss)
case _ => None
}
@@ -569,6 +575,7 @@ object CelebornSpi {
object CeleborMPU {
  lazy val hadoopAwsDependencies = Seq(Dependencies.hadoopAws, Dependencies.awsS3)
+  lazy val hadoopAliyunDependencies = Seq(Dependencies.commonsCollections, Dependencies.hadoopAliyun, Dependencies.aliyunOss)
  lazy val celeborMPU = Project("celeborn-multipart-uploader-s3", file("multipart-uploader/multipart-uploader-s3"))
.dependsOn(CelebornService.service % "test->test;compile->compile")
@@ -579,6 +586,16 @@ object CeleborMPU {
Dependencies.log4jSlf4jImpl,
) ++ hadoopAwsDependencies
)
+
+  lazy val celeborMPUOss = Project("celeborn-multipart-uploader-oss", file("multipart-uploader/multipart-uploader-oss"))
+ .dependsOn(CelebornService.service % "test->test;compile->compile")
+ .settings (
+ commonSettings,
+ libraryDependencies ++= Seq(
+ Dependencies.log4j12Api,
+ Dependencies.log4jSlf4jImpl,
+ ) ++ hadoopAliyunDependencies
+ )
}
object CelebornCommon {
@@ -702,6 +719,15 @@ object CelebornService {
}
object CelebornMaster {
+ val mpuDependencies =
+ if (profiles.exists(_.startsWith("aws"))) {
+ CeleborMPU.hadoopAwsDependencies
+ } else if (profiles.exists(_.startsWith("aliyun"))) {
+ CeleborMPU.hadoopAliyunDependencies
+ } else {
+ Seq.empty
+ }
+
lazy val master = Project("celeborn-master", file("master"))
.dependsOn(CelebornCommon.common)
.dependsOn(CelebornCommon.common % "test->test;compile->compile")
@@ -724,7 +750,7 @@ object CelebornMaster {
Dependencies.ratisServer,
Dependencies.ratisShell,
Dependencies.scalatestMockito % "test",
- ) ++ commonUnitTestDependencies
+ ) ++ commonUnitTestDependencies ++ mpuDependencies
)
}
@@ -738,6 +764,8 @@ object CelebornWorker {
if (profiles.exists(_.startsWith("aws"))) {
worker = worker.dependsOn(CeleborMPU.celeborMPU)
+ } else if (profiles.exists(_.startsWith("aliyun"))) {
+ worker = worker.dependsOn(CeleborMPU.celeborMPUOss)
}
worker = worker.settings(
@@ -1032,6 +1060,7 @@ trait SparkClientProjects {
name.startsWith("failureaccess-") ||
name.startsWith("netty-") ||
name.startsWith("commons-lang3-") ||
+ name.startsWith("commons-io-") ||
name.startsWith("RoaringBitmap-"))
}
},
diff --git a/worker/pom.xml b/worker/pom.xml
index 12dc7c941..6d9c858a1 100644
--- a/worker/pom.xml
+++ b/worker/pom.xml
@@ -153,5 +153,15 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>aliyun</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+        <artifactId>celeborn-multipart-uploader-oss_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
</project>
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index acaa777d4..3cacd4866 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -154,6 +154,8 @@ public class PartitionDataWriter implements DeviceObserver {
return null;
} else if (diskFileInfo.isS3()) {
       storageInfo = new StorageInfo(StorageInfo.Type.S3, true, diskFileInfo.getFilePath());
+    } else if (diskFileInfo.isOSS()) {
+      return new StorageInfo(StorageInfo.Type.OSS, true, diskFileInfo.getFilePath());
     } else {
       storageInfo = new StorageInfo(StorageInfo.Type.HDFS, true, diskFileInfo.getFilePath());
}
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 3385874c7..a45fa2e23 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -485,8 +485,12 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
     FSDataOutputStream dfsIndexOutput = null;
     FileChannel indexFileChannel = null;
     if (isDfs) {
-      boolean isS3 = Utils.isS3Path(indexFilePath);
-      StorageInfo.Type storageType = isS3 ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+      StorageInfo.Type storageType = StorageInfo.Type.HDFS;
+      if (Utils.isS3Path(indexFilePath)) {
+        storageType = StorageInfo.Type.S3;
+      } else if (Utils.isOssPath(indexFilePath)) {
+        storageType = StorageInfo.Type.OSS;
+      }
       FileSystem hadoopFs = StorageManager.hadoopFs().get(storageType);
// If the index file exists, it will be overwritten.
// So there is no need to check its existence.
@@ -599,13 +603,19 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
         () -> {
           FileChannel indexChannel = null;
           FSDataInputStream dfsIndexStream = null;
-          boolean isDfs = Utils.isHdfsPath(indexFilePath) || Utils.isS3Path(indexFilePath);
-          boolean isS3 = Utils.isS3Path(indexFilePath);
+          boolean isDfs =
+              Utils.isHdfsPath(indexFilePath)
+                  || Utils.isS3Path(indexFilePath)
+                  || Utils.isOssPath(indexFilePath);
           int indexSize;
           try {
             if (isDfs) {
-              StorageInfo.Type storageType =
-                  isS3 ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+              StorageInfo.Type storageType = StorageInfo.Type.HDFS;
+              if (Utils.isS3Path(indexFilePath)) {
+                storageType = StorageInfo.Type.S3;
+              } else if (Utils.isOssPath(indexFilePath)) {
+                storageType = StorageInfo.Type.OSS;
+              }
               FileSystem hadoopFs = StorageManager.hadoopFs().get(storageType);
               dfsIndexStream = hadoopFs.open(new Path(indexFilePath));
               indexSize = (int) hadoopFs.getFileStatus(new Path(indexFilePath)).getLen();
@@ -656,6 +666,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
private final String shuffleKey;
private final boolean isHdfs;
private final boolean isS3;
+ private final boolean isOss;
private final boolean isDfs;
private final boolean isPrefetch;
private final FileInfo originFileInfo;
@@ -672,7 +683,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
this.sortedFilePath = Utils.getSortedFilePath(originFilePath);
this.isHdfs = fileInfo.isHdfs();
this.isS3 = fileInfo.isS3();
- this.isDfs = isHdfs || isS3;
+ this.isOss = fileInfo.isOSS();
+ this.isDfs = isHdfs || isS3 || isOss;
this.isPrefetch = !isDfs && prefetchEnabled;
this.originFileLen = fileInfo.getFileLength();
this.fileId = fileId;
@@ -688,8 +700,12 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
          indexFile.delete();
        }
      } else {
-        boolean isS3 = Utils.isS3Path(indexFilePath);
-        StorageInfo.Type storageType = isS3 ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+        StorageInfo.Type storageType = StorageInfo.Type.HDFS;
+        if (Utils.isS3Path(indexFilePath)) {
+          storageType = StorageInfo.Type.S3;
+        } else if (Utils.isOssPath(indexFilePath)) {
+          storageType = StorageInfo.Type.OSS;
+        }
this.hadoopFs = StorageManager.hadoopFs().get(storageType);
if (hadoopFs.exists(fileInfo.getDfsSortedPath())) {
hadoopFs.delete(fileInfo.getDfsSortedPath(), false);
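The hunks above repeat the same dispatch: derive the storage type from the file path, defaulting to HDFS. A self-contained sketch of that dispatch (the scheme prefixes are assumptions standing in for Celeborn's `Utils.isS3Path` / `Utils.isOssPath` checks):

```java
// Sketch of the storageType selection added throughout PartitionFilesSorter.
// The URI-scheme prefixes below are illustrative; the real predicates live in
// org.apache.celeborn.common.util.Utils.
class StorageTypeSketch {
    enum Type { HDFS, S3, OSS }

    static Type storageTypeOf(String path) {
        if (path.startsWith("s3a://") || path.startsWith("s3://")) {
            return Type.S3;   // assumed S3 schemes
        } else if (path.startsWith("oss://")) {
            return Type.OSS;  // assumed OSS scheme
        }
        return Type.HDFS;     // default branch, as in the patched code
    }
}
```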
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
index a24fc0a21..3524beb78 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
@@ -41,4 +41,19 @@ public class TierWriterHelper {
.build()
            .newInstance(bucketName, s3AccessKey, s3SecretKey, s3EndpointRegion, key, maxRetryies);
}
+
+  public static MultipartUploadHandler getOssMultipartUploadHandler(
+      String ossEndpoint, String bucketName, String ossAccessKey, String ossSecretKey, String key) {
+    return (MultipartUploadHandler)
+        DynConstructors.builder()
+            .impl(
+                "org.apache.celeborn.OssMultipartUploadHandler",
+                String.class,
+                String.class,
+                String.class,
+                String.class,
+                String.class)
+            .build()
+            .newInstance(ossEndpoint, bucketName, ossAccessKey, ossSecretKey, key);
+ }
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index f79dffee0..52650cb6c 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -186,7 +186,7 @@ private[deploy] class Controller(
return
}
-    if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage && !conf.hasS3Storage) {
+    if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage && !conf.hasS3Storage && !conf.hasOssStorage) {
val msg = "Local storage has no available dirs!"
logError(s"[handleReserveSlots] $msg")
      context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, msg))
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 2e9895beb..568e0d193 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -288,6 +288,12 @@ class FetchHandler(
shuffleKey,
fileName)
makeStreamHandler(streamId, numChunks = 0)
+ case info: DiskFileInfo if info.isOSS =>
+ chunkStreamManager.registerStream(
+ streamId,
+ shuffleKey,
+ fileName)
+ makeStreamHandler(streamId, numChunks = 0)
case _ =>
val managedBuffer = fileInfo match {
case df: DiskFileInfo =>
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index dccbf47e6..d74e7a380 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -45,7 +45,7 @@ import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.{ExceptionUtils, Utils}
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
-import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, LocalFlusher, PartitionDataWriter, S3Flusher, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{LocalFlusher, PartitionDataWriter, StorageManager}

class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler with Logging {
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 897e28733..736e1ff4a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -78,3 +78,19 @@ private[worker] class S3FlushTask(
s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
}
}
+
+private[worker] class OssFlushTask(
+ buffer: CompositeByteBuf,
+ notifier: FlushNotifier,
+ keepBuffer: Boolean,
+ ossMultipartUploader: MultipartUploadHandler,
+ partNumber: Int,
+ finalFlush: Boolean = false)
+ extends FlushTask(buffer, notifier, keepBuffer) {
+
+ override def flush(): Unit = {
+ val bytes = ByteBufUtil.getBytes(buffer)
+ val inputStream = new ByteArrayInputStream(bytes)
+ ossMultipartUploader.putPart(inputStream, partNumber, finalFlush)
+ }
+}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index a10059436..1aa7bde6f 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -210,3 +210,22 @@ final private[worker] class S3Flusher(
override def toString: String = s"s3Flusher@$flusherId"
}
+
+final private[worker] class OssFlusher(
+ workerSource: AbstractSource,
+ ossFlusherThreads: Int,
+ allocator: ByteBufAllocator,
+ maxComponents: Int) extends Flusher(
+ workerSource,
+ ossFlusherThreads,
+ allocator,
+ maxComponents,
+ null,
+ "OSS") with Logging {
+
+  override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = {
+    logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
+ }
+
+ override def toString: String = s"ossFlusher@$flusherId"
+}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 0bc94bc8a..1d369cf27 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -58,6 +58,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       JavaUtils.newConcurrentHashMap[File, ConcurrentHashMap[String, PartitionDataWriter]]()
   val hdfsWriters = JavaUtils.newConcurrentHashMap[String, PartitionDataWriter]()
   val s3Writers = JavaUtils.newConcurrentHashMap[String, PartitionDataWriter]()
+  val ossWriters = JavaUtils.newConcurrentHashMap[String, PartitionDataWriter]()
   val memoryWriters = JavaUtils.newConcurrentHashMap[MemoryFileInfo, PartitionDataWriter]()
// (shuffleKey->(filename->DiskFileInfo))
private val diskFileInfos =
@@ -70,6 +71,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val hasS3Storage = conf.hasS3Storage
+ val hasOssStorage = conf.hasOssStorage
+
val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
val storagePolicy = new StoragePolicy(conf, this, workerSource)
@@ -83,7 +86,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       (new File(workdir, conf.workerWorkingDir), maxSpace, flusherThread, storageType)
     }
-    if (workingDirInfos.size <= 0 && !hasHDFSStorage && !hasS3Storage) {
+    if (workingDirInfos.size <= 0 && !hasHDFSStorage && !hasS3Storage && !hasOssStorage) {
throw new IOException("Empty working directory configuration!")
}
@@ -100,6 +103,11 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       Option(new DiskInfo("S3", Long.MaxValue, 999999, 999999, 0, StorageInfo.Type.S3))
     else None
+  val ossDiskInfo =
+    if (conf.hasOssStorage)
+      Option(new DiskInfo("OSS", Long.MaxValue, 999999, 999999, 0, StorageInfo.Type.OSS))
+    else None
+
def disksSnapshot(): List[DiskInfo] = {
diskInfos.synchronized {
val disks = new util.ArrayList[DiskInfo](diskInfos.values())
@@ -159,6 +167,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val hdfsDir = conf.hdfsDir
val s3Dir = conf.s3Dir
+ val ossDir = conf.ossDir
val hdfsPermission = new FsPermission("755")
val (hdfsFlusher, _totalHdfsFlusherThread) =
if (hasHDFSStorage) {
@@ -202,15 +211,36 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
(None, 0)
}
+ val (ossFlusher, _totalOssFlusherThread) =
+ if (hasOssStorage) {
+ logInfo(s"Initialize OSS support with path $ossDir")
+ try {
+ StorageManager.hadoopFs = CelebornHadoopUtils.getHadoopFS(conf)
+ } catch {
+ case e: Exception =>
+ logError("Celeborn initialize OSS failed.", e)
+ throw e
+ }
+ (
+ Some(new OssFlusher(
+ workerSource,
+ conf.workerOssFlusherThreads,
+ storageBufferAllocator,
+ conf.workerPushMaxComponents)),
+ conf.workerOssFlusherThreads)
+ } else {
+ (None, 0)
+ }
+
def totalFlusherThread: Int =
- _totalLocalFlusherThread + _totalHdfsFlusherThread + _totalS3FlusherThread
+ _totalLocalFlusherThread + _totalHdfsFlusherThread + _totalS3FlusherThread + _totalOssFlusherThread
val activeTypes = conf.availableStorageTypes
lazy val localOrDfsStorageAvailable: Boolean = {
StorageInfo.OSSAvailable(activeTypes) || StorageInfo.HDFSAvailable(
activeTypes) || StorageInfo.localDiskAvailable(
- activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty
+ activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty || ossDir.nonEmpty
}
override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized {
@@ -419,7 +449,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
userIdentifier: UserIdentifier,
partitionSplitEnabled: Boolean,
isSegmentGranularityVisible: Boolean): PartitionDataWriter = {
- if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage) {
+ if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage && !hasOssStorage) {
throw new IOException("No available working dirs!")
}
val partitionDataWriterContext = new PartitionDataWriterContext(
@@ -477,6 +507,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
s3Writers.put(fileInfo.getFilePath, writer)
return
}
+ if (fileInfo.isOSS) {
+ ossWriters.put(fileInfo.getFilePath, writer)
+ return
+ }
deviceMonitor.registerFileWriter(writer, fileInfo.getFilePath)
workingDirWriters.computeIfAbsent(workingDir, workingDirWriterListFunc).put(
fileInfo.getFilePath,
@@ -565,6 +599,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
s"Destroy FileWriter $s3FileWriter caused by shuffle $shuffleKey expired."))
s3Writers.remove(info.getFilePath)
}
+ } else if (info.isOSS) {
+ isDfsExpired = true
+ val ossFileWriter = ossWriters.get(info.getFilePath)
+ if (ossFileWriter != null) {
+ ossFileWriter.destroy(new IOException(
+ s"Destroy FileWriter $ossFileWriter caused by shuffle $shuffleKey expired."))
+ ossWriters.remove(info.getFilePath)
+ }
} else {
val workingDir =
info.getFile.getParentFile.getParentFile.getParentFile
@@ -597,12 +639,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val removedFileInfos = diskFileInfos.remove(shuffleKey)
var isDfsExpired = false
var isHdfs = false
+ var isOss = false
if (removedFileInfos != null) {
removedFileInfos.asScala.foreach {
case (_, fileInfo) =>
if (cleanFileInternal(shuffleKey, fileInfo)) {
isDfsExpired = true
isHdfs = fileInfo.isHdfs
+ isOss = fileInfo.isOSS
}
}
}
@@ -617,9 +661,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
if (isDfsExpired) {
try {
- val dir = if (hasHDFSStorage && isHdfs) hdfsDir else s3Dir
+ val dir =
+ if (hasHDFSStorage && isHdfs) hdfsDir
+ else if (hasOssStorage && isOss) ossDir
+ else s3Dir
val storageInfo =
- if (hasHDFSStorage && isHdfs) StorageInfo.Type.HDFS else StorageInfo.Type.S3
+ if (hasHDFSStorage && isHdfs) StorageInfo.Type.HDFS
+ else if (hasOssStorage && isOss) StorageInfo.Type.OSS
+ else StorageInfo.Type.S3
StorageManager.hadoopFs.get(storageInfo).delete(
new Path(new Path(dir, conf.workerWorkingDir), s"$appId/$shuffleId"),
true)
@@ -726,7 +775,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val dfsCleaned = hadoopFs match {
case dfs: FileSystem =>
- val dfsDir = if (hasHDFSStorage) hdfsDir else s3Dir
+ val dfsDir = if (hasHDFSStorage) hdfsDir else if (hasOssStorage) ossDir else s3Dir
val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir)
// DFS path not exist when first time initialize
if (dfs.exists(dfsWorkPath)) {
@@ -965,7 +1014,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
null,
null,
null)
- } else if (location.getStorageInfo.localDiskAvailable() || location.getStorageInfo.HDFSAvailable() || location.getStorageInfo.S3Available()) {
+ } else if (location.getStorageInfo.localDiskAvailable() || location.getStorageInfo.HDFSAvailable()
+ || location.getStorageInfo.S3Available() || location.getStorageInfo.OSSAvailable()) {
logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
val createDiskFileResult = createDiskFile(
location,
@@ -1038,7 +1088,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
healthyWorkingDirs()
}
- if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty) {
+ if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && ossFlusher.isEmpty) {
throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
}
@@ -1078,6 +1128,24 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
fileName,
s3FileInfo)
return (s3Flusher.get, s3FileInfo, null)
+ } else if (hasOssStorage && location.getStorageInfo.OSSAvailable()) {
+ val shuffleDir =
+ new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId")
+ FileSystem.mkdirs(
+ StorageManager.hadoopFs.get(StorageInfo.Type.OSS),
+ shuffleDir,
+ hdfsPermission)
+ val ossFilePath = new Path(shuffleDir, fileName).toString
+ val ossFileInfo = new DiskFileInfo(
+ userIdentifier,
+ partitionSplitEnabled,
+ new ReduceFileMeta(conf.shuffleChunkSize),
+ ossFilePath,
+ StorageInfo.Type.OSS)
+ diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
+ fileName,
+ ossFileInfo)
+ return (ossFlusher.get, ossFileInfo, null)
} else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) {
val dir = dirs(getNextIndex % dirs.size)
val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints)
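The storage dispatch this file's hunks extend follows one pattern throughout: check HDFS first, then OSS, and fall back to S3. That branching can be sketched in isolation; `pickDfsDir` and the `DfsType` ADT below are illustrative names for this email only, not identifiers from the patch.

```scala
// Illustrative sketch of the dir/type selection added above; the real code
// branches on conf.hasHDFSStorage / conf.hasOssStorage and the file's
// isHdfs / isOSS flags to choose both the base dir and StorageInfo.Type.
object StorageDispatchSketch {
  sealed trait DfsType
  case object Hdfs extends DfsType
  case object Oss extends DfsType
  case object S3 extends DfsType

  // Mirrors: if (hasHDFSStorage && isHdfs) hdfsDir
  //          else if (hasOssStorage && isOss) ossDir
  //          else s3Dir
  def pickDfsDir(
      hasHdfs: Boolean, isHdfs: Boolean,
      hasOss: Boolean, isOss: Boolean,
      hdfsDir: String, ossDir: String, s3Dir: String): (String, DfsType) =
    if (hasHdfs && isHdfs) (hdfsDir, Hdfs)
    else if (hasOss && isOss) (ossDir, Oss)
    else (s3Dir, S3)
}
```

Note that S3 is the final fallback: when neither the HDFS nor the OSS branch matches, the S3 dir and type are used, which is why `s3Dir` remains the default in the rewritten `val dir` expression.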
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index f1996441f..8ac98aae8 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -68,7 +68,10 @@ abstract class TierWriterBase(
def write(buf: ByteBuf): Unit = {
ensureNotClosed()
- if (notifier.hasException) return
+ if (notifier.hasException) {
+ handleException()
+ return
+ }
flushLock.synchronized {
metaHandler.beforeWrite(buf)
@@ -80,6 +83,8 @@ abstract class TierWriterBase(
numPendingWrites.decrementAndGet()
}
+ def handleException(): Unit
+
protected def writeInternal(buf: ByteBuf): Unit
def needEvict(): Boolean
@@ -348,6 +353,8 @@ class MemoryTierWriter(
override def getFlusher(): Flusher = {
null
}
+
+ override def handleException(): Unit = {}
}
class LocalTierWriter(
@@ -471,6 +478,8 @@ class LocalTierWriter(
def getFlusher(): Flusher = {
flusher
}
+
+ override def handleException(): Unit = {}
}
class DfsTierWriter(
@@ -500,11 +509,14 @@ class DfsTierWriter(
val hadoopFs: FileSystem = StorageManager.hadoopFs.get(storageType)
var deleted = false
private var s3MultipartUploadHandler: MultipartUploadHandler = _
+ private var ossMultipartUploadHandler: MultipartUploadHandler = _
var partNumber: Int = 1
this.flusherBufferSize =
if (hdfsFileInfo.isS3()) {
conf.workerS3FlusherBufferSize
+ } else if (hdfsFileInfo.isOSS()) {
+ conf.workerOssFlusherBufferSize
} else {
conf.workerHdfsFlusherBufferSize
}
@@ -530,6 +542,24 @@ class DfsTierWriter(
key,
conf.s3MultiplePartUploadMaxRetries)
s3MultipartUploadHandler.startUpload()
+ } else if (hdfsFileInfo.isOSS) {
+ val configuration = hadoopFs.getConf
+ val ossEndpoint = configuration.get("fs.oss.endpoint")
+ val ossAccessKey = configuration.get("fs.oss.accessKeyId")
+ val ossSecretKey = configuration.get("fs.oss.accessKeySecret")
+
+ val uri = hadoopFs.getUri
+ val bucketName = uri.getHost
+ val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
+ val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 1)
+
+ this.ossMultipartUploadHandler = TierWriterHelper.getOssMultipartUploadHandler(
+ ossEndpoint,
+ bucketName,
+ ossAccessKey,
+ ossSecretKey,
+ key)
+ ossMultipartUploadHandler.startUpload()
}
} catch {
case _: IOException =>
@@ -556,11 +586,21 @@ class DfsTierWriter(
notifier.numPendingFlushes.incrementAndGet()
if (hdfsFileInfo.isHdfs) {
new HdfsFlushTask(flushBuffer, hdfsFileInfo.getDfsPath(), notifier, true)
+ } else if (hdfsFileInfo.isOSS) {
+ val flushTask = new OssFlushTask(
+ flushBuffer,
+ notifier,
+ true,
+ ossMultipartUploadHandler,
+ partNumber,
+ finalFlush)
+ partNumber = partNumber + 1
+ flushTask
} else {
val flushTask = new S3FlushTask(
flushBuffer,
notifier,
- false,
+ true,
s3MultipartUploadHandler,
partNumber,
finalFlush)
@@ -610,6 +650,14 @@ class DfsTierWriter(
}
indexOutputStream.close()
}
+ if (s3MultipartUploadHandler != null) {
+ s3MultipartUploadHandler.complete()
+ s3MultipartUploadHandler.close()
+ }
+ if (ossMultipartUploadHandler != null) {
+ ossMultipartUploadHandler.complete()
+ ossMultipartUploadHandler.close()
+ }
}
override def notifyFileCommitted(): Unit =
@@ -643,4 +691,17 @@ class DfsTierWriter(
def getFlusher(): Flusher = {
flusher
}
+
+ override def handleException(): Unit = {
+ if (s3MultipartUploadHandler != null) {
+ logWarning(s"Abort s3 multipart upload for ${fileInfo.getFilePath}")
+ s3MultipartUploadHandler.complete()
+ s3MultipartUploadHandler.close()
+ }
+ if (ossMultipartUploadHandler != null) {
+ logWarning(s"Abort Oss multipart upload for ${fileInfo.getFilePath}")
+ ossMultipartUploadHandler.complete()
+ ossMultipartUploadHandler.close()
+ }
+ }
}
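For orientation, the lifecycle `DfsTierWriter` drives on the handler above — `startUpload()`, one part per flush task with a monotonically increasing `partNumber`, then `complete()` and `close()` when the writer closes — can be exercised against a toy in-memory stand-in. The trait and class below are illustrative only; the real `MultipartUploadHandler` interface lives in the MPU extension module.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in mirroring the call sequence DfsTierWriter uses; this is NOT
// the real MultipartUploadHandler interface, just a sketch of its lifecycle.
trait UploadLifecycle {
  def startUpload(): Unit
  def putPart(data: Array[Byte], partNumber: Int, finalFlush: Boolean): Unit
  def complete(): Unit
  def close(): Unit
}

final class InMemoryUploadHandler extends UploadLifecycle {
  // Records (partNumber, size) for each uploaded part.
  private val parts = ArrayBuffer.empty[(Int, Int)]
  var completed = false
  override def startUpload(): Unit = parts.clear()
  override def putPart(data: Array[Byte], partNumber: Int, finalFlush: Boolean): Unit =
    parts += ((partNumber, data.length))
  override def complete(): Unit = completed = true
  override def close(): Unit = ()
  def partCount: Int = parts.size
}
```

In the patch, part numbering is owned by the writer (`partNumber = partNumber + 1` after each flush task), so the handler only needs to forward each part to the object store and finalize the upload in `complete()`.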