This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 57461e2f9 [CELEBORN-2142] DfsTierWriter should create for unavailable
disks
57461e2f9 is described below
commit 57461e2f92daa0a5155392d5a47cf7762dbe822b
Author: xxx <[email protected]>
AuthorDate: Mon Sep 29 16:10:29 2025 +0800
[CELEBORN-2142] DfsTierWriter should create for unavailable disks
### What changes were proposed in this pull request?
`DfsTierWriter` should be created when no local disk is available.
### Why are the changes needed?
If there is no available local disk, `StorageManager#createDiskFile` returns an
`HdfsFlusher`, `S3Flusher`, or `OssFlusher` rather than a `LocalFlusher`. In that
case a `LocalTierWriter` should not be created; a `DfsTierWriter` should be created instead.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3468 from xy2953396112/CELEBORN-2142.
Authored-by: xxx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 944efa87ba5be42befadeb720649f8717f0c05e1)
Signed-off-by: SteNicholas <[email protected]>
---
.../celeborn/service/deploy/worker/storage/StoragePolicy.scala | 7 ++++---
.../deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala | 3 ++-
.../deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala | 4 +++-
.../deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala | 3 ++-
4 files changed, 11 insertions(+), 6 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index a475d12f2..7c8f8161e 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -135,7 +135,8 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
partitionDataWriterContext.isPartitionSplitEnabled)
partitionDataWriterContext.setWorkingDir(workingDir)
val metaHandler = getPartitionMetaHandler(diskFileInfo)
- if ((storageInfoType == StorageInfo.Type.HDD || storageInfoType
== StorageInfo.Type.SSD) && location.getStorageInfo.localDiskAvailable()) {
+ if (flusher.isInstanceOf[LocalFlusher]
+ && location.getStorageInfo.localDiskAvailable()) {
new LocalTierWriter(
conf,
metaHandler,
@@ -144,7 +145,7 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
flusher,
source,
diskFileInfo,
- storageInfoType,
+ diskFileInfo.getStorageType,
partitionDataWriterContext,
storageManager)
} else {
@@ -156,7 +157,7 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
flusher,
source,
diskFileInfo,
- storageInfoType,
+ diskFileInfo.getStorageType,
partitionDataWriterContext,
storageManager)
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
index 28f6e87e0..7a7ab956d 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
@@ -56,7 +56,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
when(mockedStorageManager.storageBufferAllocator).thenAnswer(UnpooledByteBufAllocator.DEFAULT)
val mockedDiskFile = mock[DiskFileInfo]
- val mockedFlusher = mock[Flusher]
+ val mockedFlusher = mock[LocalFlusher]
val mockedFile = mock[File]
when(
mockedStorageManager.createDiskFile(
@@ -101,6 +101,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+ when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.HDD)
val conf = new CelebornConf()
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy",
"SSD,HDD,HDFS,OSS,S3")
val storagePolicy = new StoragePolicy(conf, mockedStorageManager,
mockedSource)
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
index cb21b7128..909703700 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
@@ -56,7 +56,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
when(mockedStorageManager.storageBufferAllocator).thenAnswer(UnpooledByteBufAllocator.DEFAULT)
val mockedDiskFile = mock[DiskFileInfo]
- val mockedFlusher = mock[Flusher]
+ val mockedFlusher = mock[LocalFlusher]
val mockedFile = mock[File]
when(
mockedStorageManager.createDiskFile(
@@ -101,6 +101,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+ when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.SSD)
val mockedMemoryFile = mock[LocalTierWriter]
val conf = new CelebornConf()
val flushLock = new AnyRef
@@ -120,6 +121,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+ when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.HDD)
val mockedMemoryFile = mock[LocalTierWriter]
val conf = new CelebornConf()
val flushLock = new AnyRef
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
index e27def478..e25837d2b 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
@@ -56,7 +56,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
when(mockedStorageManager.storageBufferAllocator).thenAnswer(UnpooledByteBufAllocator.DEFAULT)
val mockedDiskFile = mock[DiskFileInfo]
- val mockedFlusher = mock[Flusher]
+ val mockedFlusher = mock[LocalFlusher]
val mockedFile = mock[File]
when(
mockedStorageManager.createDiskFile(
@@ -102,6 +102,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
memoryDisabledHintPartitionLocation)
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+ when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.SSD)
val conf = new CelebornConf()
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy",
"MEMORY,SSD,HDD,HDFS,OSS,S3")
val storagePolicy = new StoragePolicy(conf, mockedStorageManager,
mockedSource)