This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b537798e3 [CELEBORN-2108] Remove redundant PartitionType
b537798e3 is described below

commit b537798e37be1e5d7e905af6c7a2df905f1b0da5
Author: xxx <[email protected]>
AuthorDate: Tue Aug 19 16:38:19 2025 +0800

    [CELEBORN-2108] Remove redundant PartitionType
    
    ### What changes were proposed in this pull request?
    
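    Remove the redundant `PartitionType` parameter from the `PartitionDataWriter` constructor and from the `StoragePolicy` methods `createFileWriter` and `getEvictedFileWriter`.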
    
    ### Why are the changes needed?
    
    `PartitionType` is already available from `PartitionDataWriterContext`, so it 
is not necessary to pass `PartitionType` as a separate method parameter.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3422 from xy2953396112/remove_useless_partition_type.
    
    Authored-by: xxx <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../deploy/worker/storage/PartitionDataWriter.java | 13 ++----
 .../deploy/worker/storage/StorageManager.scala     |  3 +-
 .../deploy/worker/storage/StoragePolicy.scala      |  7 +---
 .../storage/PartitionDataWriterSuiteUtils.java     |  9 ++--
 .../local/DiskMapPartitionDataWriterSuiteJ.java    |  3 +-
 .../local/DiskReducePartitionDataWriterSuiteJ.java | 39 ++++++-----------
 .../MemoryReducePartitionDataWriterSuiteJ.java     | 49 +++++++---------------
 .../storage/storagePolicy/StoragePolicyCase1.scala |  2 +-
 .../storage/storagePolicy/StoragePolicyCase2.scala |  2 +-
 .../storage/storagePolicy/StoragePolicyCase3.scala |  4 +-
 .../storage/storagePolicy/StoragePolicyCase4.scala |  2 +-
 11 files changed, 43 insertions(+), 90 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index e0f16c01a..35dfdc331 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -34,7 +34,6 @@ import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.meta.*;
 import org.apache.celeborn.common.metrics.source.AbstractSource;
 import org.apache.celeborn.common.protocol.PartitionSplitMode;
-import org.apache.celeborn.common.protocol.PartitionType;
 import org.apache.celeborn.common.protocol.StorageInfo;
 import org.apache.celeborn.service.deploy.worker.WorkerSource;
 import 
org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController;
@@ -54,7 +53,6 @@ public class PartitionDataWriter implements DeviceObserver {
   private final AtomicInteger numPendingWrites = new AtomicInteger(0);
   private final PartitionDataWriterContext writerContext;
   protected final AbstractSource source; // metrics
-  private final PartitionType partitionType;
   private final String writerString;
   private final StorageManager storageManager;
   private final FlushNotifier notifier = new FlushNotifier();
@@ -67,8 +65,7 @@ public class PartitionDataWriter implements DeviceObserver {
       AbstractSource workerSource,
       CelebornConf conf,
       DeviceMonitor deviceMonitor,
-      PartitionDataWriterContext writerContext,
-      PartitionType partitionType) {
+      PartitionDataWriterContext writerContext) {
     memoryFileStorageMaxFileSize = conf.workerMemoryFileStorageMaxFileSize();
     this.writerContext = writerContext;
     this.source = workerSource;
@@ -90,11 +87,8 @@ public class PartitionDataWriter implements DeviceObserver {
 
     writerContext.setPartitionDataWriter(this);
     writerContext.setDeviceMonitor(deviceMonitor);
-    this.partitionType = partitionType;
     currentTierWriter =
-        storageManager
-            .storagePolicy()
-            .createFileWriter(writerContext, partitionType, numPendingWrites, 
notifier);
+        storageManager.storagePolicy().createFileWriter(writerContext, 
numPendingWrites, notifier);
   }
 
   public DiskFileInfo getDiskFileInfo() {
@@ -198,8 +192,7 @@ public class PartitionDataWriter implements DeviceObserver {
     TierWriterBase newTierWriter =
         storageManager
             .storagePolicy()
-            .getEvictedFileWriter(
-                currentTierWriter, writerContext, partitionType, 
numPendingWrites, notifier);
+            .getEvictedFileWriter(currentTierWriter, writerContext, 
numPendingWrites, notifier);
     currentTierWriter.evict(newTierWriter);
     currentTierWriter = newTierWriter;
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 9353ee23a..c9b24f389 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -470,8 +470,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           workerSource,
           conf,
           deviceMonitor,
-          partitionDataWriterContext,
-          partitionType)
+          partitionDataWriterContext)
       } catch {
         case e: Exception =>
           logError("Create partition data writer failed", e)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 798ad165e..a475d12f2 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -36,7 +36,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
   def getEvictedFileWriter(
       celebornFile: TierWriterBase,
       partitionDataWriterContext: PartitionDataWriterContext,
-      partitionType: PartitionType,
       numPendingWrites: AtomicInteger,
       notifier: FlushNotifier): TierWriterBase = {
     evictFileOrder.foreach { order =>
@@ -44,7 +43,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
       if (orderList != null) {
         return createFileWriter(
           partitionDataWriterContext,
-          partitionType,
           numPendingWrites,
           notifier,
           orderList,
@@ -57,12 +55,10 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
 
   def createFileWriter(
       partitionDataWriterContext: PartitionDataWriterContext,
-      partitionType: PartitionType,
       numPendingWrites: AtomicInteger,
       notifier: FlushNotifier): TierWriterBase = {
     createFileWriter(
       partitionDataWriterContext: PartitionDataWriterContext,
-      partitionType: PartitionType,
       numPendingWrites: AtomicInteger,
       notifier: FlushNotifier,
       createFileOrder)
@@ -70,7 +66,6 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
 
   def createFileWriter(
       partitionDataWriterContext: PartitionDataWriterContext,
-      partitionType: PartitionType,
       numPendingWrites: AtomicInteger,
       notifier: FlushNotifier,
       order: Option[List[String]] = createFileOrder,
@@ -84,7 +79,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
     }
 
     def getPartitionMetaHandler(fileInfo: FileInfo) = {
-      partitionType match {
+      partitionDataWriterContext.getPartitionType match {
         case PartitionType.REDUCE =>
           new 
ReducePartitionMetaHandler(partitionDataWriterContext.isRangeReadFilter, 
fileInfo)
         case PartitionType.MAP =>
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
index 9d5c6c26a..471a44865 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriterSuiteUtils.java
@@ -103,7 +103,7 @@ public class PartitionDataWriterSuiteUtils {
                     context,
                     storageManager))
         .when(storagePolicy)
-        .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
+        .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any());
 
     return storageManager;
   }
@@ -148,7 +148,7 @@ public class PartitionDataWriterSuiteUtils {
                     writerContext,
                     storageManager))
         .when(storagePolicy)
-        .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
+        .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any());
 
     return storageManager;
   }
@@ -223,7 +223,7 @@ public class PartitionDataWriterSuiteUtils {
                     writerContext,
                     storageManager))
         .when(storagePolicy)
-        .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
+        .createFileWriter(Mockito.any(), Mockito.any(), Mockito.any());
 
     Mockito.doAnswer(
             invocation ->
@@ -240,8 +240,7 @@ public class PartitionDataWriterSuiteUtils {
                     writerContext,
                     storageManager))
         .when(storagePolicy)
-        .getEvictedFileWriter(
-            Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
+        .getEvictedFileWriter(Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
 
     return storageManager;
   }
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
index 87ed418a3..329d7f90f 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskMapPartitionDataWriterSuiteJ.java
@@ -141,8 +141,7 @@ public class DiskMapPartitionDataWriterSuiteJ {
             source,
             CONF,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context,
-            PartitionType.MAP);
+            context);
     fileWriter.handleEvents(
         
PbPushDataHandShake.newBuilder().setNumPartitions(2).setBufferSize(32).build());
     fileWriter.handleEvents(
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
index 1faa04a55..536b4aab3 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/local/DiskReducePartitionDataWriterSuiteJ.java
@@ -288,8 +288,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             CONF,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context,
-            PartitionType.REDUCE);
+            context);
 
     List<Future<?>> futures = new ArrayList<>();
     ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, 
"FileWriter-UT-1");
@@ -343,8 +342,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             CONF,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context,
-            PartitionType.REDUCE);
+            context);
 
     List<Future<?>> futures = new ArrayList<>();
     ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, 
"FileWriter-UT-1");
@@ -399,8 +397,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             CONF,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context,
-            PartitionType.REDUCE);
+            context);
 
     List<Future<?>> futures = new ArrayList<>();
     ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, 
"FileWriter-UT-2");
@@ -470,8 +467,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             CONF,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context,
-            PartitionType.REDUCE);
+            context);
 
     List<Future<?>> futures = new ArrayList<>();
     ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, 
"FileWriter-UT-2");
@@ -590,8 +586,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context1,
-            PartitionType.REDUCE);
+            context1);
     partitionDataWriter.write(generateData(8 * 1024 * 1024));
     partitionDataWriter.close();
     ReduceFileMeta reduceFileMeta =
@@ -623,8 +618,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context2,
-            PartitionType.REDUCE);
+            context2);
     for (int i = 0; i < 8; i++) {
       partitionDataWriter.write(generateData(128));
     }
@@ -656,8 +650,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context3,
-            PartitionType.REDUCE);
+            context3);
     partitionDataWriter.write(generateData(1020));
     partitionDataWriter.write(generateData(3));
     partitionDataWriter.close();
@@ -688,8 +681,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context4,
-            PartitionType.REDUCE);
+            context4);
     for (int i = 0; i < 8; i++) {
       partitionDataWriter.write(generateData(128));
     }
@@ -722,8 +714,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context5,
-            PartitionType.REDUCE);
+            context5);
     for (int i = 0; i < 16; i++) {
       partitionDataWriter.write(generateData(128));
     }
@@ -755,8 +746,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context6,
-            PartitionType.REDUCE);
+            context6);
     for (int i = 0; i < 16; i++) {
       partitionDataWriter.write(generateData(128));
     }
@@ -790,8 +780,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context7,
-            PartitionType.REDUCE);
+            context7);
     for (int i = 0; i < 16; i++) {
       partitionDataWriter.write(generateData(128));
     }
@@ -824,8 +813,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context8,
-            PartitionType.REDUCE);
+            context8);
     partitionDataWriter.write(generateData(1024));
     for (int i = 0; i < 9; i++) {
       partitionDataWriter.write(generateData(128));
@@ -859,8 +847,7 @@ public class DiskReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context9,
-            PartitionType.REDUCE);
+            context9);
     partitionDataWriter.write(generateData(1024));
     for (int i = 0; i < 9; i++) {
       partitionDataWriter.write(generateData(128));
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index 006035d85..8aa0311be 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -123,12 +123,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
                     storageManager))
         .when(storagePolicy)
         .createFileWriter(
-            Mockito.any(),
-            Mockito.any(),
-            Mockito.any(),
-            Mockito.any(),
-            Mockito.any(),
-            Mockito.anyBoolean());
+            Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.anyBoolean());
 
     return storageManager;
   }
@@ -305,8 +300,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             CONF,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext,
-            PartitionType.REDUCE);
+            writerContext);
 
     List<Future<?>> futures = new ArrayList<>();
     ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, 
"FileWriter-UT-1");
@@ -360,8 +354,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             CONF,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext,
-            PartitionType.REDUCE);
+            writerContext);
 
     List<Future<?>> futures = new ArrayList<>();
     ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, 
"FileWriter-UT-1");
@@ -424,8 +417,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             CONF,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context1,
-            PartitionType.REDUCE);
+            context1);
 
     List<Future<?>> futures = new ArrayList<>();
     ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, 
"FileWriter-UT-2");
@@ -486,8 +478,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             CONF,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context,
-            PartitionType.REDUCE);
+            context);
 
     List<Future<?>> futures = new ArrayList<>();
     ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, 
"FileWriter-UT-2");
@@ -575,8 +566,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             CONF,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            context,
-            PartitionType.REDUCE);
+            context);
 
     List<Future<?>> futures = new ArrayList<>();
     ExecutorService es = ThreadUtils.newDaemonFixedThreadPool(threadsNum, 
"FileWriter-UT-2");
@@ -698,8 +688,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext1,
-            PartitionType.REDUCE);
+            writerContext1);
     partitionDataWriter.write(generateDataWithHeader(8 * 1024 * 1024));
     partitionDataWriter.close();
     ReduceFileMeta reduceFileMeta =
@@ -731,8 +720,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext2,
-            PartitionType.REDUCE);
+            writerContext2);
     for (int i = 0; i < 8; i++) {
       partitionDataWriter.write(generateDataWithHeader(128));
     }
@@ -764,8 +752,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext3,
-            PartitionType.REDUCE);
+            writerContext3);
     partitionDataWriter.write(generateDataWithHeader(1020));
     partitionDataWriter.write(generateDataWithHeader(3));
     partitionDataWriter.close();
@@ -796,8 +783,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext4,
-            PartitionType.REDUCE);
+            writerContext4);
     for (int i = 0; i < 8; i++) {
       partitionDataWriter.write(generateDataWithHeader(128));
     }
@@ -830,8 +816,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext5,
-            PartitionType.REDUCE);
+            writerContext5);
     for (int i = 0; i < 16; i++) {
       partitionDataWriter.write(generateDataWithHeader(128));
     }
@@ -863,8 +848,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext6,
-            PartitionType.REDUCE);
+            writerContext6);
     for (int i = 0; i < 16; i++) {
       partitionDataWriter.write(generateDataWithHeader(128));
     }
@@ -898,8 +882,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext7,
-            PartitionType.REDUCE);
+            writerContext7);
     for (int i = 0; i < 16; i++) {
       partitionDataWriter.write(generateDataWithHeader(128));
     }
@@ -932,8 +915,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext8,
-            PartitionType.REDUCE);
+            writerContext8);
     partitionDataWriter.write(generateDataWithHeader(1024));
     for (int i = 0; i < 9; i++) {
       partitionDataWriter.write(generateDataWithHeader(128));
@@ -967,8 +949,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
             source,
             conf,
             DeviceMonitor$.MODULE$.EmptyMonitor(),
-            writerContext9,
-            PartitionType.REDUCE);
+            writerContext9);
     partitionDataWriter.write(generateDataWithHeader(1024));
     for (int i = 0; i < 9; i++) {
       partitionDataWriter.write(generateDataWithHeader(128));
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
index 09f34cb40..640a67113 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
@@ -99,6 +99,7 @@ class StoragePolicyCase1 extends CelebornFunSuite {
 
   test("test create file order case1") {
     
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
+    
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
     val conf = new CelebornConf()
     val flushLock = new AnyRef
     conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", 
"MEMORY,SSD,HDD,HDFS,OSS,S3")
@@ -107,7 +108,6 @@ class StoragePolicyCase1 extends CelebornFunSuite {
     val notifier = new FlushNotifier
     val file = storagePolicy.createFileWriter(
       mockedPartitionWriterContext,
-      PartitionType.REDUCE,
       pendingWriters,
       notifier)
     assert(file.isInstanceOf[MemoryTierWriter])
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
index 0255262f9..28f6e87e0 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
@@ -99,6 +99,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
 
   test("test create file order case2") {
     
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
+    
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
     when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
     val conf = new CelebornConf()
     conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", 
"SSD,HDD,HDFS,OSS,S3")
@@ -107,7 +108,6 @@ class StoragePolicyCase2 extends CelebornFunSuite {
     val notifier = new FlushNotifier
     val file = storagePolicy.createFileWriter(
       mockedPartitionWriterContext,
-      PartitionType.REDUCE,
       pendingWriters,
       notifier)
     assert(file.isInstanceOf[LocalTierWriter])
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
index c307adc37..cb21b7128 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
@@ -99,6 +99,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
 
   test("test getEvicted file case1") {
     
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
+    
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
     when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
     val mockedMemoryFile = mock[LocalTierWriter]
     val conf = new CelebornConf()
@@ -110,7 +111,6 @@ class StoragePolicyCase3 extends CelebornFunSuite {
     val nFile = storagePolicy.getEvictedFileWriter(
       mockedMemoryFile,
       mockedPartitionWriterContext,
-      PartitionType.REDUCE,
       pendingWriters,
       notifier)
     assert(nFile.isInstanceOf[LocalTierWriter])
@@ -118,6 +118,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
 
   test("test evict file case2") {
     
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
+    
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
     when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
     val mockedMemoryFile = mock[LocalTierWriter]
     val conf = new CelebornConf()
@@ -129,7 +130,6 @@ class StoragePolicyCase3 extends CelebornFunSuite {
     val nFile = storagePolicy.getEvictedFileWriter(
       mockedMemoryFile,
       mockedPartitionWriterContext,
-      PartitionType.REDUCE,
       pendingWriters,
       notifier)
     assert(nFile.isInstanceOf[LocalTierWriter])
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
index eae4d0727..e27def478 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
@@ -100,6 +100,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
   test("test create file fallback case1") {
     when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(
       memoryDisabledHintPartitionLocation)
+    
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
     when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
     val conf = new CelebornConf()
     conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", 
"MEMORY,SSD,HDD,HDFS,OSS,S3")
@@ -108,7 +109,6 @@ class StoragePolicyCase4 extends CelebornFunSuite {
     val notifier = new FlushNotifier
     val file = storagePolicy.createFileWriter(
       mockedPartitionWriterContext,
-      PartitionType.REDUCE,
       pendingWriters,
       notifier)
     assert(file.isInstanceOf[LocalTierWriter])

Reply via email to