This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 46d9d63e1 [CELEBORN-1916][FOLLOWUP] Improve Aliyun OSS support
46d9d63e1 is described below
commit 46d9d63e1f42b5280ee5aea814bdd18b2e83c780
Author: SteNicholas <[email protected]>
AuthorDate: Wed May 21 11:44:50 2025 +0800
[CELEBORN-1916][FOLLOWUP] Improve Aliyun OSS support
### What changes were proposed in this pull request?
Improve Aliyun OSS support including `SlotsAllocator#offerSlotsLoadAware`,
`Worker#heartbeatToMaster` and `PartitionDataWriter#getStorageInfo`.
### Why are the changes needed?
There are many methods where OSS support is lacking in
`SlotsAllocator#offerSlotsLoadAware`, `Worker#heartbeatToMaster` and
`PartitionDataWriter#getStorageInfo`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3268 from SteNicholas/CELEBORN-1916.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../celeborn/common/protocol/StorageInfo.java | 8 ++--
.../org/apache/celeborn/common/CelebornConf.scala | 11 +----
.../apache/celeborn/common/CelebornConfSuite.scala | 3 ++
docs/configuration/master.md | 3 +-
docs/migration.md | 2 +
.../service/deploy/master/SlotsAllocator.java | 8 ++--
.../deploy/worker/storage/PartitionDataWriter.java | 2 +-
.../celeborn/service/deploy/worker/Worker.scala | 2 +-
.../deploy/worker/storage/StorageManager.scala | 54 +++++++++++-----------
9 files changed, 42 insertions(+), 51 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
index 8509d5717..b8d9428c3 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -202,14 +202,14 @@ public class StorageInfo implements Serializable {
return availableStorageTypes == HDFS_MASK;
}
- public boolean HDFSOnly() {
- return StorageInfo.HDFSOnly(availableStorageTypes);
- }
-
public static boolean S3Only(int availableStorageTypes) {
return availableStorageTypes == S3_MASK;
}
+ public static boolean OSSOnly(int availableStorageTypes) {
+ return availableStorageTypes == OSS_MASK;
+ }
+
public static boolean OSSAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & OSS_MASK) > 0;
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index d3a9977ef..c994c2802 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -931,7 +931,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_ENABLED)
def clientShuffleDynamicResourceFactor: Double =
get(CLIENT_SHUFFLE_DYNAMIC_RESOURCE_FACTOR)
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
- def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
def dfsExpireDirsTimeoutMS: Long = get(DFS_EXPIRE_DIRS_TIMEOUT)
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
def applicationUnregisterEnabled: Boolean =
get(APPLICATION_UNREGISTER_ENABLED)
@@ -2384,19 +2383,11 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("300s")
- val HDFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
- buildConf("celeborn.master.hdfs.expireDirs.timeout")
- .categories("master")
- .version("0.3.0")
- .doc("The timeout for a expire dirs to be deleted on HDFS.")
- .timeConf(TimeUnit.MILLISECONDS)
- .createWithDefaultString("1h")
-
val DFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.dfs.expireDirs.timeout")
.categories("master")
.version("0.6.0")
- .doc("The timeout for a expire dirs to be deleted on S3 or HDFS.")
+ .doc("The timeout for expired dirs to be deleted on dfs like HDFS,
S3, OSS.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")
diff --git
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index a4e721ceb..2fedcdb32 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -248,6 +248,9 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set("celeborn.storage.availableTypes", "S3")
assert(conf.availableStorageTypes == StorageInfo.S3_MASK)
+
+ conf.set("celeborn.storage.availableTypes", "OSS")
+ assert(conf.availableStorageTypes == StorageInfo.OSS_MASK)
}
test("Test role rpcDispatcherNumThreads") {
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index f17cdcc00..add7c136e 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -37,14 +37,13 @@ license: |
| celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf
for debugging purposes. | 0.5.0 | |
| celeborn.master.allowWorkerHostPattern | <undefined> | false | Pattern
of worker host that allowed to register with the master. If not set, all
workers are allowed to register. | 0.6.0 | |
| celeborn.master.denyWorkerHostPattern | <undefined> | false | Pattern
of worker host that denied to register with the master. If not set, no workers
are denied to register. | 0.6.0 | |
-| celeborn.master.dfs.expireDirs.timeout | 1h | false | The timeout for a
expire dirs to be deleted on S3 or HDFS. | 0.6.0 | |
+| celeborn.master.dfs.expireDirs.timeout | 1h | false | The timeout for
expired dirs to be deleted on dfs like HDFS, S3, OSS. | 0.6.0 | |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | false | Initial
partition size for estimation, it will change according to runtime stats. |
0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize |
| celeborn.master.estimatedPartitionSize.maxSize | <undefined> | false |
Max partition size for estimation. Default value should be
celeborn.worker.shuffle.partitionSplit.max * 2. | 0.4.1 | |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore
partition size smaller than this configuration of partition size for
estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate |
| celeborn.master.estimatedPartitionSize.update.initialDelay | 5min | false |
Initial delay time before start updating partition size for estimation. | 0.3.0
| celeborn.shuffle.estimatedPartitionSize.update.initialDelay |
| celeborn.master.estimatedPartitionSize.update.interval | 10min | false |
Interval of updating partition size for estimation. | 0.3.0 |
celeborn.shuffle.estimatedPartitionSize.update.interval |
| celeborn.master.excludeWorker.unhealthyDiskRatioThreshold | 1.0 | false |
Max ratio of unhealthy disks for excluding worker, when unhealthy disk is
larger than max unhealthy count, master will exclude worker. If this value is
set to 1, master will exclude worker of which disks are all unhealthy. | 0.6.0
| |
-| celeborn.master.hdfs.expireDirs.timeout | 1h | false | The timeout for a
expire dirs to be deleted on HDFS. | 0.3.0 | |
| celeborn.master.heartbeat.application.timeout | 300s | false | Application
heartbeat timeout. | 0.3.0 | celeborn.application.heartbeat.timeout |
| celeborn.master.heartbeat.worker.timeout | 120s | false | Worker heartbeat
timeout. | 0.3.0 | celeborn.worker.heartbeat.timeout |
| celeborn.master.host | <localhost> | false | Hostname for master to
bind. | 0.2.0 | |
diff --git a/docs/migration.md b/docs/migration.md
index 616974d69..9512c6edb 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -31,6 +31,8 @@ license: |
- Since 0.6.0, Celeborn modified `celeborn.quota.tenant.hdfsFileCount` to
`celeborn.quota.user.hdfsFileCount`. Please use
`celeborn.quota.user.hdfsFileCount` if you want to set user level quota.
+- Since 0.6.0, Celeborn modified `celeborn.master.hdfs.expireDirs.timeout` to
`celeborn.master.dfs.expireDirs.timeout`. Please use
`celeborn.master.dfs.expireDirs.timeout` if you want to set the timeout for
expired dirs to be deleted.
+
- Since 0.6.0, Celeborn changed the default value of
`celeborn.master.slot.assign.extraSlots` from `2` to `100`, which means
Celeborn will involve more workers in offering slots.
- Since 0.6.0, Celeborn deprecate
`celeborn.worker.congestionControl.low.watermark`. Please use
`celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index 62582018f..608c3e7bd 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -126,11 +126,9 @@ public class SlotsAllocator {
if (workers.size() < 2 && shouldReplicate) {
return new HashMap<>();
}
- if (StorageInfo.HDFSOnly(availableStorageTypes)) {
- return offerSlotsRoundRobin(
- workers, partitionIds, shouldReplicate, shouldRackAware,
availableStorageTypes);
- }
- if (StorageInfo.S3Only(availableStorageTypes)) {
+ if (StorageInfo.HDFSOnly(availableStorageTypes)
+ || StorageInfo.S3Only(availableStorageTypes)
+ || StorageInfo.OSSOnly(availableStorageTypes)) {
return offerSlotsRoundRobin(
workers, partitionIds, shouldReplicate, shouldRackAware,
availableStorageTypes);
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 3cacd4866..932890237 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -155,7 +155,7 @@ public class PartitionDataWriter implements DeviceObserver {
} else if (diskFileInfo.isS3()) {
storageInfo = new StorageInfo(StorageInfo.Type.S3, true,
diskFileInfo.getFilePath());
} else if (diskFileInfo.isOSS()) {
- return new StorageInfo(StorageInfo.Type.OSS, true,
diskFileInfo.getFilePath());
+ storageInfo = new StorageInfo(StorageInfo.Type.OSS, true,
diskFileInfo.getFilePath());
} else {
storageInfo = new StorageInfo(StorageInfo.Type.HDFS, true,
diskFileInfo.getFilePath());
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 502740cd7..0a1e14ff4 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -490,7 +490,7 @@ private[celeborn] class Worker(
val diskInfos =
workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map {
disk =>
disk.mountPoint -> disk
- }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo ++
storageManager.s3DiskInfo
+ }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo ++
storageManager.s3DiskInfo ++ storageManager.ossDiskInfo
workerStatusManager.checkIfNeedTransitionStatus()
val response = masterClient.askSync[HeartbeatFromWorkerResponse](
HeartbeatFromWorker(
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 1d369cf27..345f109ce 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -238,9 +238,9 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val activeTypes = conf.availableStorageTypes
lazy val localOrDfsStorageAvailable: Boolean = {
- StorageInfo.OSSAvailable(activeTypes) || StorageInfo.HDFSAvailable(
- activeTypes) || StorageInfo.localDiskAvailable(
- activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty
|| ossDir.nonEmpty
+ StorageInfo.OSSAvailable(activeTypes) ||
StorageInfo.S3Available(activeTypes) ||
+ StorageInfo.HDFSAvailable(activeTypes) ||
StorageInfo.localDiskAvailable(activeTypes) ||
+ hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty || ossDir.nonEmpty
}
override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit =
this.synchronized {
@@ -842,33 +842,31 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
override def accept(
t: File,
writers: ConcurrentHashMap[String, PartitionDataWriter]): Unit = {
- writers.forEach(new BiConsumer[String, PartitionDataWriter] {
- override def accept(file: String, writer: PartitionDataWriter): Unit
= {
- if (writer.getException == null) {
- try {
- writer.flushOnMemoryPressure()
- } catch {
- case t: Throwable =>
- logError(
- s"FileWrite of $writer faces unexpected exception when
flush on memory pressure.",
- t)
- }
- } else {
- logWarning(s"Skip flushOnMemoryPressure because
${writer.getFlusher} " +
- s"has error: ${writer.getException.getMessage}")
- }
- }
- })
- }
- })
- hdfsWriters.forEach(new BiConsumer[String, PartitionDataWriter] {
- override def accept(t: String, u: PartitionDataWriter): Unit = {
- u.flushOnMemoryPressure()
+ flushOnMemoryPressure(writers)
}
})
- s3Writers.forEach(new BiConsumer[String, PartitionDataWriter] {
- override def accept(t: String, u: PartitionDataWriter): Unit = {
- u.flushOnMemoryPressure()
+ flushOnMemoryPressure(hdfsWriters)
+ flushOnMemoryPressure(s3Writers)
+ flushOnMemoryPressure(ossWriters)
+ }
+
+ private def flushOnMemoryPressure(writers: ConcurrentHashMap[String,
PartitionDataWriter])
+ : Unit = {
+ writers.forEach(new BiConsumer[String, PartitionDataWriter] {
+ override def accept(file: String, writer: PartitionDataWriter): Unit = {
+ if (writer.getException == null) {
+ try {
+ writer.flushOnMemoryPressure()
+ } catch {
+ case t: Throwable =>
+ logError(
+ s"FileWrite of $writer faces unexpected exception when flush
on memory pressure.",
+ t)
+ }
+ } else {
+ logWarning(s"Skip flushOnMemoryPressure because ${writer.getFlusher}
" +
+ s"has error: ${writer.getException.getMessage}")
+ }
}
})
}