This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 57461e2f9 [CELEBORN-2142] DfsTierWriter should create for unavailable 
disks
57461e2f9 is described below

commit 57461e2f92daa0a5155392d5a47cf7762dbe822b
Author: xxx <[email protected]>
AuthorDate: Mon Sep 29 16:10:29 2025 +0800

    [CELEBORN-2142] DfsTierWriter should create for unavailable disks
    
    ### What changes were proposed in this pull request?
    
    `DfsTierWriter` should be created when no local disk is available.
    
    ### Why are the changes needed?
    
    If there is no available disk, `StorageManager#createDiskFile` returns an
    `HdfsFlusher`, `S3Flusher`, or `OssFlusher`. In that case, a
    `LocalTierWriter` should not be created.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3468 from xy2953396112/CELEBORN-2142.
    
    Authored-by: xxx <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 944efa87ba5be42befadeb720649f8717f0c05e1)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../celeborn/service/deploy/worker/storage/StoragePolicy.scala     | 7 ++++---
 .../deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala   | 3 ++-
 .../deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala   | 4 +++-
 .../deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala   | 3 ++-
 4 files changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index a475d12f2..7c8f8161e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -135,7 +135,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
                 partitionDataWriterContext.isPartitionSplitEnabled)
               partitionDataWriterContext.setWorkingDir(workingDir)
               val metaHandler = getPartitionMetaHandler(diskFileInfo)
-              if ((storageInfoType == StorageInfo.Type.HDD || storageInfoType 
== StorageInfo.Type.SSD) && location.getStorageInfo.localDiskAvailable()) {
+              if (flusher.isInstanceOf[LocalFlusher]
+                && location.getStorageInfo.localDiskAvailable()) {
                 new LocalTierWriter(
                   conf,
                   metaHandler,
@@ -144,7 +145,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
                   flusher,
                   source,
                   diskFileInfo,
-                  storageInfoType,
+                  diskFileInfo.getStorageType,
                   partitionDataWriterContext,
                   storageManager)
               } else {
@@ -156,7 +157,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
                   flusher,
                   source,
                   diskFileInfo,
-                  storageInfoType,
+                  diskFileInfo.getStorageType,
                   partitionDataWriterContext,
                   storageManager)
               }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
index 28f6e87e0..7a7ab956d 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
@@ -56,7 +56,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
   
when(mockedStorageManager.storageBufferAllocator).thenAnswer(UnpooledByteBufAllocator.DEFAULT)
 
   val mockedDiskFile = mock[DiskFileInfo]
-  val mockedFlusher = mock[Flusher]
+  val mockedFlusher = mock[LocalFlusher]
   val mockedFile = mock[File]
   when(
     mockedStorageManager.createDiskFile(
@@ -101,6 +101,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
     
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
     
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
     when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+    when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.HDD)
     val conf = new CelebornConf()
     conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", 
"SSD,HDD,HDFS,OSS,S3")
     val storagePolicy = new StoragePolicy(conf, mockedStorageManager, 
mockedSource)
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
index cb21b7128..909703700 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
@@ -56,7 +56,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
   
when(mockedStorageManager.storageBufferAllocator).thenAnswer(UnpooledByteBufAllocator.DEFAULT)
 
   val mockedDiskFile = mock[DiskFileInfo]
-  val mockedFlusher = mock[Flusher]
+  val mockedFlusher = mock[LocalFlusher]
   val mockedFile = mock[File]
   when(
     mockedStorageManager.createDiskFile(
@@ -101,6 +101,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
     
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
     
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
     when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+    when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.SSD)
     val mockedMemoryFile = mock[LocalTierWriter]
     val conf = new CelebornConf()
     val flushLock = new AnyRef
@@ -120,6 +121,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
     
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
     
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
     when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+    when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.HDD)
     val mockedMemoryFile = mock[LocalTierWriter]
     val conf = new CelebornConf()
     val flushLock = new AnyRef
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
index e27def478..e25837d2b 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
@@ -56,7 +56,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
   
when(mockedStorageManager.storageBufferAllocator).thenAnswer(UnpooledByteBufAllocator.DEFAULT)
 
   val mockedDiskFile = mock[DiskFileInfo]
-  val mockedFlusher = mock[Flusher]
+  val mockedFlusher = mock[LocalFlusher]
   val mockedFile = mock[File]
   when(
     mockedStorageManager.createDiskFile(
@@ -102,6 +102,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
       memoryDisabledHintPartitionLocation)
     
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
     when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+    when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.SSD)
     val conf = new CelebornConf()
     conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", 
"MEMORY,SSD,HDD,HDFS,OSS,S3")
     val storagePolicy = new StoragePolicy(conf, mockedStorageManager, 
mockedSource)

Reply via email to