This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new e89be8f6c [CELEBORN-2043] Fix IndexOutOfBoundsException exception in 
getEvictedFileWriter
e89be8f6c is described below

commit e89be8f6c03d28f8a2a4fe9fd318c94f94a0fab5
Author: mingji <[email protected]>
AuthorDate: Mon Jun 23 00:49:26 2025 -0700

    [CELEBORN-2043] Fix IndexOutOfBoundsException exception in 
getEvictedFileWriter
    
    ### What changes were proposed in this pull request?
    When creating a file for an evicted writer, use 0 as the start index into the 
create file order list, because that list may not contain the corresponding storage type.
    
    ### Why are the changes needed?
    This is needed in two scenarios:
    1. For existing (pre-0.6.0) clients, change partition selects MEMORY as the 
default storage tier, which causes the revived partition to use memory 
storage even if the application is not configured to do so. This is the cause 
of [CELEBORN-2043], and it is fixed by 
[CELEBORN-2021](https://github.com/apache/celeborn/pull/3302).
    2. Evicted files need to exclude their own storage type from the create 
file order list, which means that this storage type does not exist in the create 
file order list.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA and cluster.
    
    Closes #3344 from FMX/b2043.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 676beca616c8b5cb0e8ba72450d92fa9bc9cca42)
    Signed-off-by: Wang, Fei <[email protected]>
---
 docs/migration.md                                     |  6 ++++++
 .../service/deploy/worker/storage/StoragePolicy.scala | 16 ++++++++++++----
 .../memory/MemoryReducePartitionDataWriterSuiteJ.java |  7 ++++++-
 .../storage/storagePolicy/StoragePolicyCase3.scala    | 19 +++++++++++++++++++
 4 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/docs/migration.md b/docs/migration.md
index a88904701..85121b581 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -97,6 +97,12 @@ license: |
 
 - Since 0.6.0, the out-of-dated Flink 1.14 and Flink 1.15 have been removed 
from the official support list.
 
+- Since 0.6.0, the client respects the spark.celeborn.storage.availableTypes 
configuration, 
+    ensuring revived partition locations no longer default to memory storage. 
In contrast, clients prior 
+    to 0.6.0 default to memory storage for revived partitions. This means that 
if memory storage is enabled in 
+    worker nodes, pre-0.6.0 clients may inadvertently utilize memory storage 
for an application even when memory 
+    storage is not enabled for that app.
+
 ## Upgrading from 0.5.0 to 0.5.1
 
 - Since 0.5.1, Celeborn master REST API `/exclude` request uses media type 
`application/x-www-form-urlencoded` instead of `text/plain`.
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 9fc2e723a..b5166d009 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -47,7 +47,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
           partitionType,
           numPendingWrites,
           notifier,
-          orderList)
+          orderList,
+          true)
       }
     }
     logError(s"Create evict file failed for 
${partitionDataWriterContext.getPartitionLocation}")
@@ -72,7 +73,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
       partitionType: PartitionType,
       numPendingWrites: AtomicInteger,
       notifier: FlushNotifier,
-      order: Option[List[String]] = createFileOrder): TierWriterBase = {
+      order: Option[List[String]] = createFileOrder,
+      evict: Boolean = false): TierWriterBase = {
     logDebug(
       s"create file for ${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}")
     val location = partitionDataWriterContext.getPartitionLocation
@@ -179,8 +181,14 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
       throw new CelebornIOException("Create file order can not be empty, check 
your configs")
     }
 
-    val tryCreateFileTypeIndex = order.get.indexOf(
-      
partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
+    val tryCreateFileTypeIndex =
+      if (evict) {
+        0
+      } else {
+        order.get.indexOf(
+          
partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
+      }
+
     val maxSize = order.get.length
     for (i <- tryCreateFileTypeIndex until maxSize) {
       val storageStr = order.get(i)
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index a9ae805b2..8da338bf3 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -123,7 +123,12 @@ public class MemoryReducePartitionDataWriterSuiteJ {
                     storageManager))
         .when(storagePolicy)
         .createFileWriter(
-            Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.any(),
+            Mockito.anyBoolean());
 
     return storageManager;
   }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
index 7a7718e87..c307adc37 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
@@ -116,4 +116,23 @@ class StoragePolicyCase3 extends CelebornFunSuite {
     assert(nFile.isInstanceOf[LocalTierWriter])
   }
 
+  test("test evict file case2") {
+    
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
+    when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+    val mockedMemoryFile = mock[LocalTierWriter]
+    val conf = new CelebornConf()
+    val flushLock = new AnyRef
+    val storagePolicy = new StoragePolicy(conf, mockedStorageManager, 
mockedSource)
+    val pendingWriters = new AtomicInteger()
+    val notifier = new FlushNotifier
+    when(mockedMemoryFile.storageType).thenAnswer(StorageInfo.Type.MEMORY)
+    val nFile = storagePolicy.getEvictedFileWriter(
+      mockedMemoryFile,
+      mockedPartitionWriterContext,
+      PartitionType.REDUCE,
+      pendingWriters,
+      notifier)
+    assert(nFile.isInstanceOf[LocalTierWriter])
+  }
+
 }

Reply via email to