This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ea1c6301 [CELEBORN-80] FileWriter supports MapPartition (#1025)
ea1c6301 is described below

commit ea1c630173da41a6519e45d8e783bc6f1f68493d
Author: zhongqiangczq <[email protected]>
AuthorDate: Thu Dec 8 10:46:26 2022 +0800

    [CELEBORN-80] FileWriter supports MapPartition (#1025)
---
 .../service/deploy/worker/storage/FileWriter.java  | 126 ++++++---------------
 .../worker/storage/MapPartitionFileWriter.java     |  97 ++++++++++++++++
 .../worker/storage/ReducePartitionFileWriter.java  | 113 ++++++++++++++++++
 .../service/deploy/worker/PushDataHandler.scala    |   9 +-
 .../service/deploy/worker/WorkerSource.scala       |   1 +
 .../deploy/worker/storage/StorageManager.scala     |  62 ++++++----
 .../deploy/worker/storage/FileWriterSuiteJ.java    |   9 +-
 7 files changed, 296 insertions(+), 121 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 175da298..5914765c 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -48,58 +48,38 @@ import 
org.apache.celeborn.service.deploy.worker.WorkerSource;
  * Note: Once FlushNotifier.exception is set, the whole file is not available.
  *       That's fine some of the internal state(e.g. bytesFlushed) may be 
inaccurate.
  */
-public final class FileWriter implements DeviceObserver {
+public abstract class FileWriter implements DeviceObserver {
   private static final Logger logger = 
LoggerFactory.getLogger(FileWriter.class);
   private static final long WAIT_INTERVAL_MS = 20;
 
-  private final FileInfo fileInfo;
-  private FileChannel channel;
-  private FSDataOutputStream stream;
-  private volatile boolean closed;
-  private volatile boolean destroyed;
+  protected final FileInfo fileInfo;
+  protected FileChannel channel;
+  protected FSDataOutputStream stream;
+  protected volatile boolean closed;
+  protected volatile boolean destroyed;
 
-  private final AtomicInteger numPendingWrites = new AtomicInteger();
-  private long nextBoundary;
-  private long bytesFlushed;
+  protected final AtomicInteger numPendingWrites = new AtomicInteger();
+  protected long bytesFlushed;
 
   public final Flusher flusher;
   private final int flushWorkerIndex;
-  private CompositeByteBuf flushBuffer;
+  protected CompositeByteBuf flushBuffer;
 
-  private final long shuffleChunkSize;
   private final long writerCloseTimeoutMs;
 
-  private final long flusherBufferSize;
+  protected final long flusherBufferSize;
 
-  private final DeviceMonitor deviceMonitor;
-  private final AbstractSource source; // metrics
+  protected final DeviceMonitor deviceMonitor;
+  protected final AbstractSource source; // metrics
 
   private long splitThreshold = 0;
   private final PartitionSplitMode splitMode;
   private final PartitionType partitionType;
   private final boolean rangeReadFilter;
-
   private Runnable destroyHook;
-  private boolean deleted = false;
+  protected boolean deleted = false;
   private RoaringBitmap mapIdBitMap = null;
-
-  private final FlushNotifier notifier = new FlushNotifier();
-
-  // //////////////////////////////////////////////////////
-  //            map partition                            //
-  // //////////////////////////////////////////////////////
-
-  /** Number of reducepartitions */
-  private int numReducePartitions;
-
-  /** Index number of the current data region being written. */
-  private int currentDataRegionIndex;
-
-  /**
-   * Whether current data region is a broadcast region or not. If true, 
buffers added to this region
-   * will be written to all reduce partitions.
-   */
-  private boolean isBroadcastRegion;
+  protected final FlushNotifier notifier = new FlushNotifier();
 
   public FileWriter(
       FileInfo fileInfo,
@@ -115,8 +95,6 @@ public final class FileWriter implements DeviceObserver {
     this.fileInfo = fileInfo;
     this.flusher = flusher;
     this.flushWorkerIndex = flusher.getWorkerIndex();
-    this.shuffleChunkSize = conf.shuffleChunkSize();
-    this.nextBoundary = this.shuffleChunkSize;
     this.writerCloseTimeoutMs = conf.writerCloseTimeoutMs();
     this.splitThreshold = splitThreshold;
     this.flusherBufferSize = conf.workerFlusherBufferSize();
@@ -153,7 +131,7 @@ public final class FileWriter implements DeviceObserver {
     numPendingWrites.decrementAndGet();
   }
 
-  private void flush(boolean finalFlush) throws IOException {
+  protected void flush(boolean finalFlush) throws IOException {
     int numBytes = flushBuffer.readableBytes();
     notifier.checkException();
     notifier.numPendingFlushes.incrementAndGet();
@@ -166,26 +144,6 @@ public final class FileWriter implements DeviceObserver {
     addTask(task);
     flushBuffer = null;
     bytesFlushed += numBytes;
-    maybeSetChunkOffsets(finalFlush);
-  }
-
-  private void maybeSetChunkOffsets(boolean forceSet) {
-    if (bytesFlushed >= nextBoundary || forceSet) {
-      fileInfo.addChunkOffset(bytesFlushed);
-      nextBoundary = bytesFlushed + shuffleChunkSize;
-    }
-  }
-
-  private boolean isChunkOffsetValid() {
-    // Consider a scenario where some bytes have been flushed
-    // but the chunk offset boundary has not yet been updated.
-    // we should check if the chunk offset boundary equals
-    // bytesFlush or not. For example:
-    // The last record is a giant record and it has been flushed
-    // but its size is smaller than the nextBoundary, then the
-    // chunk offset will not be set after flushing. we should
-    // set it during FileWriter close.
-    return fileInfo.getLastChunkOffset() == bytesFlushed;
   }
 
   /**
@@ -254,7 +212,18 @@ public final class FileWriter implements DeviceObserver {
     }
   }
 
-  public synchronized long close() throws IOException {
+  public abstract long close() throws IOException;
+
+  @FunctionalInterface
+  public interface RunnableWithException<R extends IOException> {
+    void run() throws R;
+  }
+
+  protected synchronized long close(
+      RunnableWithException tryClose,
+      RunnableWithException streamClose,
+      RunnableWithException finalClose)
+      throws IOException {
     if (closed) {
       String msg = "FileWriter has already closed! fileName " + 
fileInfo.getFilePath();
       logger.error(msg);
@@ -269,9 +238,7 @@ public final class FileWriter implements DeviceObserver {
         if (flushBuffer.readableBytes() > 0) {
           flush(true);
         }
-        if (!isChunkOffsetValid()) {
-          maybeSetChunkOffsets(true);
-        }
+        tryClose.run();
       }
 
       waitOnNoPending(notifier.numPendingFlushes);
@@ -283,24 +250,14 @@ public final class FileWriter implements DeviceObserver {
         }
         if (stream != null) {
           stream.close();
-          if 
(StorageManager.hdfsFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
-            StorageManager.hdfsFs().delete(fileInfo.getHdfsPath(), false);
-            deleted = true;
-          } else {
-            
StorageManager.hdfsFs().create(fileInfo.getHdfsWriterSuccessPath()).close();
-            FSDataOutputStream indexOutputStream =
-                StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath());
-            indexOutputStream.writeInt(fileInfo.getChunkOffsets().size());
-            for (Long offset : fileInfo.getChunkOffsets()) {
-              indexOutputStream.writeLong(offset);
-            }
-            indexOutputStream.close();
-          }
+          streamClose.run();
         }
       } catch (IOException e) {
         logger.warn("close file writer" + this + "failed", e);
       }
 
+      finalClose.run();
+
       // unregister from DeviceMonitor
       if (!fileInfo.isHdfs()) {
         logger.debug("file info {} register from device monitor");
@@ -364,7 +321,7 @@ public final class FileWriter implements DeviceObserver {
     }
   }
 
-  private void waitOnNoPending(AtomicInteger counter) throws IOException {
+  protected void waitOnNoPending(AtomicInteger counter) throws IOException {
     long waitTime = writerCloseTimeoutMs;
     while (counter.get() > 0 && waitTime > 0) {
       try {
@@ -385,7 +342,7 @@ public final class FileWriter implements DeviceObserver {
     notifier.checkException();
   }
 
-  private void takeBuffer() {
+  protected void takeBuffer() {
     // metrics start
     String metricsName = null;
     String fileAbsPath = null;
@@ -410,7 +367,7 @@ public final class FileWriter implements DeviceObserver {
     }
   }
 
-  private void addTask(FlushTask task) throws IOException {
+  protected void addTask(FlushTask task) throws IOException {
     if (!flusher.addTask(task, writerCloseTimeoutMs, flushWorkerIndex)) {
       IOException e = new IOException("Add flush task timeout.");
       notifier.setException(e);
@@ -418,7 +375,7 @@ public final class FileWriter implements DeviceObserver {
     }
   }
 
-  private synchronized void returnBuffer() {
+  protected synchronized void returnBuffer() {
     if (flushBuffer != null) {
       flusher.returnBuffer(flushBuffer);
       flushBuffer = null;
@@ -482,17 +439,4 @@ public final class FileWriter implements DeviceObserver {
   public PartitionType getPartitionType() {
     return partitionType;
   }
-
-  public void pushDataHandShake(int numReducePartitions) {
-    this.numReducePartitions = numReducePartitions;
-  }
-
-  public void regionStart(int currentDataRegionIndex, boolean 
isBroadcastRegion) {
-    this.currentDataRegionIndex = currentDataRegionIndex;
-    this.isBroadcastRegion = isBroadcastRegion;
-  }
-
-  public void regionFinish() {
-    // flush index
-  }
 }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
new file mode 100644
index 00000000..22a2410e
--- /dev/null
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import io.netty.buffer.CompositeByteBuf;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.common.protocol.PartitionSplitMode;
+import org.apache.celeborn.common.protocol.PartitionType;
+
+/*
+ * map partition file writer, it will create index for each partition
+ */
+public final class MapPartitionFileWriter extends FileWriter {
+  private static final Logger logger = 
LoggerFactory.getLogger(MapPartitionFileWriter.class);
+
+  private int numReducePartitions;
+  private int currentDataRegionIndex;
+  private boolean isBroadcastRegion;
+  private long[] numReducePartitionBytes;
+  private ByteBuffer indexBuffer;
+  private int currentReducePartition;
+  private long totalBytes;
+  private long regionStartingOffset;
+  private long numDataRegions;
+  private FileChannel channelIndex;
+  private FSDataOutputStream streamIndex;
+  private CompositeByteBuf flushBufferIndex;
+
+  public MapPartitionFileWriter(
+      FileInfo fileInfo,
+      Flusher flusher,
+      AbstractSource workerSource,
+      CelebornConf conf,
+      DeviceMonitor deviceMonitor,
+      long splitThreshold,
+      PartitionSplitMode splitMode,
+      boolean rangeReadFilter)
+      throws IOException {
+    super(
+        fileInfo,
+        flusher,
+        workerSource,
+        conf,
+        deviceMonitor,
+        splitThreshold,
+        splitMode,
+        PartitionType.MAP,
+        rangeReadFilter);
+    if (!fileInfo.isHdfs()) {
+      channelIndex = new 
FileOutputStream(fileInfo.getIndexPath()).getChannel();
+    } else {
+      streamIndex = 
StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath(), true);
+    }
+  }
+
+  @Override
+  public long close() throws IOException {
+    return 0;
+  }
+
+  public void pushDataHandShake(int numReducePartitions) {
+    this.numReducePartitions = numReducePartitions;
+  }
+
+  public void regionStart(int currentDataRegionIndex, boolean 
isBroadcastRegion) {
+    this.currentDataRegionIndex = currentDataRegionIndex;
+    this.isBroadcastRegion = isBroadcastRegion;
+  }
+
+  public void regionFinish() throws IOException {}
+}
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
new file mode 100644
index 00000000..843841e5
--- /dev/null
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.meta.FileInfo;
+import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.common.protocol.PartitionSplitMode;
+import org.apache.celeborn.common.protocol.PartitionType;
+
+/*
+ * reduce partition file writer, it will create chunkindex
+ */
+public final class ReducePartitionFileWriter extends FileWriter {
+  private static final Logger logger = 
LoggerFactory.getLogger(ReducePartitionFileWriter.class);
+
+  private long nextBoundary;
+  private final long shuffleChunkSize;
+
+  public ReducePartitionFileWriter(
+      FileInfo fileInfo,
+      Flusher flusher,
+      AbstractSource workerSource,
+      CelebornConf conf,
+      DeviceMonitor deviceMonitor,
+      long splitThreshold,
+      PartitionSplitMode splitMode,
+      boolean rangeReadFilter)
+      throws IOException {
+    super(
+        fileInfo,
+        flusher,
+        workerSource,
+        conf,
+        deviceMonitor,
+        splitThreshold,
+        splitMode,
+        PartitionType.REDUCE,
+        rangeReadFilter);
+    this.shuffleChunkSize = conf.shuffleChunkSize();
+    this.nextBoundary = this.shuffleChunkSize;
+  }
+
+  protected void flush(boolean finalFlush) throws IOException {
+    super.flush(finalFlush);
+    maybeSetChunkOffsets(finalFlush);
+  }
+
+  private void maybeSetChunkOffsets(boolean forceSet) {
+    if (bytesFlushed >= nextBoundary || forceSet) {
+      fileInfo.addChunkOffset(bytesFlushed);
+      nextBoundary = bytesFlushed + shuffleChunkSize;
+    }
+  }
+
+  private boolean isChunkOffsetValid() {
+    // Consider a scenario where some bytes have been flushed
+    // but the chunk offset boundary has not yet been updated.
+    // we should check if the chunk offset boundary equals
+    // bytesFlush or not. For example:
+    // The last record is a giant record and it has been flushed
+    // but its size is smaller than the nextBoundary, then the
+    // chunk offset will not be set after flushing. we should
+    // set it during FileWriter close.
+    return fileInfo.getLastChunkOffset() == bytesFlushed;
+  }
+
+  public synchronized long close() throws IOException {
+    return super.close(
+        () -> {
+          if (!isChunkOffsetValid()) {
+            maybeSetChunkOffsets(true);
+          }
+        },
+        () -> {
+          if 
(StorageManager.hdfsFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
+            StorageManager.hdfsFs().delete(fileInfo.getHdfsPath(), false);
+            deleted = true;
+          } else {
+            
StorageManager.hdfsFs().create(fileInfo.getHdfsWriterSuccessPath()).close();
+            FSDataOutputStream indexOutputStream =
+                StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath());
+            indexOutputStream.writeInt(fileInfo.getChunkOffsets().size());
+            for (Long offset : fileInfo.getChunkOffsets()) {
+              indexOutputStream.writeLong(offset);
+            }
+            indexOutputStream.close();
+          }
+        },
+        () -> {});
+  }
+}
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index aba8809b..40cfcb13 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -37,7 +37,7 @@ import 
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.unsafe.Platform
 import org.apache.celeborn.common.util.PackedPartitionId
-import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, 
LocalFlusher, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, 
LocalFlusher, MapPartitionFileWriter, StorageManager}
 
 class PushDataHandler extends BaseMessageHandler with Logging {
 
@@ -617,15 +617,16 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
     try {
       messageType match {
         case Type.PUSH_DATA_HAND_SHAKE => {
-          
fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+          fileWriter.asInstanceOf[MapPartitionFileWriter].pushDataHandShake(
+            message.asInstanceOf[PushDataHandShake].numPartitions)
         }
         case Type.REGION_START => {
-          fileWriter.regionStart(
+          fileWriter.asInstanceOf[MapPartitionFileWriter].regionStart(
             message.asInstanceOf[RegionStart].currentRegionIndex,
             message.asInstanceOf[RegionStart].isBroadcast)
         }
         case Type.REGION_FINISH => {
-          fileWriter.regionFinish()
+          fileWriter.asInstanceOf[MapPartitionFileWriter].regionFinish()
         }
       }
       // for master, send data to slave
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index 980bb781..745c486f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -74,6 +74,7 @@ object WorkerSource {
 
   // flush
   val TakeBufferTime = "TakeBufferTime"
+  val TakeBufferTimeIndex = "TakeBufferTimeIndex"
 
   val RegisteredShuffleCount = "RegisteredShuffleCount"
 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index e6adfc3f..205d4c3c 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -284,16 +284,27 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         FileSystem.mkdirs(StorageManager.hdfsFs, shuffleDir, hdfsPermission)
         val fileInfo =
           new FileInfo(new Path(shuffleDir, fileName).toString, 
userIdentifier, partitionType)
-        val hdfsWriter = new FileWriter(
-          fileInfo,
-          hdfsFlusher.get,
-          workerSource,
-          conf,
-          deviceMonitor,
-          splitThreshold,
-          splitMode,
-          partitionType,
-          rangeReadFilter)
+        val hdfsWriter = partitionType match {
+          case PartitionType.MAP => new MapPartitionFileWriter(
+              fileInfo,
+              hdfsFlusher.get,
+              workerSource,
+              conf,
+              deviceMonitor,
+              splitThreshold,
+              splitMode,
+              rangeReadFilter)
+          case PartitionType.REDUCE => new ReducePartitionFileWriter(
+              fileInfo,
+              hdfsFlusher.get,
+              workerSource,
+              conf,
+              deviceMonitor,
+              splitThreshold,
+              splitMode,
+              rangeReadFilter)
+        }
+
         fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
         hdfsWriters.synchronized {
           hdfsWriters.add(hdfsWriter)
@@ -318,16 +329,27 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
             }
           }
           val fileInfo = new FileInfo(file.getAbsolutePath, userIdentifier, 
partitionType)
-          val fileWriter = new FileWriter(
-            fileInfo,
-            localFlushers.get(mountPoint),
-            workerSource,
-            conf,
-            deviceMonitor,
-            splitThreshold,
-            splitMode,
-            partitionType,
-            rangeReadFilter)
+
+          val fileWriter = partitionType match {
+            case PartitionType.MAP => new MapPartitionFileWriter(
+                fileInfo,
+                localFlushers.get(mountPoint),
+                workerSource,
+                conf,
+                deviceMonitor,
+                splitThreshold,
+                splitMode,
+                rangeReadFilter)
+            case PartitionType.REDUCE => new ReducePartitionFileWriter(
+                fileInfo,
+                localFlushers.get(mountPoint),
+                workerSource,
+                conf,
+                deviceMonitor,
+                splitThreshold,
+                splitMode,
+                rangeReadFilter)
+          }
           deviceMonitor.registerFileWriter(fileWriter)
           val list = workingDirWriters.computeIfAbsent(dir, 
workingDirWriterListFunc)
           list.synchronized {
diff --git 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
index f20ea47f..6ef563c2 100644
--- 
a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
+++ 
b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/FileWriterSuiteJ.java
@@ -238,7 +238,7 @@ public class FileWriterSuiteJ {
     final int threadsNum = 8;
     File file = getTemporaryFile();
     FileWriter fileWriter =
-        new FileWriter(
+        new ReducePartitionFileWriter(
             new FileInfo(file, userIdentifier),
             localFlusher,
             source,
@@ -246,7 +246,6 @@ public class FileWriterSuiteJ {
             DeviceMonitor$.MODULE$.EmptyMonitor(),
             SPLIT_THRESHOLD,
             splitMode,
-            partitionType,
             false);
 
     List<Future<?>> futures = new ArrayList<>();
@@ -283,7 +282,7 @@ public class FileWriterSuiteJ {
     final int threadsNum = Runtime.getRuntime().availableProcessors();
     File file = getTemporaryFile();
     FileWriter fileWriter =
-        new FileWriter(
+        new ReducePartitionFileWriter(
             new FileInfo(file, userIdentifier),
             localFlusher,
             source,
@@ -291,7 +290,6 @@ public class FileWriterSuiteJ {
             DeviceMonitor$.MODULE$.EmptyMonitor(),
             SPLIT_THRESHOLD,
             splitMode,
-            partitionType,
             false);
 
     List<Future<?>> futures = new ArrayList<>();
@@ -337,7 +335,7 @@ public class FileWriterSuiteJ {
     File file = getTemporaryFile();
     FileInfo fileInfo = new FileInfo(file, userIdentifier);
     FileWriter fileWriter =
-        new FileWriter(
+        new ReducePartitionFileWriter(
             fileInfo,
             localFlusher,
             source,
@@ -345,7 +343,6 @@ public class FileWriterSuiteJ {
             DeviceMonitor$.MODULE$.EmptyMonitor(),
             SPLIT_THRESHOLD,
             splitMode,
-            partitionType,
             false);
 
     List<Future<?>> futures = new ArrayList<>();

Reply via email to