This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new e89be8f6c [CELEBORN-2043] Fix IndexOutOfBoundsException exception in
getEvictedFileWriter
e89be8f6c is described below
commit e89be8f6c03d28f8a2a4fe9fd318c94f94a0fab5
Author: mingji <[email protected]>
AuthorDate: Mon Jun 23 00:49:26 2025 -0700
[CELEBORN-2043] Fix IndexOutOfBoundsException exception in
getEvictedFileWriter
### What changes were proposed in this pull request?
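When creating a writer for an evicted file, start from index 0 of the create
file order list instead of looking up the index of the partition location's
storage type. That storage type may be absent from the list, in which case the
lookup returns -1 and indexing the list throws IndexOutOfBoundsException.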
### Why are the changes needed?
This is needed in two scenarios:
1. For existing clients, a change-partition request selects MEMORY as the
default storage tier, which causes the revived partition to use memory
storage even if the application is not configured to do so. This is the cause
of [CELEBORN-2043], and it was fixed by
[CELEBORN-2021](https://github.com/apache/celeborn/pull/3302).
2. An evicted file must exclude its own storage type from the create file
order list, which means that this storage type does not exist in the create
file order list.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster.
Closes #3344 from FMX/b2043.
Authored-by: mingji <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 676beca616c8b5cb0e8ba72450d92fa9bc9cca42)
Signed-off-by: Wang, Fei <[email protected]>
---
docs/migration.md | 6 ++++++
.../service/deploy/worker/storage/StoragePolicy.scala | 16 ++++++++++++----
.../memory/MemoryReducePartitionDataWriterSuiteJ.java | 7 ++++++-
.../storage/storagePolicy/StoragePolicyCase3.scala | 19 +++++++++++++++++++
4 files changed, 43 insertions(+), 5 deletions(-)
diff --git a/docs/migration.md b/docs/migration.md
index a88904701..85121b581 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -97,6 +97,12 @@ license: |
- Since 0.6.0, the out-of-dated Flink 1.14 and Flink 1.15 have been removed
from the official support list.
+- Since 0.6.0, the client respects the spark.celeborn.storage.availableTypes
configuration,
+ ensuring revived partition locations no longer default to memory storage.
In contrast, clients prior
+ to 0.6.0 default to memory storage for revived partitions. This means that
if memory storage is enabled in
+ worker nodes, pre-0.6.0 clients may inadvertently utilize memory storage
for an application even when memory
+ storage is not enabled for that app.
+
## Upgrading from 0.5.0 to 0.5.1
- Since 0.5.1, Celeborn master REST API `/exclude` request uses media type
`application/x-www-form-urlencoded` instead of `text/plain`.
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 9fc2e723a..b5166d009 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -47,7 +47,8 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
partitionType,
numPendingWrites,
notifier,
- orderList)
+ orderList,
+ true)
}
}
logError(s"Create evict file failed for
${partitionDataWriterContext.getPartitionLocation}")
@@ -72,7 +73,8 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
partitionType: PartitionType,
numPendingWrites: AtomicInteger,
notifier: FlushNotifier,
- order: Option[List[String]] = createFileOrder): TierWriterBase = {
+ order: Option[List[String]] = createFileOrder,
+ evict: Boolean = false): TierWriterBase = {
logDebug(
s"create file for ${partitionDataWriterContext.getShuffleKey}
${partitionDataWriterContext.getPartitionLocation.getFileName}")
val location = partitionDataWriterContext.getPartitionLocation
@@ -179,8 +181,14 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
throw new CelebornIOException("Create file order can not be empty, check
your configs")
}
- val tryCreateFileTypeIndex = order.get.indexOf(
-
partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
+ val tryCreateFileTypeIndex =
+ if (evict) {
+ 0
+ } else {
+ order.get.indexOf(
+
partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
+ }
+
val maxSize = order.get.length
for (i <- tryCreateFileTypeIndex until maxSize) {
val storageStr = order.get(i)
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index a9ae805b2..8da338bf3 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -123,7 +123,12 @@ public class MemoryReducePartitionDataWriterSuiteJ {
storageManager))
.when(storagePolicy)
.createFileWriter(
- Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any());
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.any(),
+ Mockito.anyBoolean());
return storageManager;
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
index 7a7718e87..c307adc37 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
@@ -116,4 +116,23 @@ class StoragePolicyCase3 extends CelebornFunSuite {
assert(nFile.isInstanceOf[LocalTierWriter])
}
+ test("test evict file case2") {
+
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
+ when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
+ val mockedMemoryFile = mock[LocalTierWriter]
+ val conf = new CelebornConf()
+ val flushLock = new AnyRef
+ val storagePolicy = new StoragePolicy(conf, mockedStorageManager,
mockedSource)
+ val pendingWriters = new AtomicInteger()
+ val notifier = new FlushNotifier
+ when(mockedMemoryFile.storageType).thenAnswer(StorageInfo.Type.MEMORY)
+ val nFile = storagePolicy.getEvictedFileWriter(
+ mockedMemoryFile,
+ mockedPartitionWriterContext,
+ PartitionType.REDUCE,
+ pendingWriters,
+ notifier)
+ assert(nFile.isInstanceOf[LocalTierWriter])
+ }
+
}