This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch b2043
in repository https://gitbox.apache.org/repos/asf/celeborn.git

commit c98c0c751062b7775fe2c3ed45364c32eb850a08
Author: mingji <[email protected]>
AuthorDate: Mon Jun 23 13:59:51 2025 +0800

    [CELEBORN-2043] Fix IndexOutOfBoundsException exception in 
getEvictedFileWriter
---
 .../service/deploy/worker/storage/StoragePolicy.scala |  9 ++++++++-
 .../storage/storagePolicy/StoragePolicyCase3.scala    | 19 +++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 9fc2e723a..9cf638520 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -179,8 +179,15 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
       throw new CelebornIOException("Create file order can not be empty, check 
your configs")
     }
 
-    val tryCreateFileTypeIndex = order.get.indexOf(
+    var tryCreateFileTypeIndex = order.get.indexOf(
       
partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
+    if (tryCreateFileTypeIndex == -1) {
+      // If the storage type is not in the order, start from the first entry. Two scenarios:
+      // 1. old clients (<= 0.5) select MEMORY as the default storage type
+      // 2. the create-file order for evicted files does not include the file's own storage type
+      tryCreateFileTypeIndex = 0
+    }
+
     val maxSize = order.get.length
     for (i <- tryCreateFileTypeIndex until maxSize) {
       val storageStr = order.get(i)
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
index 7a7718e87..c307adc37 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
@@ -116,4 +116,23 @@ class StoragePolicyCase3 extends CelebornFunSuite {
     assert(nFile.isInstanceOf[LocalTierWriter])
   }
 
+  test("test evict file case2") {
+    
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
+    when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+    val mockedMemoryFile = mock[LocalTierWriter]
+    val conf = new CelebornConf()
+    val flushLock = new AnyRef
+    val storagePolicy = new StoragePolicy(conf, mockedStorageManager, 
mockedSource)
+    val pendingWriters = new AtomicInteger()
+    val notifier = new FlushNotifier
+    when(mockedMemoryFile.storageType).thenAnswer(StorageInfo.Type.MEMORY)
+    val nFile = storagePolicy.getEvictedFileWriter(
+      mockedMemoryFile,
+      mockedPartitionWriterContext,
+      PartitionType.REDUCE,
+      pendingWriters,
+      notifier)
+    assert(nFile.isInstanceOf[LocalTierWriter])
+  }
+
 }

Reply via email to