This is an automated email from the ASF dual-hosted git repository. ethanfeng pushed a commit to branch b2043 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit c98c0c751062b7775fe2c3ed45364c32eb850a08 Author: mingji <[email protected]> AuthorDate: Mon Jun 23 13:59:51 2025 +0800 [CELEBORN-2043] Fix IndexOutOfBoundsException exception in getEvictedFileWriter --- .../service/deploy/worker/storage/StoragePolicy.scala | 9 ++++++++- .../storage/storagePolicy/StoragePolicyCase3.scala | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala index 9fc2e723a..9cf638520 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala @@ -179,8 +179,15 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: throw new CelebornIOException("Create file order can not be empty, check your configs") } - val tryCreateFileTypeIndex = order.get.indexOf( + var tryCreateFileTypeIndex = order.get.indexOf( partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name()) + if (tryCreateFileTypeIndex == -1) { + // if the storage type is not in the order, use the first one there are two scenarios + // 1. old client <= 0.5 will select MEMORY as default storage + // 2. 
create file order for evicted files will not include its own storage type + tryCreateFileTypeIndex = 0 + } + val maxSize = order.get.length for (i <- tryCreateFileTypeIndex until maxSize) { val storageStr = order.get(i) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala index 7a7718e87..c307adc37 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala @@ -116,4 +116,23 @@ class StoragePolicyCase3 extends CelebornFunSuite { assert(nFile.isInstanceOf[LocalTierWriter]) } + test("test evict file case2") { + when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation) + when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true) + val mockedMemoryFile = mock[LocalTierWriter] + val conf = new CelebornConf() + val flushLock = new AnyRef + val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource) + val pendingWriters = new AtomicInteger() + val notifier = new FlushNotifier + when(mockedMemoryFile.storageType).thenAnswer(StorageInfo.Type.MEMORY) + val nFile = storagePolicy.getEvictedFileWriter( + mockedMemoryFile, + mockedPartitionWriterContext, + PartitionType.REDUCE, + pendingWriters, + notifier) + assert(nFile.isInstanceOf[LocalTierWriter]) + } + }
