This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b537798e3 [CELEBORN-2108] Remove redundant PartitionType
b537798e3 is described below
commit b537798e37be1e5d7e905af6c7a2df905f1b0da5
Author: xxx <[email protected]>
AuthorDate: Tue Aug 19 16:38:19 2025 +0800
[CELEBORN-2108] Remove redundant PartitionType
### What changes were proposed in this pull request?
Remove redundant `PartitionType`.
### Why are the changes needed?
`PartitionType` is included in `PartitionDataWriterContext`, therefore it
is not necessary to use `PartitionType` as method parameter.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3422 from xy2953396112/remove_useless_partition_type.
Authored-by: xxx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../deploy/worker/storage/PartitionDataWriter.java | 13 ++----
.../deploy/worker/storage/StorageManager.scala | 3 +-
.../deploy/worker/storage/StoragePolicy.scala | 7 +---
.../storage/PartitionDataWriterSuiteUtils.java | 9 ++--
.../local/DiskMapPartitionDataWriterSuiteJ.java | 3 +-
.../local/DiskReducePartitionDataWriterSuiteJ.java | 39 ++++++-----------
.../MemoryReducePartitionDataWriterSuiteJ.java | 49 +++++++---------------
.../storage/storagePolicy/StoragePolicyCase1.scala | 2 +-
.../storage/storagePolicy/StoragePolicyCase2.scala | 2 +-
.../storage/storagePolicy/StoragePolicyCase3.scala | 4 +-
.../storage/storagePolicy/StoragePolicyCase4.scala | 2 +-
11 files changed, 43 insertions(+), 90 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index e0f16c01a..35dfdc331 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -34,7 +34,6 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.meta.*;
import org.apache.celeborn.common.metrics.source.AbstractSource;
import org.apache.celeborn.common.protocol.PartitionSplitMode;
-import org.apache.celeborn.common.protocol.PartitionType;
import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.service.deploy.worker.WorkerSource;
import
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
@@ -54,7 +53,6 @@ public class PartitionDataWriter implements DeviceObserver {
private final AtomicInteger numPendingWrites = new AtomicInteger(0);
private final PartitionDataWriterContext writerContext;
protected final AbstractSource source; // metrics
- private final PartitionType partitionType;
private final String writerString;
private final StorageManager storageManager;
private final FlushNotifier notifier = new FlushNotifier();
@@ -67,8 +65,7 @@ public class PartitionDataWriter implements DeviceObserver {
AbstractSource workerSource,
CelebornConf conf,
DeviceMonitor deviceMonitor,
- PartitionDataWriterContext writerContext,
- PartitionType partitionType) {
+ PartitionDataWriterContext writerContext) {
memoryFileStorageMaxFileSize = conf.workerMemoryFileStorageMaxFileSize();
this.writerContext = writerContext;
this.source = workerSource;
@@ -90,11 +87,8 @@ public class PartitionDataWriter implements DeviceObserver {
writerContext.setPartitionDataWriter(this);
writerContext.setDeviceMonitor(deviceMonitor);
- this.partitionType = partitionType;
currentTierWriter =
- storageManager
- .storagePolicy()
- .createFileWriter(writerContext, partitionType, numPendingWrites,
notifier);
+ storageManager.storagePolicy().createFileWriter(writerContext,
numPendingWrites, notifier);
}
public DiskFileInfo getDiskFileInfo() {
@@ -198,8 +192,7 @@ public class PartitionDataWriter implements DeviceObserver {
TierWriterBase newTierWriter =
storageManager
.storagePolicy()
- .getEvictedFileWriter(
- currentTierWriter, writerContext, partitionType,
numPendingWrites, notifier);
+ .getEvictedFileWriter(currentTierWriter, writerContext,
numPendingWrites, notifier);
currentTierWriter.evict(newTierWriter);
currentTierWriter = newTierWriter;
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 9353ee23a..c9b24f389 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -470,8 +470,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
workerSource,
conf,
deviceMonitor,
- partitionDataWriterContext,
- partitionType)
+ partitionDataWriterContext)
} catch {
case e: Exception =>
logError("Create partition data writer failed", e)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 798ad165e..a475d12f2 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -36,7 +36,6 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
def getEvictedFileWriter(
celebornFile: TierWriterBase,
partitionDataWriterContext: PartitionDataWriterContext,
- partitionType: PartitionType,
numPendingWrites: AtomicInteger,
notifier: FlushNotifier): TierWriterBase = {
evictFileOrder.foreach { order =>
@@ -44,7 +43,6 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
if (orderList != null) {
return createFileWriter(
partitionDataWriterContext,
- partitionType,
numPendingWrites,
notifier,
orderList,
@@ -57,12 +55,10 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
def createFileWriter(
partitionDataWriterContext: PartitionDataWriterContext,
- partitionType: PartitionType,
numPendingWrites: AtomicInteger,
notifier: FlushNotifier): TierWriterBase = {
createFileWriter(
partitionDataWriterContext: PartitionDataWriterContext,
- partitionType: PartitionType,
numPendingWrites: AtomicInteger,
notifier: FlushNotifier,
createFileOrder)
@@ -70,7 +66,6 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
def createFileWriter(
partitionDataWriterContext: PartitionDataWriterContext,
- partitionType: PartitionType,
numPendingWrites: AtomicInteger,
notifier: FlushNotifier,
order: Option[List[String]] = createFileOrder,
@@ -84,7 +79,7 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
}
def getPartitionMetaHandler(fileInfo: FileInfo) = {
- partitionType match {
+ partitionDataWriterContext.getPartitionType match {
case PartitionType.REDUCE =>
new
ReducePartitionMetaHandler(partitionDataWriterContext.isRangeReadFilter,
fileInfo)
case PartitionType.MAP =>
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
index 9d5c6c26a..471a44865 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
@@ -103,7 +103,7 @@ public class PartitionDataWriterSuiteUtils {
context,
storageManager))
.when(storagePolicy)
- .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any());
+ .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any());
return storageManager;
}
@@ -148,7 +148,7 @@ public class PartitionDataWriterSuiteUtils {
writerContext,
storageManager))
.when(storagePolicy)
- .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any());
+ .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any());
return storageManager;
}
@@ -223,7 +223,7 @@ public class PartitionDataWriterSuiteUtils {
writerContext,
storageManager))
.when(storagePolicy)
- .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any());
+ .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any());
Mockito.doAnswer(
invocation ->
@@ -240,8 +240,7 @@ public class PartitionDataWriterSuiteUtils {
writerContext,
storageManager))
.when(storagePolicy)
- .getEvictedFileWriter(
- Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any());
+ .getEvictedFileWriter(Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.any());
return storageManager;
}
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
index 87ed418a3..329d7f90f 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
@@ -141,8 +141,7 @@ public class DiskMapPartitionDataWriterSuiteJ {
source,
CONF,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context,
- PartitionType.MAP);
+ context);
fileWriter.handleEvents(
PbPushDataHandShake.newBuilder().setNumPartitions(2).setBufferSize(32).build());
fileWriter.handleEvents(
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
index 1faa04a55..536b4aab3 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
@@ -288,8 +288,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
CONF,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context,
- PartitionType.REDUCE);
+ context);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum,
"FileWriter-UT-1");
@@ -343,8 +342,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
CONF,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context,
- PartitionType.REDUCE);
+ context);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum,
"FileWriter-UT-1");
@@ -399,8 +397,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
CONF,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context,
- PartitionType.REDUCE);
+ context);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum,
"FileWriter-UT-2");
@@ -470,8 +467,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
CONF,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context,
- PartitionType.REDUCE);
+ context);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum,
"FileWriter-UT-2");
@@ -590,8 +586,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context1,
- PartitionType.REDUCE);
+ context1);
partitionDataWriter.write(generateData(8 * 1024 * 1024));
partitionDataWriter.close();
ReduceFileMeta reduceFileMeta =
@@ -623,8 +618,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context2,
- PartitionType.REDUCE);
+ context2);
for (int i = 0; i < 8; i++) {
partitionDataWriter.write(generateData(128));
}
@@ -656,8 +650,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context3,
- PartitionType.REDUCE);
+ context3);
partitionDataWriter.write(generateData(1020));
partitionDataWriter.write(generateData(3));
partitionDataWriter.close();
@@ -688,8 +681,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context4,
- PartitionType.REDUCE);
+ context4);
for (int i = 0; i < 8; i++) {
partitionDataWriter.write(generateData(128));
}
@@ -722,8 +714,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context5,
- PartitionType.REDUCE);
+ context5);
for (int i = 0; i < 16; i++) {
partitionDataWriter.write(generateData(128));
}
@@ -755,8 +746,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context6,
- PartitionType.REDUCE);
+ context6);
for (int i = 0; i < 16; i++) {
partitionDataWriter.write(generateData(128));
}
@@ -790,8 +780,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context7,
- PartitionType.REDUCE);
+ context7);
for (int i = 0; i < 16; i++) {
partitionDataWriter.write(generateData(128));
}
@@ -824,8 +813,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context8,
- PartitionType.REDUCE);
+ context8);
partitionDataWriter.write(generateData(1024));
for (int i = 0; i < 9; i++) {
partitionDataWriter.write(generateData(128));
@@ -859,8 +847,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context9,
- PartitionType.REDUCE);
+ context9);
partitionDataWriter.write(generateData(1024));
for (int i = 0; i < 9; i++) {
partitionDataWriter.write(generateData(128));
diff --git
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index 006035d85..8aa0311be 100644
---
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -123,12 +123,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
storageManager))
.when(storagePolicy)
.createFileWriter(
- Mockito.any(),
- Mockito.any(),
- Mockito.any(),
- Mockito.any(),
- Mockito.any(),
- Mockito.anyBoolean());
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.anyBoolean());
return storageManager;
}
@@ -305,8 +300,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
CONF,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext,
- PartitionType.REDUCE);
+ writerContext);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum,
"FileWriter-UT-1");
@@ -360,8 +354,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
CONF,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext,
- PartitionType.REDUCE);
+ writerContext);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum,
"FileWriter-UT-1");
@@ -424,8 +417,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
CONF,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context1,
- PartitionType.REDUCE);
+ context1);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum,
"FileWriter-UT-2");
@@ -486,8 +478,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
CONF,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context,
- PartitionType.REDUCE);
+ context);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum,
"FileWriter-UT-2");
@@ -575,8 +566,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
CONF,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- context,
- PartitionType.REDUCE);
+ context);
List<Future<?>> futures = new ArrayList<>();
ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum,
"FileWriter-UT-2");
@@ -698,8 +688,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext1,
- PartitionType.REDUCE);
+ writerContext1);
partitionDataWriter.write(generateDataWithHeader(8 * 1024 * 1024));
partitionDataWriter.close();
ReduceFileMeta reduceFileMeta =
@@ -731,8 +720,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext2,
- PartitionType.REDUCE);
+ writerContext2);
for (int i = 0; i < 8; i++) {
partitionDataWriter.write(generateDataWithHeader(128));
}
@@ -764,8 +752,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext3,
- PartitionType.REDUCE);
+ writerContext3);
partitionDataWriter.write(generateDataWithHeader(1020));
partitionDataWriter.write(generateDataWithHeader(3));
partitionDataWriter.close();
@@ -796,8 +783,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext4,
- PartitionType.REDUCE);
+ writerContext4);
for (int i = 0; i < 8; i++) {
partitionDataWriter.write(generateDataWithHeader(128));
}
@@ -830,8 +816,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext5,
- PartitionType.REDUCE);
+ writerContext5);
for (int i = 0; i < 16; i++) {
partitionDataWriter.write(generateDataWithHeader(128));
}
@@ -863,8 +848,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext6,
- PartitionType.REDUCE);
+ writerContext6);
for (int i = 0; i < 16; i++) {
partitionDataWriter.write(generateDataWithHeader(128));
}
@@ -898,8 +882,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext7,
- PartitionType.REDUCE);
+ writerContext7);
for (int i = 0; i < 16; i++) {
partitionDataWriter.write(generateDataWithHeader(128));
}
@@ -932,8 +915,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext8,
- PartitionType.REDUCE);
+ writerContext8);
partitionDataWriter.write(generateDataWithHeader(1024));
for (int i = 0; i < 9; i++) {
partitionDataWriter.write(generateDataWithHeader(128));
@@ -967,8 +949,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
source,
conf,
DeviceMonitor$.MODULE$.EmptyMonitor(),
- writerContext9,
- PartitionType.REDUCE);
+ writerContext9);
partitionDataWriter.write(generateDataWithHeader(1024));
for (int i = 0; i < 9; i++) {
partitionDataWriter.write(generateDataWithHeader(128));
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
index 09f34cb40..640a67113 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
@@ -99,6 +99,7 @@ class StoragePolicyCase1 extends CelebornFunSuite {
test("test create file order case1") {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
+
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
val conf = new CelebornConf()
val flushLock = new AnyRef
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy",
"MEMORY,SSD,HDD,HDFS,OSS,S3")
@@ -107,7 +108,6 @@ class StoragePolicyCase1 extends CelebornFunSuite {
val notifier = new FlushNotifier
val file = storagePolicy.createFileWriter(
mockedPartitionWriterContext,
- PartitionType.REDUCE,
pendingWriters,
notifier)
assert(file.isInstanceOf[MemoryTierWriter])
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
index 0255262f9..28f6e87e0 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
@@ -99,6 +99,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
test("test create file order case2") {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
+
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
val conf = new CelebornConf()
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy",
"SSD,HDD,HDFS,OSS,S3")
@@ -107,7 +108,6 @@ class StoragePolicyCase2 extends CelebornFunSuite {
val notifier = new FlushNotifier
val file = storagePolicy.createFileWriter(
mockedPartitionWriterContext,
- PartitionType.REDUCE,
pendingWriters,
notifier)
assert(file.isInstanceOf[LocalTierWriter])
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
index c307adc37..cb21b7128 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
@@ -99,6 +99,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
test("test getEvicted file case1") {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
+
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
val mockedMemoryFile = mock[LocalTierWriter]
val conf = new CelebornConf()
@@ -110,7 +111,6 @@ class StoragePolicyCase3 extends CelebornFunSuite {
val nFile = storagePolicy.getEvictedFileWriter(
mockedMemoryFile,
mockedPartitionWriterContext,
- PartitionType.REDUCE,
pendingWriters,
notifier)
assert(nFile.isInstanceOf[LocalTierWriter])
@@ -118,6 +118,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
test("test evict file case2") {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
+
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
val mockedMemoryFile = mock[LocalTierWriter]
val conf = new CelebornConf()
@@ -129,7 +130,6 @@ class StoragePolicyCase3 extends CelebornFunSuite {
val nFile = storagePolicy.getEvictedFileWriter(
mockedMemoryFile,
mockedPartitionWriterContext,
- PartitionType.REDUCE,
pendingWriters,
notifier)
assert(nFile.isInstanceOf[LocalTierWriter])
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
index eae4d0727..e27def478 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
@@ -100,6 +100,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
test("test create file fallback case1") {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(
memoryDisabledHintPartitionLocation)
+
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
val conf = new CelebornConf()
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy",
"MEMORY,SSD,HDD,HDFS,OSS,S3")
@@ -108,7 +109,6 @@ class StoragePolicyCase4 extends CelebornFunSuite {
val notifier = new FlushNotifier
val file = storagePolicy.createFileWriter(
mockedPartitionWriterContext,
- PartitionType.REDUCE,
pendingWriters,
notifier)
assert(file.isInstanceOf[LocalTierWriter])