This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d0ba7f9b [CELEBORN-1916] Support Aliyun OSS Based on MPU Extension 
Interface
7d0ba7f9b is described below

commit 7d0ba7f9b8f10f3e71b9356267ef00d7d7fc9771
Author: veli.yang <[email protected]>
AuthorDate: Tue Apr 8 15:10:33 2025 +0800

    [CELEBORN-1916] Support Aliyun OSS Based on MPU Extension Interface
    
    ### What changes were proposed in this pull request?
    
    - close [CELEBORN-1916](https://issues.apache.org/jira/browse/CELEBORN-1916)
    - This PR extends the Multipart Uploader (MPU) interface to support Aliyun 
OSS.
    
    ### Why are the changes needed?
    
    - Implemented multipart-uploader-oss module based on the existing MPU 
extension interface.
    - Added necessary configurations and dependencies for Aliyun OSS 
integration.
    - Ensured compatibility with the existing multipart-uploader framework.
    - This enhancement allows seamless multipart upload functionality for 
Aliyun OSS, similar to the existing AWS S3 support.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Deployment integration testing has been completed in the local environment.
    
    Closes #3157 from shouwangyw/optimize/mpu-oss.
    
    Lead-authored-by: veli.yang <[email protected]>
    Co-authored-by: yangwei <[email protected]>
    Co-authored-by: mingji <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 README.md                                          |  25 ++++
 build/make-distribution.sh                         |   3 +
 .../celeborn/client/read/CelebornInputStream.java  |   1 +
 .../celeborn/client/read/DfsPartitionReader.java   |   9 +-
 .../apache/celeborn/common/meta/DiskFileInfo.java  |   6 +-
 .../org/apache/celeborn/common/CelebornConf.scala  |  80 +++++++++++-
 .../apache/celeborn/common/meta/WorkerInfo.scala   |   6 +-
 .../celeborn/common/util/CelebornHadoopUtils.scala |  17 +++
 .../org/apache/celeborn/common/util/Utils.scala    |   7 +-
 .../apache/celeborn/common/util/UtilsSuite.scala   |  18 ++-
 dev/reformat                                       |   1 +
 docs/configuration/client.md                       |   7 +-
 docs/configuration/master.md                       |   7 +-
 docs/configuration/worker.md                       |   9 +-
 master/pom.xml                                     |  15 +++
 .../service/deploy/master/SlotsAllocator.java      |  16 ++-
 .../master/clustermeta/AbstractMetaManager.java    |  10 +-
 .../celeborn/service/deploy/master/Master.scala    |   6 +-
 .../deploy/master/SlotsAllocatorSuiteJ.java        |  53 +++++++-
 multipart-uploader/multipart-uploader-oss/pom.xml  |  62 +++++++++
 .../apache/celeborn/OssMultipartUploadHandler.java | 142 +++++++++++++++++++++
 pom.xml                                            |  10 ++
 project/CelebornBuild.scala                        |  33 ++++-
 worker/pom.xml                                     |  10 ++
 .../deploy/worker/storage/PartitionDataWriter.java |   2 +
 .../worker/storage/PartitionFilesSorter.java       |  34 +++--
 .../deploy/worker/storage/TierWriterHelper.java    |  15 +++
 .../service/deploy/worker/Controller.scala         |   2 +-
 .../service/deploy/worker/FetchHandler.scala       |   6 +
 .../service/deploy/worker/PushDataHandler.scala    |   2 +-
 .../service/deploy/worker/storage/FlushTask.scala  |  16 +++
 .../service/deploy/worker/storage/Flusher.scala    |  19 +++
 .../deploy/worker/storage/StorageManager.scala     |  86 +++++++++++--
 .../service/deploy/worker/storage/TierWriter.scala |  65 +++++++++-
 34 files changed, 753 insertions(+), 47 deletions(-)

diff --git a/README.md b/README.md
index 9ff2fb982..1eb87b279 100644
--- a/README.md
+++ b/README.md
@@ -90,6 +90,11 @@ To compile for Spark 3.5 with Java21,  please use the 
following command
 
 > **_NOTE:_** Celeborn supports automatic builds on linux aarch64 platform via 
 > `aarch64` profile. `aarch64` profile requires glibc version 3.4.21. There is 
 > potential problematic frame `C  [libc.so.6+0x8412a]` for other glibc version 
 > like 2.x etc.
 
+To build Celeborn with Aliyun OSS support MPU, please use the following command
+```shell
+./build/make-distribution.sh --sbt-enabled -Pspark-3.4 -Pjdk-8 -Paliyun
+```
+
 ### Package Details
 Build procedure will create a compressed package.
 
@@ -256,6 +261,16 @@ Disks: {/mnt/disk1=DiskInfo(maxSlots: 6679, committed 
shuffles 0, running applic
 WorkerRef: null
 ```
 
+Shuffle OSS storage related configurations:
+```properties
+# If you are using Celeborn for shuffle OSS storage, these settings will be 
needed.
+celeborn.storage.availableTypes OSS
+celeborn.storage.oss.dir oss://<bucket_name>/
+celeborn.storage.oss.access.key <oss_access_key>
+celeborn.storage.oss.secret.key <oss_secret_key>
+celeborn.storage.oss.endpoint oss-cn-<region>[-internal].aliyuncs.com
+```
+
 #### Deploy Celeborn on K8S
 Please refer to our 
[website](https://celeborn.apache.org/docs/latest/deploy_on_k8s/)
 
@@ -310,6 +325,16 @@ spark.dynamicAllocation.shuffleTracking.enabled false
 spark.executor.userClassPathFirst false
 ```
 
+Shuffle OSS storage related configurations:
+```properties
+# If you are using Celeborn for shuffle OSS storage, these settings will be 
needed.
+spark.celeborn.storage.availableTypes OSS
+spark.celeborn.storage.oss.dir oss://<bucket_name>/
+spark.celeborn.storage.oss.access.key <oss_access_key>
+spark.celeborn.storage.oss.secret.key <oss_secret_key>
+spark.celeborn.storage.oss.endpoint oss-cn-<region>[-internal].aliyuncs.com
+```
+
 ### Deploy Flink client
 
 **Important: Only Flink batch jobs are supported for now. Due to the Shuffle 
Service in Flink is cluster-granularity, if you want to use Celeborn in a 
session cluster, it will not be able to submit both streaming and batch job to 
the same cluster. We plan to get rid of this restriction for Hybrid Shuffle 
mode in a future release.**
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index c19426898..96b0b4aa6 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -285,6 +285,9 @@ function sbt_build_service {
   if [[ $@ == *"aws"* ]]; then
      export SBT_MAVEN_PROFILES="aws"
   fi
+  if [[ $@ == *"aliyun"* ]]; then
+     export SBT_MAVEN_PROFILES="aliyun"
+  fi
   BUILD_COMMAND=("$SBT" clean package)
 
   # Actually build the jar
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index cbcfae938..ffcc50dde 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -609,6 +609,7 @@ public abstract class CelebornInputStream extends 
InputStream {
                 checkpointMetadata);
           }
         case S3:
+        case OSS:
         case HDFS:
           return new DfsPartitionReader(
               conf,
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 003c7cd12..58fb1aac0 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -101,9 +101,12 @@ public class DfsPartitionReader implements PartitionReader 
{
 
     this.metricsCallback = metricsCallback;
     this.location = location;
-    if (location.getStorageInfo() != null
-        && location.getStorageInfo().getType() == StorageInfo.Type.S3) {
-      this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
+    if (location.getStorageInfo() != null) {
+      if (location.getStorageInfo().getType() == StorageInfo.Type.S3) {
+        this.hadoopFs = 
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
+      } else if (location.getStorageInfo().getType() == StorageInfo.Type.OSS) {
+        this.hadoopFs = 
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.OSS);
+      }
     } else {
       this.hadoopFs = 
ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS);
     }
diff --git 
a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index bcef84a6e..c7161483b 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -157,7 +157,11 @@ public class DiskFileInfo extends FileInfo {
     return Utils.isS3Path(filePath);
   }
 
+  public boolean isOSS() {
+    return Utils.isOssPath(filePath);
+  }
+
   public boolean isDFS() {
-    return Utils.isS3Path(filePath) || Utils.isHdfsPath(filePath);
+    return Utils.isS3Path(filePath) || Utils.isOssPath(filePath) || 
Utils.isHdfsPath(filePath);
   }
 }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c1503b3ca..59a085af0 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -659,6 +659,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
     get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && 
get(HDFS_DIR).isDefined
   def hasS3Storage: Boolean =
     get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.S3.name()) && 
get(S3_DIR).isDefined
+  def hasOssStorage: Boolean =
+    get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.OSS.name()) && 
get(OSS_DIR).isDefined
   def masterSlotAssignLoadAwareDiskGroupNum: Int = 
get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM)
   def masterSlotAssignLoadAwareDiskGroupGradient: Double =
     get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT)
@@ -1155,7 +1157,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
         (dir, maxCapacity, flushThread, diskType)
       }
     }.getOrElse {
-      if (!hasHDFSStorage && !hasS3Storage) {
+      if (!hasHDFSStorage && !hasS3Storage && !hasOssStorage) {
         val prefix = workerStorageBaseDirPrefix
         val number = workerStorageBaseDirNumber
         val diskType = Type.valueOf(workerStorageBaseDirDiskType)
@@ -1198,6 +1200,22 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
     }.getOrElse("")
   }
 
+  def ossDir: String = {
+    get(OSS_DIR).map {
+      ossDir =>
+        if (!Utils.isOssPath(ossDir)) {
+          log.error(s"${OSS_DIR.key} configuration is wrong $ossDir. Disable 
OSS support.")
+          ""
+        } else {
+          ossDir
+        }
+    }.getOrElse("")
+  }
+  def ossEndpoint: String = get(OSS_ENDPOINT).getOrElse("")
+  def ossAccessKey: String = get(OSS_ACCESS_KEY).getOrElse("")
+  def ossSecretKey: String = get(OSS_SECRET_KEY).getOrElse("")
+  def ossIgnoreCredentials: Boolean = get(OSS_IGNORE_CREDENTIALS)
+
   def hdfsDir: String = {
     get(HDFS_DIR).map {
       hdfsDir =>
@@ -1270,11 +1288,13 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def workerFlusherBufferSize: Long = get(WORKER_FLUSHER_BUFFER_SIZE)
   def workerHdfsFlusherBufferSize: Long = get(WORKER_HDFS_FLUSHER_BUFFER_SIZE)
   def workerS3FlusherBufferSize: Long = get(WORKER_S3_FLUSHER_BUFFER_SIZE)
+  def workerOssFlusherBufferSize: Long = get(WORKER_OSS_FLUSHER_BUFFER_SIZE)
   def workerWriterCloseTimeoutMs: Long = get(WORKER_WRITER_CLOSE_TIMEOUT)
   def workerHddFlusherThreads: Int = get(WORKER_FLUSHER_HDD_THREADS)
   def workerSsdFlusherThreads: Int = get(WORKER_FLUSHER_SSD_THREADS)
   def workerHdfsFlusherThreads: Int = get(WORKER_FLUSHER_HDFS_THREADS)
   def workerS3FlusherThreads: Int = get(WORKER_FLUSHER_S3_THREADS)
+  def workerOssFlusherThreads: Int = get(WORKER_FLUSHER_OSS_THREADS)
   def workerCreateWriterMaxAttempts: Int = 
get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
 
   // //////////////////////////////////////////////////////
@@ -3180,6 +3200,46 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(5)
 
+  val OSS_ENDPOINT: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.oss.endpoint")
+      .categories("worker", "master", "client")
+      .version("0.6.0")
+      .doc("OSS endpoint for Celeborn to store shuffle data.")
+      .stringConf
+      .createOptional
+
+  val OSS_DIR: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.oss.dir")
+      .categories("worker", "master", "client")
+      .version("0.6.0")
+      .doc("OSS base directory for Celeborn to store shuffle data.")
+      .stringConf
+      .createOptional
+
+  val OSS_SECRET_KEY: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.oss.secret.key")
+      .categories("worker", "master", "client")
+      .version("0.6.0")
+      .doc("OSS secret key for Celeborn to store shuffle data.")
+      .stringConf
+      .createOptional
+
+  val OSS_IGNORE_CREDENTIALS: ConfigEntry[Boolean] =
+    buildConf("celeborn.storage.oss.ignore.credentials")
+      .categories("worker", "master", "client")
+      .version("0.6.0")
+      .doc("Whether to skip oss credentials, disable this config to support 
jindo sdk .")
+      .booleanConf
+      .createWithDefault(true)
+
+  val OSS_ACCESS_KEY: OptionalConfigEntry[String] =
+    buildConf("celeborn.storage.oss.access.key")
+      .categories("worker", "master", "client")
+      .version("0.6.0")
+      .doc("OSS access key for Celeborn to store shuffle data.")
+      .stringConf
+      .createOptional
+
   val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
     buildConf("celeborn.worker.storage.disk.reserve.size")
       .withAlternative("celeborn.worker.disk.reserve.size")
@@ -3664,6 +3724,14 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefaultString("6m")
 
+  val WORKER_OSS_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
+    buildConf("celeborn.worker.flusher.oss.buffer.size")
+      .categories("worker")
+      .version("0.6.0")
+      .doc("Size of buffer used by a OSS flusher.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("6m")
+
   val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.writer.close.timeout")
       .categories("worker")
@@ -3712,6 +3780,14 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(8)
 
+  val WORKER_FLUSHER_OSS_THREADS: ConfigEntry[Int] =
+    buildConf("celeborn.worker.flusher.oss.threads")
+      .categories("worker")
+      .doc("Flusher's thread count used for write data to OSS.")
+      .version("0.6.0")
+      .intConf
+      .createWithDefault(8)
+
   val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
     buildConf("celeborn.worker.flusher.shutdownTimeout")
       .categories("worker")
@@ -5744,7 +5820,7 @@ object CelebornConf extends Logging {
       .categories("master", "worker", "client")
       .version("0.3.0")
       .doc(
-        "Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3. Note: 
HDD and SSD would be treated as identical.")
+        "Enabled storages. Available options: MEMORY,HDD,SSD,HDFS,S3,OSS. 
Note: HDD and SSD would be treated as identical.")
       .stringConf
       .transform(_.toUpperCase(Locale.ROOT))
       .checkValue(p => p.split(",").map(StorageInfo.validate).reduce(_ && _), 
"")
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala 
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 71401735f..2a8f8d6a4 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -212,13 +212,15 @@ class WorkerInfo(
           curDisk.activeSlots = newDisk.activeSlots
           curDisk.avgFlushTime = newDisk.avgFlushTime
           curDisk.avgFetchTime = newDisk.avgFetchTime
-          if (estimatedPartitionSize.nonEmpty && curDisk.storageType != 
StorageInfo.Type.HDFS && curDisk.storageType != StorageInfo.Type.S3) {
+          if (estimatedPartitionSize.nonEmpty && curDisk.storageType != 
StorageInfo.Type.HDFS
+            && curDisk.storageType != StorageInfo.Type.S3 && 
curDisk.storageType != StorageInfo.Type.OSS) {
             curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get
             curDisk.availableSlots = curDisk.actualUsableSpace / 
estimatedPartitionSize.get
           }
           curDisk.setStatus(newDisk.status)
         } else {
-          if (estimatedPartitionSize.nonEmpty && newDisk.storageType != 
StorageInfo.Type.HDFS && newDisk.storageType != StorageInfo.Type.S3) {
+          if (estimatedPartitionSize.nonEmpty && newDisk.storageType != 
StorageInfo.Type.HDFS
+            && newDisk.storageType != StorageInfo.Type.S3 && 
newDisk.storageType != StorageInfo.Type.OSS) {
             newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
             newDisk.availableSlots = newDisk.actualUsableSpace / 
estimatedPartitionSize.get
           }
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index abd80a081..58fde690b 100644
--- 
a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ 
b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf.{OSS_ACCESS_KEY, OSS_SECRET_KEY}
 import org.apache.celeborn.common.exception.CelebornException
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.protocol.StorageInfo
@@ -61,6 +62,18 @@ object CelebornHadoopUtils extends Logging {
       hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
       hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
       hadoopConf.set("fs.s3a.endpoint.region", conf.s3EndpointRegion)
+    } else if (conf.ossDir.nonEmpty) {
+      if (conf.ossAccessKey.isEmpty || conf.ossSecretKey.isEmpty || 
conf.ossEndpoint.isEmpty) {
+        throw new CelebornException(
+          "OSS storage is enabled but ossAccessKey, ossSecretKey, or 
ossEndpoint is not set")
+      }
+      if (conf.ossIgnoreCredentials) {
+        hadoopConf.set("fs.oss.credentials.provider", "")
+      }
+      hadoopConf.set("fs.oss.impl", 
"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem")
+      hadoopConf.set("fs.oss.accessKeyId", conf.ossAccessKey)
+      hadoopConf.set("fs.oss.accessKeySecret", conf.ossSecretKey)
+      hadoopConf.set("fs.oss.endpoint", conf.ossEndpoint)
     }
     appendSparkHadoopConfigs(conf, hadoopConf)
     hadoopConf
@@ -85,6 +98,10 @@ object CelebornHadoopUtils extends Logging {
       val s3Dir = new Path(conf.s3Dir)
       hadoopFs.put(StorageInfo.Type.S3, s3Dir.getFileSystem(hadoopConf))
     }
+    if (conf.hasOssStorage) {
+      val ossDir = new Path(conf.ossDir)
+      hadoopFs.put(StorageInfo.Type.OSS, ossDir.getFileSystem(hadoopConf))
+    }
     hadoopFs
   }
 
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 2de6a13ec..2978d824f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1216,8 +1216,9 @@ object Utils extends Logging {
   val SORTED_SUFFIX = ".sorted"
   val INDEX_SUFFIX = ".index"
   val SUFFIX_HDFS_WRITE_SUCCESS = ".success"
-  val COMPATIBLE_HDFS_REGEX = "^(?!s3://)(?!s3a://)[a-zA-Z0-9]+://.*"
+  val COMPATIBLE_HDFS_REGEX = "^(?!s3://)(?!s3a://)(?!oss://)[a-zA-Z0-9]+://.*"
   val S3_REGEX = "^s3[a]?://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])(/.*)?$"
+  val OSS_REGEX = "^oss?://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])(/.*)?$"
 
   val UNKNOWN_APP_SHUFFLE_ID = -1
 
@@ -1229,6 +1230,10 @@ object Utils extends Logging {
     path.matches(S3_REGEX)
   }
 
+  def isOssPath(path: String): Boolean = {
+    path.matches(OSS_REGEX)
+  }
+
   def getSortedFilePath(path: String): String = {
     path + SORTED_SUFFIX
   }
diff --git 
a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala 
b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
index 03c6176ed..d75006cf7 100644
--- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala
@@ -171,9 +171,9 @@ class UtilsSuite extends CelebornFunSuite {
     val ossPath = "oss://xxxx/xx-xx/x-x-x"
     val sortedOssPath = "oss://xxxx/xx-xx/x-x-x.sorted"
     val indexOssPath = "oss://xxxx/xx-xx/x-x-x.index"
-    assert(true == Utils.isHdfsPath(ossPath))
-    assert(true == Utils.isHdfsPath(sortedOssPath))
-    assert(true == Utils.isHdfsPath(indexOssPath))
+    assert(false == Utils.isHdfsPath(ossPath))
+    assert(false == Utils.isHdfsPath(sortedOssPath))
+    assert(false == Utils.isHdfsPath(indexOssPath))
 
     val localPath = "/xxx/xxx/xx-xx/x-x-x"
     assert(false == Utils.isHdfsPath(localPath))
@@ -191,6 +191,18 @@ class UtilsSuite extends CelebornFunSuite {
     assert(true == Utils.isS3Path(indexS3Path))
   }
 
+  test("validate oss compatible fs path") {
+    val hdfsPath = "hdfs://xxx:9000/xxxx/xx-xx/x-x-x"
+    val simpleOssPath = "oss://xxxx/xx-xx/x-x-x"
+    val sortedOssPath = "oss://xxx/xxxx/xx-xx/x-x-x.sorted"
+    val indexOssPath = "oss://xxx/xxxx/xx-xx/x-x-x.index"
+    assert(false == Utils.isOssPath(hdfsPath))
+    assert(false == Utils.isHdfsPath(simpleOssPath))
+    assert(true == Utils.isOssPath(simpleOssPath))
+    assert(true == Utils.isOssPath(sortedOssPath))
+    assert(true == Utils.isOssPath(indexOssPath))
+  }
+
   test("GetReducerFileGroupResponse class convert with pb") {
     val fileGroup = new util.HashMap[Integer, util.Set[PartitionLocation]]
     fileGroup.put(0, partitionLocation(0))
diff --git a/dev/reformat b/dev/reformat
index cf3e6f12e..cac0d4dc9 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -34,6 +34,7 @@ else
   ${PROJECT_DIR}/build/mvn spotless:apply -Pspark-3.5
   ${PROJECT_DIR}/build/mvn spotless:apply -Pspark-4.0
   ${PROJECT_DIR}/build/mvn spotless:apply -Paws
+  ${PROJECT_DIR}/build/mvn spotless:apply -Paliyun
   ${PROJECT_DIR}/build/mvn spotless:apply -Pmr
   ${PROJECT_DIR}/build/mvn spotless:apply -Ptez
 fi
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 5b3a25308..ccb794319 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -133,8 +133,13 @@ license: |
 | celeborn.master.endpoints.resolver | 
org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | 
Resolver class that can be used for discovering and updating the master 
endpoints. This allows users to provide a custom master endpoint resolver 
implementation. This is useful in environments where the master nodes might 
change due to scaling operations or infrastructure updates. Clients need to 
ensure that provided resolver class should be present in the classpath. | 0.5.2 
|  | 
 | celeborn.quota.enabled | true | false | When Master side sets to true, the 
master will enable to check the quota via QuotaManager. When Client side sets 
to true, LifecycleManager will request Master side to check whether the current 
user has enough quota before registration of shuffle. Fallback to the default 
shuffle service when Master side checks that there is no enough quota for 
current user. | 0.2.0 |  | 
 | celeborn.quota.interruptShuffle.enabled | false | false | Whether to enable 
interrupt shuffle when quota exceeds. | 0.6.0 |  | 
-| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available 
options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as 
identical. | 0.3.0 | celeborn.storage.activeTypes | 
+| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available 
options: MEMORY,HDD,SSD,HDFS,S3,OSS. Note: HDD and SSD would be treated as 
identical. | 0.3.0 | celeborn.storage.activeTypes | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory 
for Celeborn to store shuffle data. | 0.2.0 |  | 
+| celeborn.storage.oss.access.key | &lt;undefined&gt; | false | OSS access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.oss.dir | &lt;undefined&gt; | false | OSS base directory 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.oss.endpoint | &lt;undefined&gt; | false | OSS endpoint for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.oss.ignore.credentials | true | false | Whether to skip oss 
credentials, disable this config to support jindo sdk . | 0.6.0 |  | 
+| celeborn.storage.oss.secret.key | &lt;undefined&gt; | false | OSS secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 
endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 6441fd912..b7e363f15 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -87,10 +87,15 @@ license: |
 | celeborn.quota.tenant.enabled | true | false | Whether to enable 
tenant-level quota. | 0.6.0 |  | 
 | celeborn.quota.user.enabled | true | false | Whether to enable user-level 
quota. | 0.6.0 |  | 
 | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | 
Regex to decide which Celeborn configuration properties and environment 
variables in master and worker environments contain sensitive information. When 
this regex matches a property key or value, the value is redacted from the 
logging. | 0.5.0 |  | 
-| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available 
options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as 
identical. | 0.3.0 | celeborn.storage.activeTypes | 
+| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available 
options: MEMORY,HDD,SSD,HDFS,S3,OSS. Note: HDD and SSD would be treated as 
identical. | 0.3.0 | celeborn.storage.activeTypes | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory 
for Celeborn to store shuffle data. | 0.2.0 |  | 
 | celeborn.storage.hdfs.kerberos.keytab | &lt;undefined&gt; | false | Kerberos 
keytab file path for HDFS storage connection. | 0.3.2 |  | 
 | celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | 
Kerberos principal for HDFS storage connection. | 0.3.2 |  | 
+| celeborn.storage.oss.access.key | &lt;undefined&gt; | false | OSS access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.oss.dir | &lt;undefined&gt; | false | OSS base directory 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.oss.endpoint | &lt;undefined&gt; | false | OSS endpoint for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.oss.ignore.credentials | true | false | Whether to skip oss 
credentials, disable this config to support jindo sdk . | 0.6.0 |  | 
+| celeborn.storage.oss.secret.key | &lt;undefined&gt; | false | OSS secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 
endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index a69c435da..854b1ae3f 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -42,10 +42,15 @@ license: |
 | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | 
Regex to decide which Celeborn configuration properties and environment 
variables in master and worker environments contain sensitive information. When 
this regex matches a property key or value, the value is redacted from the 
logging. | 0.5.0 |  | 
 | celeborn.shuffle.chunk.size | 8m | false | Max chunk size of reducer's 
merged shuffle data. For example, if a reducer's shuffle data is 128M and the 
data will need 16 fetch chunk requests to fetch. | 0.2.0 |  | 
 | celeborn.shuffle.sortPartition.block.compactionFactor | 0.25 | false | 
Combine sorted shuffle blocks such that size of compacted shuffle block does 
not exceed compactionFactor * celeborn.shuffle.chunk.size | 0.4.2 |  | 
-| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available 
options: MEMORY,HDD,SSD,HDFS,S3. Note: HDD and SSD would be treated as 
identical. | 0.3.0 | celeborn.storage.activeTypes | 
+| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available 
options: MEMORY,HDD,SSD,HDFS,S3,OSS. Note: HDD and SSD would be treated as 
identical. | 0.3.0 | celeborn.storage.activeTypes | 
 | celeborn.storage.hdfs.dir | &lt;undefined&gt; | false | HDFS base directory 
for Celeborn to store shuffle data. | 0.2.0 |  | 
 | celeborn.storage.hdfs.kerberos.keytab | &lt;undefined&gt; | false | Kerberos 
keytab file path for HDFS storage connection. | 0.3.2 |  | 
 | celeborn.storage.hdfs.kerberos.principal | &lt;undefined&gt; | false | 
Kerberos principal for HDFS storage connection. | 0.3.2 |  | 
+| celeborn.storage.oss.access.key | &lt;undefined&gt; | false | OSS access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.oss.dir | &lt;undefined&gt; | false | OSS base directory 
for Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.oss.endpoint | &lt;undefined&gt; | false | OSS endpoint for 
Celeborn to store shuffle data. | 0.6.0 |  | 
+| celeborn.storage.oss.ignore.credentials | true | false | Whether to skip oss 
credentials, disable this config to support jindo sdk . | 0.6.0 |  | 
+| celeborn.storage.oss.secret.key | &lt;undefined&gt; | false | OSS secret key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.access.key | &lt;undefined&gt; | false | S3 access key 
for Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.dir | &lt;undefined&gt; | false | S3 base directory for 
Celeborn to store shuffle data. | 0.6.0 |  | 
 | celeborn.storage.s3.endpoint.region | &lt;undefined&gt; | false | S3 
endpoint for Celeborn to store shuffle data. | 0.6.0 |  | 
@@ -85,6 +90,8 @@ license: |
 | celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per 
disk used for write data to HDD disks. | 0.2.0 |  | 
 | celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used 
by a HDFS flusher. | 0.3.0 |  | 
 | celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count 
used for write data to HDFS. | 0.2.0 |  | 
+| celeborn.worker.flusher.oss.buffer.size | 6m | false | Size of buffer used 
by a OSS flusher. | 0.6.0 |  | 
+| celeborn.worker.flusher.oss.threads | 8 | false | Flusher's thread count 
used for write data to OSS. | 0.6.0 |  | 
 | celeborn.worker.flusher.s3.buffer.size | 6m | false | Size of buffer used by 
a S3 flusher. | 0.6.0 |  | 
 | celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used 
for write data to S3. | 0.6.0 |  | 
 | celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher 
to shutdown. | 0.2.0 |  | 
diff --git a/master/pom.xml b/master/pom.xml
index 6ac7742e8..25f349f4e 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -199,5 +199,20 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>aliyun</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-aliyun</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>com.aliyun.oss</groupId>
+          <artifactId>aliyun-sdk-oss</artifactId>
+          <version>${aliyun-sdk-oss.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 </project>
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index aab79cc2f..75e680a3b 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -71,7 +71,8 @@ public class SlotsAllocator {
         if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
           if (StorageInfo.localDiskAvailable(availableStorageTypes)
               && diskInfoEntry.getValue().storageType() != 
StorageInfo.Type.HDFS
-              && diskInfoEntry.getValue().storageType() != 
StorageInfo.Type.S3) {
+              && diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3
+              && diskInfoEntry.getValue().storageType() != 
StorageInfo.Type.OSS) {
             usableDisks.add(
                 new UsableDiskInfo(
                     diskInfoEntry.getValue(), 
diskInfoEntry.getValue().getAvailableSlots()));
@@ -85,6 +86,11 @@ public class SlotsAllocator {
             usableDisks.add(
                 new UsableDiskInfo(
                     diskInfoEntry.getValue(), 
diskInfoEntry.getValue().getAvailableSlots()));
+          } else if (StorageInfo.OSSAvailable(availableStorageTypes)
+              && diskInfoEntry.getValue().storageType() == 
StorageInfo.Type.OSS) {
+            usableDisks.add(
+                new UsableDiskInfo(
+                    diskInfoEntry.getValue(), 
diskInfoEntry.getValue().availableSlots()));
           }
         }
       }
@@ -141,7 +147,8 @@ public class SlotsAllocator {
                       if (diskInfo.actualUsableSpace() > 0
                           && diskInfo.status().equals(DiskStatus.HEALTHY)
                           && diskInfo.storageType() != StorageInfo.Type.HDFS
-                          && diskInfo.storageType() != StorageInfo.Type.S3) {
+                          && diskInfo.storageType() != StorageInfo.Type.S3
+                          && diskInfo.storageType() != StorageInfo.Type.OSS) {
                         usableDisks.add(diskInfo);
                       }
                     }));
@@ -201,6 +208,8 @@ public class SlotsAllocator {
         storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, 
availableStorageTypes);
       } else if (selectedDiskInfo.storageType() == StorageInfo.Type.S3) {
         storageInfo = new StorageInfo("", StorageInfo.Type.S3, 
availableStorageTypes);
+      } else if (selectedDiskInfo.storageType() == StorageInfo.Type.OSS) {
+        storageInfo = new StorageInfo("", StorageInfo.Type.OSS, 
availableStorageTypes);
       } else {
         storageInfo =
             new StorageInfo(
@@ -215,6 +224,7 @@ public class SlotsAllocator {
             selectedWorker.diskInfos().values().stream()
                 .filter(p -> p.storageType() != StorageInfo.Type.HDFS)
                 .filter(p -> p.storageType() != StorageInfo.Type.S3)
+                .filter(p -> p.storageType() != StorageInfo.Type.OSS)
                 .collect(Collectors.toList())
                 .toArray(new DiskInfo[0]);
         storageInfo =
@@ -225,6 +235,8 @@ public class SlotsAllocator {
         workerDiskIndex.put(selectedWorker, (diskIndex + 1) % 
diskInfos.length);
       } else if (StorageInfo.S3Available(availableStorageTypes)) {
         storageInfo = new StorageInfo("", StorageInfo.Type.S3, 
availableStorageTypes);
+      } else if (StorageInfo.OSSAvailable(availableStorageTypes)) {
+        storageInfo = new StorageInfo("", StorageInfo.Type.OSS, 
availableStorageTypes);
       } else {
         storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, 
availableStorageTypes);
       }
diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index ab3449e74..2acaea57b 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -282,12 +282,18 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
         disks.values().stream().filter(s -> 
!s.status().equals(DiskStatus.HEALTHY)).count();
     boolean exceed = unhealthyDiskNum * 1.0 / disks.size() >= 
unhealthyDiskRatioThreshold;
     if (!excludedWorkers.contains(worker)
-        && (((disks.isEmpty() || exceed) && !conf.hasHDFSStorage() && 
!conf.hasS3Storage())
+        && (((disks.isEmpty() || exceed)
+                && !conf.hasHDFSStorage()
+                && !conf.hasS3Storage()
+                && !conf.hasOssStorage())
             || highWorkload)) {
       LOG.warn(
           "Worker {} (unhealthy disks num: {}) adds to excluded workers", 
worker, unhealthyDiskNum);
       excludedWorkers.add(worker);
-    } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage() || 
conf.hasS3Storage())
+    } else if ((availableSlots.get() > 0
+            || conf.hasHDFSStorage()
+            || conf.hasS3Storage()
+            || conf.hasOssStorage())
         && !highWorkload) {
       // only unblack if numSlots larger than 0
       excludedWorkers.remove(worker);
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 55d3ad579..1400a832e 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -189,6 +189,7 @@ private[celeborn] class Master(
   private val dfsExpireDirsTimeoutMS = conf.dfsExpireDirsTimeoutMS
   private val hasHDFSStorage = conf.hasHDFSStorage
   private val hasS3Storage = conf.hasS3Storage
+  private val hasOssStorage = conf.hasOssStorage
 
   private val quotaManager = new QuotaManager(
     statusSystem,
@@ -334,7 +335,7 @@ private[celeborn] class Master(
         CheckForWorkerUnavailableInfoTimeout)
     }
 
-    if (hasHDFSStorage || hasS3Storage) {
+    if (hasHDFSStorage || hasS3Storage || hasOssStorage) {
       checkForDFSRemnantDirsTimeOutTask =
         scheduleCheckTask(dfsExpireDirsTimeoutMS, 
CheckForDFSExpiredDirsTimeout)
     }
@@ -1064,7 +1065,7 @@ private[celeborn] class Master(
         statusSystem.handleAppLost(appId, requestId)
         quotaManager.handleAppLost(appId)
         logInfo(s"Removed application $appId")
-        if (hasHDFSStorage || hasS3Storage) {
+        if (hasHDFSStorage || hasS3Storage || hasOssStorage) {
           checkAndCleanExpiredAppDirsOnDFS(appId)
         }
         if (context != null) {
@@ -1086,6 +1087,7 @@ private[celeborn] class Master(
     }
     if (hasHDFSStorage) processDir(conf.hdfsDir, expiredDir)
     if (hasS3Storage) processDir(conf.s3Dir, expiredDir)
+    if (hasOssStorage) processDir(conf.ossDir, expiredDir)
   }
 
   private def processDir(dfsDir: String, expiredDir: String): Unit = {
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 78f7727bf..95bd3a284 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -331,20 +331,36 @@ public class SlotsAllocatorSuiteJ {
       boolean expectSuccess,
       boolean roundRobin,
       boolean enableS3) {
+    checkSlotsOnDFS(
+        workers, partitionIds, shouldReplicate, expectSuccess, roundRobin, 
enableS3, false);
+  }
+
+  private void checkSlotsOnDFS(
+      List<WorkerInfo> workers,
+      List<Integer> partitionIds,
+      boolean shouldReplicate,
+      boolean expectSuccess,
+      boolean roundRobin,
+      boolean enableS3,
+      boolean enableOss) {
     CelebornConf conf = new CelebornConf();
+    int availableStorageTypes;
     if (enableS3) {
       conf.set("celeborn.active.storage.levels", "S3");
+      availableStorageTypes = StorageInfo.S3_MASK;
+    } else if (enableOss) {
+      conf.set("celeborn.active.storage.levels", "OSS");
+      availableStorageTypes = StorageInfo.OSS_MASK;
     } else {
       conf.set("celeborn.active.storage.levels", "HDFS");
+      availableStorageTypes = StorageInfo.HDFS_MASK;
     }
     Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> 
slots;
     if (roundRobin) {
-      int availableStorageTypes = enableS3 ? StorageInfo.S3_MASK : 
StorageInfo.HDFS_MASK;
       slots =
           SlotsAllocator.offerSlotsRoundRobin(
               workers, partitionIds, shouldReplicate, false, 
availableStorageTypes);
     } else {
-      int availableStorageTypes = enableS3 ? StorageInfo.S3_MASK : 
StorageInfo.HDFS_MASK;
       slots =
           SlotsAllocator.offerSlotsLoadAware(
               workers,
@@ -502,4 +518,37 @@ public class SlotsAllocatorSuiteJ {
     checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, true);
     checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, false, true);
   }
+
+  @Test
+  public void testOssOnly() {
+    final List<WorkerInfo> workers = prepareWorkers(false);
+    final List<Integer> partitionIds = new ArrayList<>();
+    for (int i = 0; i < 3000; i++) {
+      partitionIds.add(i);
+    }
+    final boolean shouldReplicate = true;
+    checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, false, 
true);
+  }
+
+  @Test
+  public void testLocalDisksAndOss() {
+    final List<WorkerInfo> workers = prepareWorkers(true);
+    DiskInfo ossDiskInfo1 =
+        new DiskInfo(
+            "OSS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, 
StorageInfo.Type.OSS);
+    DiskInfo ossDiskInfo2 =
+        new DiskInfo(
+            "OSS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE, 
StorageInfo.Type.OSS);
+    ossDiskInfo1.maxSlots_$eq(Long.MAX_VALUE);
+    ossDiskInfo2.maxSlots_$eq(Long.MAX_VALUE);
+    workers.get(0).diskInfos().put("OSS", ossDiskInfo1);
+    workers.get(1).diskInfos().put("OSS", ossDiskInfo2);
+    final List<Integer> partitionIds = new ArrayList<>();
+    for (int i = 0; i < 3000; i++) {
+      partitionIds.add(i);
+    }
+    final boolean shouldReplicate = true;
+    checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, true, false, 
true);
+    checkSlotsOnDFS(workers, partitionIds, shouldReplicate, true, false, 
false, true);
+  }
 }
diff --git a/multipart-uploader/multipart-uploader-oss/pom.xml 
b/multipart-uploader/multipart-uploader-oss/pom.xml
new file mode 100644
index 000000000..28a78c138
--- /dev/null
+++ b/multipart-uploader/multipart-uploader-oss/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.celeborn</groupId>
+    <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+    <version>${project.version}</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  
<artifactId>celeborn-multipart-uploader-oss_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Multipart Uploader OSS</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-service_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aliyun</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.aliyun.oss</groupId>
+      <artifactId>aliyun-sdk-oss</artifactId>
+      <version>${aliyun-sdk-oss.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+      <version>3.2.2</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/multipart-uploader/multipart-uploader-oss/src/main/java/org/apache/celeborn/OssMultipartUploadHandler.java
 
b/multipart-uploader/multipart-uploader-oss/src/main/java/org/apache/celeborn/OssMultipartUploadHandler.java
new file mode 100644
index 000000000..01749a3d2
--- /dev/null
+++ 
b/multipart-uploader/multipart-uploader-oss/src/main/java/org/apache/celeborn/OssMultipartUploadHandler.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.event.ProgressListener;
+import com.aliyun.oss.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.server.common.service.mpu.MultipartUploadHandler;
+
+public class OssMultipartUploadHandler implements MultipartUploadHandler {
+
+  private static final Logger log = 
LoggerFactory.getLogger(OssMultipartUploadHandler.class);
+
+  private final String bucketName;
+  private final String key;
+  private final OSS ossClient;
+
+  private String uploadId;
+
+  public OssMultipartUploadHandler(
+      String endpoint, String bucketName, String accessKey, String secretKey, 
String key) {
+    this.bucketName = bucketName;
+    this.ossClient = new OSSClientBuilder().build(endpoint, accessKey, 
secretKey);
+    this.key = key;
+  }
+
+  @Override
+  public void startUpload() {
+    uploadId =
+        ossClient
+            .initiateMultipartUpload(new 
InitiateMultipartUploadRequest(bucketName, key))
+            .getUploadId();
+  }
+
+  @Override
+  public void putPart(InputStream inputStream, Integer partNumber, Boolean 
finalFlush)
+      throws IOException {
+    try (InputStream inStream = inputStream) {
+      int partSize = inStream.available();
+      if (partSize == 0) {
+        log.debug(
+            "key {} uploadId {} part size is 0 for part number {} finalFlush 
{}",
+            key,
+            uploadId,
+            partNumber,
+            finalFlush);
+        return;
+      }
+      ossClient.uploadPart(
+          new UploadPartRequest(bucketName, key, uploadId, partNumber, 
inStream, partSize));
+      log.debug(
+          "key {} uploadId {} part number {} uploaded with size {} finalFlush 
{}",
+          key,
+          uploadId,
+          partNumber,
+          partSize,
+          finalFlush);
+    } catch (RuntimeException | IOException e) {
+      log.error("Failed to upload part", e);
+      throw e;
+    }
+  }
+
+  @Override
+  public void complete() {
+    List<PartETag> partETags = new ArrayList<>();
+    ListPartsRequest listPartsRequest = new ListPartsRequest(bucketName, key, 
uploadId);
+    PartListing partListing;
+    do {
+      partListing = ossClient.listParts(listPartsRequest);
+      for (PartSummary part : partListing.getParts()) {
+        partETags.add(new PartETag(part.getPartNumber(), part.getETag()));
+      }
+      
listPartsRequest.setPartNumberMarker(partListing.getNextPartNumberMarker());
+    } while (partListing.isTruncated());
+    if (partETags.size() == 0) {
+      log.debug(
+          "bucket {} key {} uploadId {} has no parts uploaded, aborting 
upload",
+          bucketName,
+          key,
+          uploadId);
+      abort();
+      log.debug("bucket {} key {} upload completed with size {}", bucketName, 
key, 0);
+      return;
+    }
+    ProgressListener progressListener =
+        event ->
+            log.debug(
+                "key {} uploadId {} progress event type {} transferred {} 
bytes",
+                key,
+                uploadId,
+                event.getEventType(),
+                event.getBytes());
+
+    CompleteMultipartUploadResult compResult =
+        ossClient.completeMultipartUpload(
+            new CompleteMultipartUploadRequest(bucketName, key, uploadId, 
partETags)
+                .withProgressListener(progressListener));
+    log.debug(
+        "bucket {} key {} uploadId {} upload completed location is in {} ",
+        bucketName,
+        key,
+        uploadId,
+        compResult.getLocation());
+  }
+
+  @Override
+  public void abort() {
+    ossClient.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, 
key, uploadId));
+  }
+
+  @Override
+  public void close() {
+    if (ossClient != null) {
+      ossClient.shutdown();
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 18fb2b6ad..959aeb245 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@
     <!-- use hadoop-3 as default  -->
     <hadoop.version>3.3.6</hadoop.version>
     <aws.version>1.12.532</aws.version>
+    <aliyun-sdk-oss.version>3.13.0</aliyun-sdk-oss.version>
     <!--
     If you change codahale.metrics.version, you also need to change
     the link to metrics.dropwizard.io in docs/monitoring.md.
@@ -1352,6 +1353,15 @@
         <aws-deps>true</aws-deps>
       </properties>
     </profile>
+    <profile>
+      <id>aliyun</id>
+      <modules>
+        <module>multipart-uploader/multipart-uploader-oss</module>
+      </modules>
+      <properties>
+        <aliyun-deps>true</aliyun-deps>
+      </properties>
+    </profile>
     <profile>
       <id>spark-2.4</id>
       <modules>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 79580a774..dfeff9c30 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -47,10 +47,12 @@ object Dependencies {
   val commonsIoVersion = "2.17.0"
   val commonsLoggingVersion = "1.1.3"
   val commonsLang3Version = "3.17.0"
+  val commonsCollectionsVersion = "3.2.2"
   val findbugsVersion = "1.3.9"
   val guavaVersion = "33.1.0-jre"
   val hadoopVersion = "3.3.6"
   val awsS3Version = "1.12.532"
+  val aliyunOssVersion = "3.13.0"
   val junitInterfaceVersion = "0.13.3"
   // don't forget update `junitInterfaceVersion` when we upgrade junit
   val junitVersion = "4.13.2"
@@ -123,6 +125,9 @@ object Dependencies {
   val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion 
excludeAll (
     ExclusionRule("com.amazonaws", "aws-java-sdk-bundle"))
   val awsS3 = "com.amazonaws" % "aws-java-sdk-s3" % awsS3Version
+  val commonsCollections = "commons-collections" % "commons-collections" % 
commonsCollectionsVersion
+  val hadoopAliyun = "org.apache.hadoop" % "hadoop-aliyun" % hadoopVersion
+  val aliyunOss = "com.aliyun.oss" % "aliyun-sdk-oss" % aliyunOssVersion
   val ioDropwizardMetricsCore = "io.dropwizard.metrics" % "metrics-core" % 
metricsVersion
   val ioDropwizardMetricsGraphite = "io.dropwizard.metrics" % 
"metrics-graphite" % metricsVersion excludeAll (
     ExclusionRule("com.rabbitmq", "amqp-client"))
@@ -454,8 +459,9 @@ object Utils {
       profiles
   }
 
-  val celeborMPUProject = profiles.filter(_.startsWith("aws")).headOption 
match {
+  val celeborMPUProject = profiles.find(p => p.startsWith("aws") || 
p.startsWith("aliyun")) match {
     case Some("aws") => Some(CeleborMPU.celeborMPU)
+    case Some("aliyun") => Some(CeleborMPU.celeborMPUOss)
     case _ => None
   }
 
@@ -569,6 +575,7 @@ object CelebornSpi {
 object CeleborMPU {
 
   lazy val hadoopAwsDependencies = Seq(Dependencies.hadoopAws, 
Dependencies.awsS3)
+  lazy val hadoopAliyunDependencies = Seq(Dependencies.commonsCollections, 
Dependencies.hadoopAliyun, Dependencies.aliyunOss)
 
   lazy val celeborMPU = Project("celeborn-multipart-uploader-s3", 
file("multipart-uploader/multipart-uploader-s3"))
     .dependsOn(CelebornService.service % "test->test;compile->compile")
@@ -579,6 +586,16 @@ object CeleborMPU {
         Dependencies.log4jSlf4jImpl,
       ) ++ hadoopAwsDependencies
     )
+
+  lazy val celeborMPUOss = Project("celeborn-multipart-uploader-oss", 
file("multipart-uploader/multipart-uploader-oss"))
+    .dependsOn(CelebornService.service % "test->test;compile->compile")
+    .settings (
+      commonSettings,
+      libraryDependencies ++= Seq(
+        Dependencies.log4j12Api,
+        Dependencies.log4jSlf4jImpl,
+      ) ++ hadoopAliyunDependencies
+    )
 }
 
 object CelebornCommon {
@@ -702,6 +719,15 @@ object CelebornService {
 }
 
 object CelebornMaster {
+  val mpuDependencies =
+    if (profiles.exists(_.startsWith("aws"))) {
+      CeleborMPU.hadoopAwsDependencies
+    } else if (profiles.exists(_.startsWith("aliyun"))) {
+      CeleborMPU.hadoopAliyunDependencies
+    } else {
+      Seq.empty
+    }
+
   lazy val master = Project("celeborn-master", file("master"))
     .dependsOn(CelebornCommon.common)
     .dependsOn(CelebornCommon.common % "test->test;compile->compile")
@@ -724,7 +750,7 @@ object CelebornMaster {
         Dependencies.ratisServer,
         Dependencies.ratisShell,
         Dependencies.scalatestMockito % "test",
-      ) ++ commonUnitTestDependencies
+      ) ++ commonUnitTestDependencies ++ mpuDependencies
     )
 }
 
@@ -738,6 +764,8 @@ object CelebornWorker {
 
   if (profiles.exists(_.startsWith("aws"))) {
     worker = worker.dependsOn(CeleborMPU.celeborMPU)
+  } else if (profiles.exists(_.startsWith("aliyun"))) {
+    worker = worker.dependsOn(CeleborMPU.celeborMPUOss)
   }
 
   worker = worker.settings(
@@ -1032,6 +1060,7 @@ trait SparkClientProjects {
               name.startsWith("failureaccess-") ||
               name.startsWith("netty-") ||
               name.startsWith("commons-lang3-") ||
+              name.startsWith("commons-io-") ||
               name.startsWith("RoaringBitmap-"))
           }
         },
diff --git a/worker/pom.xml b/worker/pom.xml
index 12dc7c941..6d9c858a1 100644
--- a/worker/pom.xml
+++ b/worker/pom.xml
@@ -153,5 +153,15 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>aliyun</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.celeborn</groupId>
+          
<artifactId>celeborn-multipart-uploader-oss_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 </project>
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index acaa777d4..3cacd4866 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -154,6 +154,8 @@ public class PartitionDataWriter implements DeviceObserver {
           return null;
         } else if (diskFileInfo.isS3()) {
           storageInfo = new StorageInfo(StorageInfo.Type.S3, true, 
diskFileInfo.getFilePath());
+        } else if (diskFileInfo.isOSS()) {
+          return new StorageInfo(StorageInfo.Type.OSS, true, 
diskFileInfo.getFilePath());
         } else {
           storageInfo = new StorageInfo(StorageInfo.Type.HDFS, true, 
diskFileInfo.getFilePath());
         }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 3385874c7..a45fa2e23 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -485,8 +485,12 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     FSDataOutputStream dfsIndexOutput = null;
     FileChannel indexFileChannel = null;
     if (isDfs) {
-      boolean isS3 = Utils.isS3Path(indexFilePath);
-      StorageInfo.Type storageType = isS3 ? StorageInfo.Type.S3 : 
StorageInfo.Type.HDFS;
+      StorageInfo.Type storageType = StorageInfo.Type.HDFS;
+      if (Utils.isS3Path(indexFilePath)) {
+        storageType = StorageInfo.Type.S3;
+      } else if (Utils.isOssPath(indexFilePath)) {
+        storageType = StorageInfo.Type.OSS;
+      }
       FileSystem hadoopFs = StorageManager.hadoopFs().get(storageType);
       // If the index file exists, it will be overwritten.
       // So there is no need to check its existence.
@@ -599,13 +603,19 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
               () -> {
                 FileChannel indexChannel = null;
                 FSDataInputStream dfsIndexStream = null;
-                boolean isDfs = Utils.isHdfsPath(indexFilePath) || 
Utils.isS3Path(indexFilePath);
-                boolean isS3 = Utils.isS3Path(indexFilePath);
+                boolean isDfs =
+                    Utils.isHdfsPath(indexFilePath)
+                        || Utils.isS3Path(indexFilePath)
+                        || Utils.isOssPath(indexFilePath);
                 int indexSize;
                 try {
                   if (isDfs) {
-                    StorageInfo.Type storageType =
-                        isS3 ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+                    StorageInfo.Type storageType = StorageInfo.Type.HDFS;
+                    if (Utils.isS3Path(indexFilePath)) {
+                      storageType = StorageInfo.Type.S3;
+                    } else if (Utils.isOssPath(indexFilePath)) {
+                      storageType = StorageInfo.Type.OSS;
+                    }
                     FileSystem hadoopFs = 
StorageManager.hadoopFs().get(storageType);
                     dfsIndexStream = hadoopFs.open(new Path(indexFilePath));
                     indexSize = (int) hadoopFs.getFileStatus(new 
Path(indexFilePath)).getLen();
@@ -656,6 +666,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     private final String shuffleKey;
     private final boolean isHdfs;
     private final boolean isS3;
+    private final boolean isOss;
     private final boolean isDfs;
     private final boolean isPrefetch;
     private final FileInfo originFileInfo;
@@ -672,7 +683,8 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       this.sortedFilePath = Utils.getSortedFilePath(originFilePath);
       this.isHdfs = fileInfo.isHdfs();
       this.isS3 = fileInfo.isS3();
-      this.isDfs = isHdfs || isS3;
+      this.isOss = fileInfo.isOSS();
+      this.isDfs = isHdfs || isS3 || isOss;
       this.isPrefetch = !isDfs && prefetchEnabled;
       this.originFileLen = fileInfo.getFileLength();
       this.fileId = fileId;
@@ -688,8 +700,12 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
           indexFile.delete();
         }
       } else {
-        boolean isS3 = Utils.isS3Path(indexFilePath);
-        StorageInfo.Type storageType = isS3 ? StorageInfo.Type.S3 : 
StorageInfo.Type.HDFS;
+        StorageInfo.Type storageType = StorageInfo.Type.HDFS;
+        if (Utils.isS3Path(indexFilePath)) {
+          storageType = StorageInfo.Type.S3;
+        } else if (Utils.isOssPath(indexFilePath)) {
+          storageType = StorageInfo.Type.OSS;
+        }
         this.hadoopFs = StorageManager.hadoopFs().get(storageType);
         if (hadoopFs.exists(fileInfo.getDfsSortedPath())) {
           hadoopFs.delete(fileInfo.getDfsSortedPath(), false);
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
index a24fc0a21..3524beb78 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/TierWriterHelper.java
@@ -41,4 +41,19 @@ public class TierWriterHelper {
             .build()
             .newInstance(bucketName, s3AccessKey, s3SecretKey, 
s3EndpointRegion, key, maxRetryies);
   }
+
+  public static MultipartUploadHandler getOssMultipartUploadHandler(
+      String ossEndpoint, String bucketName, String ossAccessKey, String 
ossSecretKey, String key) {
+    return (MultipartUploadHandler)
+        DynConstructors.builder()
+            .impl(
+                "org.apache.celeborn.OssMultipartUploadHandler",
+                String.class,
+                String.class,
+                String.class,
+                String.class,
+                String.class)
+            .build()
+            .newInstance(ossEndpoint, bucketName, ossAccessKey, ossSecretKey, 
key);
+  }
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index f79dffee0..52650cb6c 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -186,7 +186,7 @@ private[deploy] class Controller(
       return
     }
 
-    if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage 
&& !conf.hasS3Storage) {
+    if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage 
&& !conf.hasS3Storage && !conf.hasOssStorage) {
       val msg = "Local storage has no available dirs!"
       logError(s"[handleReserveSlots] $msg")
       context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, 
msg))
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 2e9895beb..568e0d193 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -288,6 +288,12 @@ class FetchHandler(
               shuffleKey,
               fileName)
             makeStreamHandler(streamId, numChunks = 0)
+          case info: DiskFileInfo if info.isOSS =>
+            chunkStreamManager.registerStream(
+              streamId,
+              shuffleKey,
+              fileName)
+            makeStreamHandler(streamId, numChunks = 0)
           case _ =>
             val managedBuffer = fileInfo match {
               case df: DiskFileInfo =>
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index dccbf47e6..d74e7a380 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -45,7 +45,7 @@ import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.unsafe.Platform
 import org.apache.celeborn.common.util.{ExceptionUtils, Utils}
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
-import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, 
LocalFlusher, PartitionDataWriter, S3Flusher, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{LocalFlusher, 
PartitionDataWriter, StorageManager}
 
 class PushDataHandler(val workerSource: WorkerSource) extends 
BaseMessageHandler with Logging {
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 897e28733..736e1ff4a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -78,3 +78,19 @@ private[worker] class S3FlushTask(
     s3MultipartUploader.putPart(inputStream, partNumber, finalFlush)
   }
 }
+
+private[worker] class OssFlushTask(
+    buffer: CompositeByteBuf,
+    notifier: FlushNotifier,
+    keepBuffer: Boolean,
+    ossMultipartUploader: MultipartUploadHandler,
+    partNumber: Int,
+    finalFlush: Boolean = false)
+  extends FlushTask(buffer, notifier, keepBuffer) {
+
+  override def flush(): Unit = {
+    val bytes = ByteBufUtil.getBytes(buffer)
+    val inputStream = new ByteArrayInputStream(bytes)
+    ossMultipartUploader.putPart(inputStream, partNumber, finalFlush)
+  }
+}
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index a10059436..1aa7bde6f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -210,3 +210,22 @@ final private[worker] class S3Flusher(
 
   override def toString: String = s"s3Flusher@$flusherId"
 }
+
+final private[worker] class OssFlusher(
+    workerSource: AbstractSource,
+    ossFlusherThreads: Int,
+    allocator: ByteBufAllocator,
+    maxComponents: Int) extends Flusher(
+    workerSource,
+    ossFlusherThreads,
+    allocator,
+    maxComponents,
+    null,
+    "OSS") with Logging {
+
+  override def processIOException(e: IOException, deviceErrorType: 
DiskStatus): Unit = {
+    logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
+  }
+
+  override def toString: String = s"ossFlusher@$flusherId"
+}
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 0bc94bc8a..1d369cf27 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -58,6 +58,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     JavaUtils.newConcurrentHashMap[File, ConcurrentHashMap[String, 
PartitionDataWriter]]()
   val hdfsWriters = JavaUtils.newConcurrentHashMap[String, 
PartitionDataWriter]()
   val s3Writers = JavaUtils.newConcurrentHashMap[String, PartitionDataWriter]()
+  val ossWriters = JavaUtils.newConcurrentHashMap[String, 
PartitionDataWriter]()
   val memoryWriters = JavaUtils.newConcurrentHashMap[MemoryFileInfo, 
PartitionDataWriter]()
   // (shuffleKey->(filename->DiskFileInfo))
   private val diskFileInfos =
@@ -70,6 +71,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
 
   val hasS3Storage = conf.hasS3Storage
 
+  val hasOssStorage = conf.hasOssStorage
+
   val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
   val storagePolicy = new StoragePolicy(conf, this, workerSource)
 
@@ -83,7 +86,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         (new File(workdir, conf.workerWorkingDir), maxSpace, flusherThread, 
storageType)
       }
 
-    if (workingDirInfos.size <= 0 && !hasHDFSStorage && !hasS3Storage) {
+    if (workingDirInfos.size <= 0 && !hasHDFSStorage && !hasS3Storage && 
!hasOssStorage) {
       throw new IOException("Empty working directory configuration!")
     }
 
@@ -100,6 +103,11 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       Option(new DiskInfo("S3", Long.MaxValue, 999999, 999999, 0, 
StorageInfo.Type.S3))
     else None
 
+  val ossDiskInfo =
+    if (conf.hasOssStorage)
+      Option(new DiskInfo("OSS", Long.MaxValue, 999999, 999999, 0, 
StorageInfo.Type.OSS))
+    else None
+
   def disksSnapshot(): List[DiskInfo] = {
     diskInfos.synchronized {
       val disks = new util.ArrayList[DiskInfo](diskInfos.values())
@@ -159,6 +167,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
 
   val hdfsDir = conf.hdfsDir
   val s3Dir = conf.s3Dir
+  val ossDir = conf.ossDir
   val hdfsPermission = new FsPermission("755")
   val (hdfsFlusher, _totalHdfsFlusherThread) =
     if (hasHDFSStorage) {
@@ -202,15 +211,36 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       (None, 0)
     }
 
+  val (ossFlusher, _totalOssFlusherThread) =
+    if (hasOssStorage) {
+      logInfo(s"Initialize OSS support with path $ossDir")
+      try {
+        StorageManager.hadoopFs = CelebornHadoopUtils.getHadoopFS(conf)
+      } catch {
+        case e: Exception =>
+          logError("Celeborn initialize OSS failed.", e)
+          throw e
+      }
+      (
+        Some(new OssFlusher(
+          workerSource,
+          conf.workerOssFlusherThreads,
+          storageBufferAllocator,
+          conf.workerPushMaxComponents)),
+        conf.workerOssFlusherThreads)
+    } else {
+      (None, 0)
+    }
+
   def totalFlusherThread: Int =
-    _totalLocalFlusherThread + _totalHdfsFlusherThread + _totalS3FlusherThread
+    _totalLocalFlusherThread + _totalHdfsFlusherThread + _totalS3FlusherThread 
+ _totalOssFlusherThread
 
   val activeTypes = conf.availableStorageTypes
 
   lazy val localOrDfsStorageAvailable: Boolean = {
     StorageInfo.OSSAvailable(activeTypes) || StorageInfo.HDFSAvailable(
       activeTypes) || StorageInfo.localDiskAvailable(
-      activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty
+      activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty 
|| ossDir.nonEmpty
   }
 
   override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = 
this.synchronized {
@@ -419,7 +449,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       userIdentifier: UserIdentifier,
       partitionSplitEnabled: Boolean,
       isSegmentGranularityVisible: Boolean): PartitionDataWriter = {
-    if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage) {
+    if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage && 
!hasOssStorage) {
       throw new IOException("No available working dirs!")
     }
     val partitionDataWriterContext = new PartitionDataWriterContext(
@@ -477,6 +507,10 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       s3Writers.put(fileInfo.getFilePath, writer)
       return
     }
+    if (fileInfo.isOSS) {
+      ossWriters.put(fileInfo.getFilePath, writer)
+      return
+    }
     deviceMonitor.registerFileWriter(writer, fileInfo.getFilePath)
     workingDirWriters.computeIfAbsent(workingDir, 
workingDirWriterListFunc).put(
       fileInfo.getFilePath,
@@ -565,6 +599,14 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
               s"Destroy FileWriter $s3FileWriter caused by shuffle $shuffleKey 
expired."))
             s3Writers.remove(info.getFilePath)
           }
+        } else if (info.isOSS) {
+          isDfsExpired = true
+          val ossFileWriter = ossWriters.get(info.getFilePath)
+          if (ossFileWriter != null) {
+            ossFileWriter.destroy(new IOException(
+              s"Destroy FileWriter $ossFileWriter caused by shuffle 
$shuffleKey expired."))
+            ossWriters.remove(info.getFilePath)
+          }
         } else {
           val workingDir =
             info.getFile.getParentFile.getParentFile.getParentFile
@@ -597,12 +639,14 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         val removedFileInfos = diskFileInfos.remove(shuffleKey)
         var isDfsExpired = false
         var isHdfs = false
+        var isOss = false
         if (removedFileInfos != null) {
           removedFileInfos.asScala.foreach {
             case (_, fileInfo) =>
               if (cleanFileInternal(shuffleKey, fileInfo)) {
                 isDfsExpired = true
                 isHdfs = fileInfo.isHdfs
+                isOss = fileInfo.isOSS
               }
           }
         }
@@ -617,9 +661,14 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         }
         if (isDfsExpired) {
           try {
-            val dir = if (hasHDFSStorage && isHdfs) hdfsDir else s3Dir
+            val dir =
+              if (hasHDFSStorage && isHdfs) hdfsDir
+              else if (hasOssStorage && isOss) ossDir
+              else s3Dir
             val storageInfo =
-              if (hasHDFSStorage && isHdfs) StorageInfo.Type.HDFS else 
StorageInfo.Type.S3
+              if (hasHDFSStorage && isHdfs) StorageInfo.Type.HDFS
+              else if (hasOssStorage && isOss) StorageInfo.Type.OSS
+              else StorageInfo.Type.S3
             StorageManager.hadoopFs.get(storageInfo).delete(
               new Path(new Path(dir, conf.workerWorkingDir), 
s"$appId/$shuffleId"),
               true)
@@ -726,7 +775,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
 
       val dfsCleaned = hadoopFs match {
         case dfs: FileSystem =>
-          val dfsDir = if (hasHDFSStorage) hdfsDir else s3Dir
+          val dfsDir = if (hasHDFSStorage) hdfsDir else if (hasOssStorage) 
ossDir else s3Dir
           val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir)
           // DFS path not exist when first time initialize
           if (dfs.exists(dfsWorkPath)) {
@@ -965,7 +1014,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         null,
         null,
         null)
-    } else if (location.getStorageInfo.localDiskAvailable() || 
location.getStorageInfo.HDFSAvailable() || 
location.getStorageInfo.S3Available()) {
+    } else if (location.getStorageInfo.localDiskAvailable() || 
location.getStorageInfo.HDFSAvailable()
+      || location.getStorageInfo.S3Available() || 
location.getStorageInfo.OSSAvailable()) {
       logDebug(s"create non-memory file for 
${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}")
       val createDiskFileResult = createDiskFile(
         location,
@@ -1038,7 +1088,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           }
           healthyWorkingDirs()
         }
-      if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty) {
+      if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && 
ossFlusher.isEmpty) {
         throw new IOException(s"No available disks! suggested mountPoint 
$suggestedMountPoint")
       }
 
@@ -1078,6 +1128,24 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           fileName,
           s3FileInfo)
         return (s3Flusher.get, s3FileInfo, null)
+      } else if (hasOssStorage && location.getStorageInfo.OSSAvailable()) {
+        val shuffleDir =
+          new Path(new Path(ossDir, conf.workerWorkingDir), 
s"$appId/$shuffleId")
+        FileSystem.mkdirs(
+          StorageManager.hadoopFs.get(StorageInfo.Type.OSS),
+          shuffleDir,
+          hdfsPermission)
+        val ossFilePath = new Path(shuffleDir, fileName).toString
+        val ossFileInfo = new DiskFileInfo(
+          userIdentifier,
+          partitionSplitEnabled,
+          new ReduceFileMeta(conf.shuffleChunkSize),
+          ossFilePath,
+          StorageInfo.Type.OSS)
+        diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
+          fileName,
+          ossFileInfo)
+        return (ossFlusher.get, ossFileInfo, null)
       } else if (dirs.nonEmpty && 
location.getStorageInfo.localDiskAvailable()) {
         val dir = dirs(getNextIndex % dirs.size)
         val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, 
mountPoints)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index f1996441f..8ac98aae8 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -68,7 +68,10 @@ abstract class TierWriterBase(
 
   def write(buf: ByteBuf): Unit = {
     ensureNotClosed()
-    if (notifier.hasException) return
+    if (notifier.hasException) {
+      handleException()
+      return
+    }
 
     flushLock.synchronized {
       metaHandler.beforeWrite(buf)
@@ -80,6 +83,8 @@ abstract class TierWriterBase(
     numPendingWrites.decrementAndGet()
   }
 
+  def handleException(): Unit
+
   protected def writeInternal(buf: ByteBuf): Unit
 
   def needEvict(): Boolean
@@ -348,6 +353,8 @@ class MemoryTierWriter(
   override def getFlusher(): Flusher = {
     null
   }
+
+  override def handleException(): Unit = {}
 }
 
 class LocalTierWriter(
@@ -471,6 +478,8 @@ class LocalTierWriter(
   def getFlusher(): Flusher = {
     flusher
   }
+
+  override def handleException(): Unit = {}
 }
 
 class DfsTierWriter(
@@ -500,11 +509,14 @@ class DfsTierWriter(
   val hadoopFs: FileSystem = StorageManager.hadoopFs.get(storageType)
   var deleted = false
   private var s3MultipartUploadHandler: MultipartUploadHandler = _
+  private var ossMultipartUploadHandler: MultipartUploadHandler = _
   var partNumber: Int = 1
 
   this.flusherBufferSize =
     if (hdfsFileInfo.isS3()) {
       conf.workerS3FlusherBufferSize
+    } else if (hdfsFileInfo.isOSS()) {
+      conf.workerOssFlusherBufferSize
     } else {
       conf.workerHdfsFlusherBufferSize
     }
@@ -530,6 +542,24 @@ class DfsTierWriter(
         key,
         conf.s3MultiplePartUploadMaxRetries)
       s3MultipartUploadHandler.startUpload()
+    } else if (hdfsFileInfo.isOSS) {
+      val configuration = hadoopFs.getConf
+      val ossEndpoint = configuration.get("fs.oss.endpoint")
+      val ossAccessKey = configuration.get("fs.oss.accessKeyId")
+      val ossSecretKey = configuration.get("fs.oss.accessKeySecret")
+
+      val uri = hadoopFs.getUri
+      val bucketName = uri.getHost
+      val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
+      val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 
1)
+
+      this.ossMultipartUploadHandler = 
TierWriterHelper.getOssMultipartUploadHandler(
+        ossEndpoint,
+        bucketName,
+        ossAccessKey,
+        ossSecretKey,
+        key)
+      ossMultipartUploadHandler.startUpload()
     }
   } catch {
     case _: IOException =>
@@ -556,11 +586,21 @@ class DfsTierWriter(
     notifier.numPendingFlushes.incrementAndGet()
     if (hdfsFileInfo.isHdfs) {
       new HdfsFlushTask(flushBuffer, hdfsFileInfo.getDfsPath(), notifier, true)
+    } else if (hdfsFileInfo.isOSS) {
+      val flushTask = new OssFlushTask(
+        flushBuffer,
+        notifier,
+        true,
+        ossMultipartUploadHandler,
+        partNumber,
+        finalFlush)
+      partNumber = partNumber + 1
+      flushTask
     } else {
       val flushTask = new S3FlushTask(
         flushBuffer,
         notifier,
-        false,
+        true,
         s3MultipartUploadHandler,
         partNumber,
         finalFlush)
@@ -610,6 +650,14 @@ class DfsTierWriter(
       }
       indexOutputStream.close()
     }
+    if (s3MultipartUploadHandler != null) {
+      s3MultipartUploadHandler.complete()
+      s3MultipartUploadHandler.close()
+    }
+    if (ossMultipartUploadHandler != null) {
+      ossMultipartUploadHandler.complete()
+      ossMultipartUploadHandler.close()
+    }
   }
 
   override def notifyFileCommitted(): Unit =
@@ -643,4 +691,17 @@ class DfsTierWriter(
   def getFlusher(): Flusher = {
     flusher
   }
+
+  override def handleException(): Unit = {
+    if (s3MultipartUploadHandler != null) {
+      logWarning(s"Abort s3 multipart upload for ${fileInfo.getFilePath}")
+      s3MultipartUploadHandler.complete()
+      s3MultipartUploadHandler.close()
+    }
+    if (ossMultipartUploadHandler != null) {
+      logWarning(s"Abort Oss multipart upload for ${fileInfo.getFilePath}")
+      ossMultipartUploadHandler.complete()
+      ossMultipartUploadHandler.close()
+    }
+  }
 }

Reply via email to