This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch b2043
in repository https://gitbox.apache.org/repos/asf/celeborn.git

commit d4c7d8f8cb222e8a2f3982ae2d019f3301a2803d
Author: mingji <[email protected]>
AuthorDate: Mon Jun 23 13:59:51 2025 +0800

    [CELEBORN-2043] Fix IndexOutOfBoundsException exception in 
getEvictedFileWriter
---
 .../service/deploy/worker/storage/StoragePolicy.scala | 16 ++++++++++++----
 .../memory/MemoryReducePartitionDataWriterSuiteJ.java |  7 ++++++-
 .../storage/storagePolicy/StoragePolicyCase3.scala    | 19 +++++++++++++++++++
 3 files changed, 37 insertions(+), 5 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 9fc2e723a..b5166d009 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -47,7 +47,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
           partitionType,
           numPendingWrites,
           notifier,
-          orderList)
+          orderList,
+          true)
       }
     }
     logError(s"Create evict file failed for 
${partitionDataWriterContext.getPartitionLocation}")
@@ -72,7 +73,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
       partitionType: PartitionType,
       numPendingWrites: AtomicInteger,
       notifier: FlushNotifier,
-      order: Option[List[String]] = createFileOrder): TierWriterBase = {
+      order: Option[List[String]] = createFileOrder,
+      evict: Boolean = false): TierWriterBase = {
     logDebug(
       s"create file for ${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}")
     val location = partitionDataWriterContext.getPartitionLocation
@@ -179,8 +181,14 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
       throw new CelebornIOException("Create file order can not be empty, check 
your configs")
     }
 
-    val tryCreateFileTypeIndex = order.get.indexOf(
-      
partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
+    val tryCreateFileTypeIndex =
+      if (evict) {
+        0
+      } else {
+        order.get.indexOf(
+          
partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
+      }
+
     val maxSize = order.get.length
     for (i <- tryCreateFileTypeIndex until maxSize) {
       val storageStr = order.get(i)
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index a9ae805b2..8da338bf3 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -123,7 +123,12 @@ public class MemoryReducePartitionDataWriterSuiteJ {
                     storageManager))
         .when(storagePolicy)
         .createFileWriter(
-            Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.anyBoolean());
 
     return storageManager;
   }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
index 7a7718e87..c307adc37 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
@@ -116,4 +116,23 @@ class StoragePolicyCase3 extends CelebornFunSuite {
     assert(nFile.isInstanceOf[LocalTierWriter])
   }
 
+  test("test evict file case2") {
+    
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
+    when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+    val mockedMemoryFile = mock[LocalTierWriter]
+    val conf = new CelebornConf()
+    val flushLock = new AnyRef
+    val storagePolicy = new StoragePolicy(conf, mockedStorageManager, 
mockedSource)
+    val pendingWriters = new AtomicInteger()
+    val notifier = new FlushNotifier
+    when(mockedMemoryFile.storageType).thenAnswer(StorageInfo.Type.MEMORY)
+    val nFile = storagePolicy.getEvictedFileWriter(
+      mockedMemoryFile,
+      mockedPartitionWriterContext,
+      PartitionType.REDUCE,
+      pendingWriters,
+      notifier)
+    assert(nFile.isInstanceOf[LocalTierWriter])
+  }
+
 }

Reply via email to