This is an automated email from the ASF dual-hosted git repository. ethanfeng pushed a commit to branch b2043 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit d4c7d8f8cb222e8a2f3982ae2d019f3301a2803d Author: mingji <[email protected]> AuthorDate: Mon Jun 23 13:59:51 2025 +0800 [CELEBORN-2043] Fix IndexOutOfBoundsException exception in getEvictedFileWriter --- .../service/deploy/worker/storage/StoragePolicy.scala | 16 ++++++++++++---- .../memory/MemoryReducePartitionDataWriterSuiteJ.java | 7 ++++++- .../storage/storagePolicy/StoragePolicyCase3.scala | 19 +++++++++++++++++++ 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala index 9fc2e723a..b5166d009 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala @@ -47,7 +47,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: partitionType, numPendingWrites, notifier, - orderList) + orderList, + true) } } logError(s"Create evict file failed for ${partitionDataWriterContext.getPartitionLocation}") @@ -72,7 +73,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: partitionType: PartitionType, numPendingWrites: AtomicInteger, notifier: FlushNotifier, - order: Option[List[String]] = createFileOrder): TierWriterBase = { + order: Option[List[String]] = createFileOrder, + evict: Boolean = false): TierWriterBase = { logDebug( s"create file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") val location = partitionDataWriterContext.getPartitionLocation @@ -179,8 +181,14 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: throw new CelebornIOException("Create file order can not be empty, check your configs") } - val tryCreateFileTypeIndex = order.get.indexOf( - partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name()) + val tryCreateFileTypeIndex = + if (evict) { + 0 + } else { + order.get.indexOf( + partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name()) + } + val maxSize = order.get.length for (i <- tryCreateFileTypeIndex until maxSize) { val storageStr = order.get(i) diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java index a9ae805b2..8da338bf3 100644 --- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java +++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java @@ -123,7 +123,12 @@ public class MemoryReducePartitionDataWriterSuiteJ { storageManager)) .when(storagePolicy) .createFileWriter( - Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.any(), + Mockito.anyBoolean()); return storageManager; } diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala index 7a7718e87..c307adc37 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala @@ -116,4 +116,23 @@ class StoragePolicyCase3 extends CelebornFunSuite { assert(nFile.isInstanceOf[LocalTierWriter]) } + test("test evict file case2") { + when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation) + when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true) + val mockedMemoryFile = mock[LocalTierWriter] + val conf = new CelebornConf() + val flushLock = new AnyRef + val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource) + val pendingWriters = new AtomicInteger() + val notifier = new FlushNotifier + when(mockedMemoryFile.storageType).thenAnswer(StorageInfo.Type.MEMORY) + val nFile = storagePolicy.getEvictedFileWriter( + mockedMemoryFile, + mockedPartitionWriterContext, + PartitionType.REDUCE, + pendingWriters, + notifier) + assert(nFile.isInstanceOf[LocalTierWriter]) + } + }
