This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ae4022235 [CELEBORN-2047] Support MapPartitionData on DFS
ae4022235 is described below

commit ae40222351cbeb1a9bdd398d461255a0739f3cac
Author: SteNicholas <programg...@163.com>
AuthorDate: Sat Jul 26 22:11:32 2025 +0800

    [CELEBORN-2047] Support MapPartitionData on DFS
    
    ### What changes were proposed in this pull request?
    
    Support `MapPartitionData` on DFS.
    
    ### Why are the changes needed?
    
    `MapPartitionData` only supports on local, which does not support on DFS. 
It's recommended to support `MapPartitionData` on DFS for MapPartition mode to 
align the ability of ReducePartition mode.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    `WordCountTestWithHDFS`.
    
    Closes #3349 from SteNicholas/CELEBORN-2047.
    
    Authored-by: SteNicholas <programg...@163.com>
    Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
---
 .../apache/celeborn/common/meta/DiskFileInfo.java  |   4 +
 .../org/apache/celeborn/common/meta/FileInfo.java  |   4 +
 .../org/apache/celeborn/common/util/Utils.scala    |   8 +-
 project/CelebornBuild.scala                        |   8 +-
 tests/flink-it/pom.xml                             |  31 ++++++
 .../celeborn/tests/flink/WordCountTest.scala       |  45 ++++++++-
 .../worker/storage/DfsPartitionDataReader.java     | 108 +++++++++++++++++++++
 .../worker/storage/LocalPartitionDataReader.java   | 101 +++++++++++++++++++
 .../deploy/worker/storage/MapPartitionData.java    |  15 +--
 .../worker/storage/MapPartitionDataReader.java     |  92 +++++-------------
 .../deploy/worker/storage/PartitionDataReader.java |  85 ++++++++++++++++
 .../worker/storage/PartitionMetaHandler.scala      |  29 +-----
 .../deploy/worker/storage/StorageManager.scala     |   4 +-
 .../service/deploy/worker/storage/TierWriter.scala |  52 +++++-----
 .../service/deploy/MiniClusterFeature.scala        |   6 +-
 15 files changed, 452 insertions(+), 140 deletions(-)

diff --git 
a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index c7161483b..ab798eca3 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -164,4 +164,8 @@ public class DiskFileInfo extends FileInfo {
   public boolean isDFS() {
     return Utils.isS3Path(filePath) || Utils.isOssPath(filePath) || 
Utils.isHdfsPath(filePath);
   }
+
+  public StorageInfo.Type getStorageType() {
+    return storageType;
+  }
 }
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index e81b72936..e8511f1bf 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -108,4 +108,8 @@ public abstract class FileInfo {
   }
 
   public abstract String getFilePath();
+
+  public boolean isReduceFileMeta() {
+    return isReduceFileMeta;
+  }
 }
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala 
b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 3fd090e77..aa719cff1 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -40,6 +40,7 @@ import com.google.protobuf.{ByteString, GeneratedMessageV3}
 import io.netty.channel.unix.Errors.NativeIoException
 import org.apache.commons.lang3.SystemUtils
 import org.apache.commons.lang3.time.FastDateFormat
+import org.apache.hadoop.fs.FSDataInputStream
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.celeborn.common.CelebornConf
@@ -1175,12 +1176,11 @@ object Utils extends Logging {
   }
 
   @throws[IOException]
-  def checkFileIntegrity(fileChannel: FileChannel, length: Int): Unit = {
-    val remainingBytes = fileChannel.size - fileChannel.position
+  def checkFileIntegrity(remainingBytes: Long, length: Int, filePath: String): 
Unit = {
     if (remainingBytes < length) {
       logError(
-        s"File remaining bytes not not enough, remaining: ${remainingBytes}, 
wanted: ${length}.")
-      throw new RuntimeException(s"File is corrupted ${fileChannel}")
+        s"File remaining bytes not not enough, remaining: $remainingBytes, 
wanted: $length.")
+      throw new RuntimeException(s"File is corrupted $filePath")
     }
   }
 
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index cdce5973b..760aaf8fe 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -268,6 +268,8 @@ object Dependencies {
     ExclusionRule("org.apache.httpcomponents", "httpclient"),
     ExclusionRule("org.slf4j", "slf4j-log4j12")
   )
+  val hadoopAuth = "org.apache.hadoop" % "hadoop-auth" % hadoopVersion
+  val hadoopHdfs = "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion
 
   val picocli = "info.picocli" % "picocli" % picocliVersion
 
@@ -1284,7 +1286,11 @@ trait FlinkClientProjects {
           "org.apache.flink" % "flink-runtime" % flinkVersion % "test",
           flinkStreamingDependency,
           flinkClientsDependency,
-          flinkRuntimeWebDependency
+          flinkRuntimeWebDependency,
+          Dependencies.hadoopCommon % "test",
+          Dependencies.hadoopAuth % "test",
+          Dependencies.hadoopHdfs % "test->test;compile->compile",
+          Dependencies.jerseyServer % "test",
         ) ++ commonUnitTestDependencies,
         (Test / envVars) += ("FLINK_VERSION", flinkVersion)
       )
diff --git a/tests/flink-it/pom.xml b/tests/flink-it/pom.xml
index 8664fbe94..b4ef70ea5 100644
--- a/tests/flink-it/pom.xml
+++ b/tests/flink-it/pom.xml
@@ -91,5 +91,36 @@
       <version>${flink.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <version>${hadoop.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
index cc75a73f4..fa8860efc 100644
--- 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
+++ 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/WordCountTest.scala
@@ -18,6 +18,7 @@
 package org.apache.celeborn.tests.flink
 
 import java.io.File
+import java.nio.file.Files
 
 import scala.collection.JavaConverters._
 
@@ -26,13 +27,17 @@ import org.apache.flink.configuration.{Configuration, 
ExecutionOptions}
 import org.apache.flink.runtime.jobgraph.JobType
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode
+import org.apache.flink.util.OperatingSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.MiniDFSCluster
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.CelebornConf.{AUTH_ENABLED, 
INTERNAL_PORT_ENABLED}
+import org.apache.celeborn.common.CelebornConf.{ACTIVE_STORAGE_TYPES, 
AUTH_ENABLED, HDFS_DIR, INTERNAL_PORT_ENABLED, 
WORKER_STORAGE_CREATE_FILE_POLICY}
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.protocol.FallbackPolicy
+import org.apache.celeborn.rest.v1.model.PartitionLocationData.StorageEnum
 import org.apache.celeborn.service.deploy.MiniClusterFeature
 import org.apache.celeborn.service.deploy.worker.Worker
 
@@ -132,3 +137,41 @@ class WordCountTestWithAuthentication extends 
WordCountTestBase {
   override protected def getWorkerConf: Map[String, String] = authConfig
   override protected def getClientConf: Map[String, String] = 
Map(AUTH_ENABLED.key -> "true")
 }
+
+class WordCountTestWithHDFS extends WordCountTestBase {
+
+  private var basePath: Path = _
+  private var hdfsCluster: MiniDFSCluster = _
+
+  override protected def getMasterConf: Map[String, String] = Map()
+  override protected def getWorkerConf: Map[String, String] = hdfsConfig
+  override protected def getClientConf: Map[String, String] = hdfsConfig
+
+  override def createWorker(map: Map[String, String]): Worker = {
+    super.createWorker(map, null)
+  }
+
+  override def beforeAll(): Unit = {
+    assume(!OperatingSystem.isWindows)
+    val hdConf = new org.apache.hadoop.conf.Configuration()
+    val tmpDir = Files.createTempDirectory("celeborn-")
+    tmpDir.toFile.deleteOnExit()
+    hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpDir.toString)
+    hdfsCluster = new MiniDFSCluster.Builder(hdConf).build
+    basePath = new Path(hdfsCluster.getFileSystem.getUri.toString + "/test")
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    if (hdfsCluster != null) {
+      hdfsCluster.getFileSystem.delete(basePath, true)
+      hdfsCluster.shutdown()
+    }
+  }
+
+  private def hdfsConfig = Map(
+    ACTIVE_STORAGE_TYPES.key -> StorageEnum.HDFS.getValue,
+    WORKER_STORAGE_CREATE_FILE_POLICY.key -> StorageEnum.HDFS.getValue,
+    HDFS_DIR.key -> basePath.toString)
+}
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
new file mode 100644
index 000000000..c461e8366
--- /dev/null
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/DfsPartitionDataReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.celeborn.common.meta.DiskFileInfo;
+import org.apache.celeborn.common.protocol.StorageInfo;
+import org.apache.celeborn.common.util.Utils;
+
+public class DfsPartitionDataReader extends PartitionDataReader {
+
+  private final FSDataInputStream dataInputStream;
+  private final FSDataInputStream indexInputStream;
+
+  public DfsPartitionDataReader(
+      DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) 
throws IOException {
+    super(fileInfo, headerBuffer, indexBuffer);
+    FileSystem fileSystem =
+        StorageManager.hadoopFs()
+            .get(
+                fileInfo.isHdfs()
+                    ? StorageInfo.Type.HDFS
+                    : fileInfo.isS3() ? StorageInfo.Type.S3 : 
StorageInfo.Type.OSS);
+    this.dataInputStream = fileSystem.open(fileInfo.getDfsPath());
+    this.indexInputStream = fileSystem.open(fileInfo.getDfsIndexPath());
+    this.dataFileSize = 
fileSystem.getFileStatus(fileInfo.getDfsPath()).getLen();
+    this.indexFileSize = 
fileSystem.getFileStatus(fileInfo.getDfsIndexPath()).getLen();
+  }
+
+  @Override
+  public void readIndexBuffer(long targetPosition) throws IOException {
+    indexInputStream.seek(targetPosition);
+    readHeaderOrIndexBuffer(
+        indexInputStream,
+        indexBuffer,
+        indexFileSize,
+        indexBuffer.capacity(),
+        fileInfo.getIndexPath());
+  }
+
+  @Override
+  public void position(long targetPosition) throws IOException {
+    dataInputStream.seek(targetPosition);
+  }
+
+  @Override
+  public void readHeaderBuffer(int headerSize) throws IOException {
+    readHeaderOrIndexBuffer(
+        dataInputStream, headerBuffer, dataFileSize, headerSize, 
fileInfo.getFilePath());
+  }
+
+  @Override
+  public void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length, 
String filePath)
+      throws IOException {
+    Utils.checkFileIntegrity(fileSize - dataInputStream.getPos(), length, 
filePath);
+    ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
+    while (tmpBuffer.hasRemaining()) {
+      dataInputStream.read(tmpBuffer);
+    }
+    tmpBuffer.flip();
+    buf.writeBytes(tmpBuffer);
+  }
+
+  @Override
+  public long position() throws IOException {
+    return dataInputStream.getPos();
+  }
+
+  @Override
+  public void close() {
+    IOUtils.closeQuietly(dataInputStream);
+    IOUtils.closeQuietly(indexInputStream);
+  }
+
+  private void readHeaderOrIndexBuffer(
+      FSDataInputStream inputStream, ByteBuffer buffer, long fileSize, int 
length, String filePath)
+      throws IOException {
+    Utils.checkFileIntegrity(fileSize - inputStream.getPos(), length, 
filePath);
+    buffer.clear();
+    buffer.limit(length);
+    while (buffer.hasRemaining()) {
+      inputStream.read(buffer);
+    }
+    buffer.flip();
+  }
+}
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
new file mode 100644
index 000000000..c5c47aedf
--- /dev/null
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/LocalPartitionDataReader.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.io.IOUtils;
+
+import org.apache.celeborn.common.meta.DiskFileInfo;
+import org.apache.celeborn.common.util.FileChannelUtils;
+import org.apache.celeborn.common.util.Utils;
+
+public class LocalPartitionDataReader extends PartitionDataReader {
+
+  private final FileChannel dataFileChanel;
+  private final FileChannel indexFileChannel;
+
+  public LocalPartitionDataReader(
+      DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) 
throws IOException {
+    super(fileInfo, headerBuffer, indexBuffer);
+    this.dataFileChanel = 
FileChannelUtils.openReadableFileChannel(fileInfo.getFilePath());
+    this.indexFileChannel = 
FileChannelUtils.openReadableFileChannel(fileInfo.getIndexPath());
+    this.dataFileSize = dataFileChanel.size();
+    this.indexFileSize = indexFileChannel.size();
+  }
+
+  @Override
+  public void readIndexBuffer(long targetPosition) throws IOException {
+    indexFileChannel.position(targetPosition);
+    readHeaderOrIndexBuffer(
+        indexFileChannel,
+        indexBuffer,
+        indexFileSize,
+        indexBuffer.capacity(),
+        fileInfo.getIndexPath());
+  }
+
+  @Override
+  public void position(long targetPosition) throws IOException {
+    dataFileChanel.position(targetPosition);
+  }
+
+  @Override
+  public void readHeaderBuffer(int headerSize) throws IOException {
+    readHeaderOrIndexBuffer(
+        dataFileChanel, headerBuffer, dataFileSize, headerSize, 
fileInfo.getFilePath());
+  }
+
+  @Override
+  public void readBufferIntoReadBuffer(ByteBuf buf, long fileSize, int length, 
String filePath)
+      throws IOException {
+    Utils.checkFileIntegrity(fileSize - dataFileChanel.position(), length, 
filePath);
+    ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
+    while (tmpBuffer.hasRemaining()) {
+      dataFileChanel.read(tmpBuffer);
+    }
+    tmpBuffer.flip();
+    buf.writeBytes(tmpBuffer);
+  }
+
+  @Override
+  public long position() throws IOException {
+    return dataFileChanel.position();
+  }
+
+  @Override
+  public void close() {
+    IOUtils.closeQuietly(dataFileChanel);
+    IOUtils.closeQuietly(indexFileChannel);
+  }
+
+  private void readHeaderOrIndexBuffer(
+      FileChannel channel, ByteBuffer buffer, long fileSize, int length, 
String filePath)
+      throws IOException {
+    Utils.checkFileIntegrity(fileSize - channel.position(), length, filePath);
+    buffer.clear();
+    buffer.limit(length);
+    while (buffer.hasRemaining()) {
+      channel.read(buffer);
+    }
+    buffer.flip();
+  }
+}
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
index d88370edf..66d3be67b 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionData.java
@@ -18,7 +18,6 @@
 package org.apache.celeborn.service.deploy.worker.storage;
 
 import java.io.IOException;
-import java.nio.channels.FileChannel;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,13 +27,11 @@ import java.util.function.Consumer;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
-import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.meta.DiskFileInfo;
 import org.apache.celeborn.common.meta.MapFileMeta;
-import org.apache.celeborn.common.util.FileChannelUtils;
 import org.apache.celeborn.common.util.JavaUtils;
 import org.apache.celeborn.common.util.ThreadUtils;
 import org.apache.celeborn.service.deploy.worker.memory.BufferQueue;
@@ -49,9 +46,6 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
   protected final ExecutorService readExecutor;
   protected final ConcurrentHashMap<Long, MapPartitionDataReader> readers =
       JavaUtils.newConcurrentHashMap();
-  private FileChannel dataFileChanel;
-  private FileChannel indexChannel;
-  private long indexSize;
   private volatile boolean isReleased = false;
   private final BufferQueue bufferQueue = new BufferQueue();
   private AtomicBoolean bufferQueueInitialized = new AtomicBoolean(false);
@@ -95,9 +89,6 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
                     threadsPerMountPoint,
                     String.format("worker-map-partition-%s-reader", 
mapFileMeta.getMountPoint()),
                     false));
-    this.dataFileChanel = 
FileChannelUtils.openReadableFileChannel(diskFileInfo.getFilePath());
-    this.indexChannel = 
FileChannelUtils.openReadableFileChannel(diskFileInfo.getIndexPath());
-    this.indexSize = indexChannel.size();
 
     MemoryManager.instance().addReadBufferTargetChangeListener(this);
   }
@@ -174,7 +165,7 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
   }
 
   protected void openReader(MapPartitionDataReader reader) throws IOException {
-    reader.open(dataFileChanel, indexChannel, indexSize);
+    reader.open();
   }
 
   public synchronized void readBuffers() {
@@ -252,8 +243,8 @@ public class MapPartitionData implements 
MemoryManager.ReadBufferTargetChangeLis
     bufferQueue.release();
     isReleased = true;
 
-    IOUtils.closeQuietly(dataFileChanel);
-    IOUtils.closeQuietly(indexChannel);
+    readers.values().forEach(MapPartitionDataReader::close);
+    readers.clear();
 
     MemoryManager.instance().removeReadBufferTargetChangeListener(this);
   }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
index 8f8a2c353..a0a5ce506 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataReader.java
@@ -20,7 +20,6 @@ package org.apache.celeborn.service.deploy.worker.storage;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
-import java.nio.channels.FileChannel;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -95,13 +94,6 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
   @GuardedBy("lock")
   protected boolean errorNotified;
 
-  private FileChannel dataFileChannel;
-
-  // The size of the data file, it is initialized in the open method and 
remains unchanged
-  // afterward.
-  private long dataFileChannelSize;
-  private FileChannel indexFileChannel;
-
   private Channel associatedChannel;
 
   private Runnable recycleStream;
@@ -109,6 +101,8 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
   protected AtomicInteger numInUseBuffers = new AtomicInteger(0);
   private boolean isOpen = false;
 
+  private PartitionDataReader partitionDataReader;
+
   public MapPartitionDataReader(
       int startPartitionIndex,
       int endPartitionIndex,
@@ -132,15 +126,16 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
     this.readFinished = false;
   }
 
-  public void open(FileChannel dataFileChannel, FileChannel indexFileChannel, 
long indexSize)
-      throws IOException {
+  public void open() throws IOException {
     if (!isOpen) {
-      this.dataFileChannel = dataFileChannel;
-      this.dataFileChannelSize = dataFileChannel.size();
-      this.indexFileChannel = indexFileChannel;
+      this.partitionDataReader =
+          fileInfo.isDFS()
+              ? new DfsPartitionDataReader(fileInfo, headerBuffer, indexBuffer)
+              : new LocalPartitionDataReader(fileInfo, headerBuffer, 
indexBuffer);
       // index is (offset,length)
       long indexRegionSize = mapFileMeta.getNumSubpartitions() * (long) 
INDEX_ENTRY_SIZE;
-      this.numRegions = Utils.checkedDownCast(indexSize / indexRegionSize);
+      this.numRegions =
+          Utils.checkedDownCast(partitionDataReader.getIndexFileSize() / 
indexRegionSize);
 
       updateConsumingOffset();
       isOpen = true;
@@ -285,44 +280,6 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
     return mapFileMeta.getNumSubpartitions() * (long) INDEX_ENTRY_SIZE;
   }
 
-  protected void readHeaderOrIndexBuffer(FileChannel channel, ByteBuffer 
buffer, int length)
-      throws IOException {
-    Utils.checkFileIntegrity(channel, length);
-    buffer.clear();
-    buffer.limit(length);
-    while (buffer.hasRemaining()) {
-      channel.read(buffer);
-    }
-    buffer.flip();
-  }
-
-  protected void readBufferIntoReadBuffer(FileChannel channel, ByteBuf buf, 
int length)
-      throws IOException {
-    Utils.checkFileIntegrity(channel, length);
-    ByteBuffer tmpBuffer = ByteBuffer.allocate(length);
-    while (tmpBuffer.hasRemaining()) {
-      channel.read(tmpBuffer);
-    }
-    tmpBuffer.flip();
-    buf.writeBytes(tmpBuffer);
-  }
-
-  protected int readBuffer(
-      String filename, FileChannel channel, ByteBuffer header, ByteBuf buffer, 
int headerSize)
-      throws IOException {
-    readHeaderOrIndexBuffer(channel, header, headerSize);
-    // header is combined of mapId(4),attemptId(4),nextBatchId(4) and total 
Compressed Length(4)
-    // we need size here,so we read length directly
-    int bufferLength = header.getInt(12);
-    if (bufferLength <= 0 || bufferLength > buffer.capacity()) {
-      logger.error("Incorrect buffer header, buffer length: {}.", 
bufferLength);
-      throw new FileCorruptedException("File " + filename + " is corrupted");
-    }
-    buffer.writeBytes(header);
-    readBufferIntoReadBuffer(channel, buffer, bufferLength);
-    return bufferLength + headerSize;
-  }
-
   protected void updateConsumingOffset() throws IOException {
     while (currentPartitionRemainingBytes == 0
         && (currentDataRegion < numRegions - 1 || numRemainingPartitions > 0)) 
{
@@ -331,10 +288,10 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
         numRemainingPartitions = endPartitionIndex - startPartitionIndex + 1;
 
         // read the target index entry to the target index buffer
-        indexFileChannel.position(
+        long targetPosition =
             currentDataRegion * getIndexRegionSize()
-                + (long) startPartitionIndex * INDEX_ENTRY_SIZE);
-        readHeaderOrIndexBuffer(indexFileChannel, indexBuffer, 
indexBuffer.capacity());
+                + (long) startPartitionIndex * INDEX_ENTRY_SIZE;
+        partitionDataReader.readIndexBuffer(targetPosition);
       }
 
       // get the data file offset and the data size
@@ -345,13 +302,14 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
       logger.debug(
           "readBuffer updateConsumingOffset, {},  {}, {}, {}",
           streamId,
-          dataFileChannelSize,
+          partitionDataReader.getDataFileSize(),
           dataConsumingOffset,
           currentPartitionRemainingBytes);
 
       // if these checks fail, the partition file must be corrupted
       if (dataConsumingOffset < 0
-          || dataConsumingOffset + currentPartitionRemainingBytes > 
dataFileChannelSize
+          || dataConsumingOffset + currentPartitionRemainingBytes
+              > partitionDataReader.getDataFileSize()
           || currentPartitionRemainingBytes < 0) {
         throw new FileCorruptedException("File " + fileInfo.getFilePath() + " 
is corrupted");
       }
@@ -360,17 +318,9 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
 
   private synchronized boolean readBuffer(ByteBuf buffer) throws IOException {
     try {
-      dataFileChannel.position(dataConsumingOffset);
-
-      int readSize =
-          readBuffer(
-              fileInfo.getFilePath(),
-              dataFileChannel,
-              headerBuffer,
-              buffer,
-              headerBuffer.capacity());
+      int readSize = partitionDataReader.readBuffer(buffer, 
dataConsumingOffset);
       currentPartitionRemainingBytes -= readSize;
-      dataConsumingOffset = dataFileChannel.position();
+      dataConsumingOffset = partitionDataReader.position();
 
       logger.debug(
           "readBuffer data: {}, {}, {}, {}, {}, {}",
@@ -388,7 +338,7 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
         logger.debug(
             "readBuffer end, {},  {}, {}, {}",
             streamId,
-            dataFileChannelSize,
+            partitionDataReader.getDataFileSize(),
             dataConsumingOffset,
             currentPartitionRemainingBytes);
         int prevDataRegion = currentDataRegion;
@@ -399,7 +349,7 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
       logger.debug(
           "readBuffer run: {}, {}, {}, {}",
           streamId,
-          dataFileChannelSize,
+          partitionDataReader.getDataFileSize(),
           dataConsumingOffset,
           currentPartitionRemainingBytes);
       return true;
@@ -557,4 +507,8 @@ public class MapPartitionDataReader implements 
Comparable<MapPartitionDataReader
       return !isReleased && !readFinished;
     }
   }
+
+  public void close() {
+    partitionDataReader.close();
+  }
 }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
new file mode 100644
index 000000000..48b08d741
--- /dev/null
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataReader.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.worker.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.exception.FileCorruptedException;
+import org.apache.celeborn.common.meta.DiskFileInfo;
+
+public abstract class PartitionDataReader {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PartitionDataReader.class);
+
+  protected final DiskFileInfo fileInfo;
+  protected final ByteBuffer headerBuffer;
+  protected final ByteBuffer indexBuffer;
+
+  protected long dataFileSize;
+  protected long indexFileSize;
+
+  public PartitionDataReader(
+      DiskFileInfo fileInfo, ByteBuffer headerBuffer, ByteBuffer indexBuffer) {
+    this.fileInfo = fileInfo;
+    this.headerBuffer = headerBuffer;
+    this.indexBuffer = indexBuffer;
+  }
+
+  public abstract void readIndexBuffer(long targetPosition) throws IOException;
+
+  public abstract void position(long targetPosition) throws IOException;
+
+  public abstract void readHeaderBuffer(int headSize) throws IOException;
+
+  public abstract void readBufferIntoReadBuffer(
+      ByteBuf buf, long fileSize, int length, String filePath) throws 
IOException;
+
+  public abstract long position() throws IOException;
+
+  public abstract void close();
+
+  public int readBuffer(ByteBuf buffer, long dataConsumingOffset) throws 
IOException {
+    position(dataConsumingOffset);
+    int headerSize = headerBuffer.capacity();
+    readHeaderBuffer(headerSize);
+    // header is combined of mapId(4),attemptId(4),nextBatchId(4) and total 
Compressed Length(4)
+    // we need size here,so we read length directly
+    int bufferLength = headerBuffer.getInt(12);
+    if (bufferLength <= 0 || bufferLength > buffer.capacity()) {
+      LOG.error("Incorrect buffer header, buffer length: {}.", bufferLength);
+      throw new FileCorruptedException(
+          String.format("File %s is corrupted", fileInfo.getFilePath()));
+    }
+    buffer.writeBytes(headerBuffer);
+    readBufferIntoReadBuffer(buffer, dataFileSize, bufferLength, 
fileInfo.getFilePath());
+    return bufferLength + headerSize;
+  }
+
+  public long getDataFileSize() {
+    return dataFileSize;
+  }
+
+  public long getIndexFileSize() {
+    return indexFileSize;
+  }
+}
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
index 0eeaa5e70..902d720ee 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/PartitionMetaHandler.scala
@@ -29,8 +29,8 @@ import org.apache.hadoop.fs.FileSystem
 import org.roaringbitmap.RoaringBitmap
 import org.slf4j.{Logger, LoggerFactory}
 
-import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, 
MemoryFileInfo, ReduceFileMeta}
-import org.apache.celeborn.common.protocol.{PbPushDataHandShake, 
PbRegionFinish, PbRegionStart, PbSegmentStart}
+import org.apache.celeborn.common.meta.{DiskFileInfo, FileInfo, MapFileMeta, 
ReduceFileMeta}
+import org.apache.celeborn.common.protocol.{PbPushDataHandShake, 
PbRegionFinish, PbRegionStart, PbSegmentStart, StorageInfo}
 import org.apache.celeborn.common.unsafe.Platform
 import org.apache.celeborn.common.util.FileChannelUtils
 
@@ -94,7 +94,7 @@ trait PartitionMetaHandler {
 class MapPartitionMetaHandler(
     diskFileInfo: DiskFileInfo,
     notifier: FlushNotifier) extends PartitionMetaHandler {
-  lazy val hadoopFs: FileSystem = StorageManager.hadoopFs.get()
+  lazy val hadoopFs: FileSystem = 
StorageManager.hadoopFs.get(diskFileInfo.getStorageType)
   val logger: Logger = 
LoggerFactory.getLogger(classOf[MapPartitionMetaHandler])
   val fileMeta: MapFileMeta = 
diskFileInfo.getFileMeta.asInstanceOf[MapFileMeta]
   var numSubpartitions = 0
@@ -286,28 +286,7 @@ class MapPartitionMetaHandler(
   }
 
   override def afterClose(): Unit = {
-    // TODO: force flush the index file channel in scenarios which the 
upstream task writes and
-    // downstream task reads simultaneously, such as flink hybrid shuffle
-    if (indexBuffer != null) {
-      logger.debug(s"flushIndex start:${diskFileInfo.getIndexPath}")
-      val startTime = System.currentTimeMillis
-      indexBuffer.flip
-      notifier.checkException()
-      try {
-        if (indexBuffer.hasRemaining) {
-          // map partition synchronously writes file index
-          if (indexChannel != null) while (indexBuffer.hasRemaining) 
indexChannel.write(indexBuffer)
-          else if (diskFileInfo.isDFS) {
-            val dfsStream = hadoopFs.append(diskFileInfo.getDfsIndexPath)
-            dfsStream.write(indexBuffer.array)
-            dfsStream.close()
-          }
-        }
-        indexBuffer.clear
-      } finally logger.debug(
-        s"flushIndex end:${diskFileInfo.getIndexPath}, " +
-          s"cost:${System.currentTimeMillis - startTime}")
-    }
+    flushIndex()
   }
 
   override def beforeWrite(bytes: ByteBuf): Unit = {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 3070e4b94..b71110947 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -43,7 +43,7 @@ import 
org.apache.celeborn.common.metrics.source.{AbstractSource, ThreadPoolSour
 import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
 import org.apache.celeborn.common.protocol.{PartitionLocation, 
PartitionSplitMode, PartitionType, StorageInfo}
 import org.apache.celeborn.common.quota.ResourceConsumption
-import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, 
DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
+import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils, 
CollectionUtils, DiskUtils, JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
 import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
 import 
org.apache.celeborn.service.deploy.worker.memory.MemoryManager.MemoryPressureListener
@@ -819,7 +819,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         }
       }
     }
-    if (null != diskOperators) {
+    if (CollectionUtils.isNotEmpty(diskOperators)) {
       if (exitKind != CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
         cleanupExpiredShuffleKey(shuffleKeySet(), false)
       }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 7c3f1b966..fa0934efb 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -494,7 +494,7 @@ class DfsTierWriter(
     notifier: FlushNotifier,
     flusher: Flusher,
     source: AbstractSource,
-    hdfsFileInfo: DiskFileInfo,
+    dfsFileInfo: DiskFileInfo,
     storageType: StorageInfo.Type,
     partitionDataWriterContext: PartitionDataWriterContext,
     storageManager: StorageManager)
@@ -503,7 +503,7 @@ class DfsTierWriter(
     metaHandler,
     numPendingWrites,
     notifier,
-    hdfsFileInfo,
+    dfsFileInfo,
     source,
     storageType,
     partitionDataWriterContext.getPartitionLocation.getFileName,
@@ -520,21 +520,21 @@ class DfsTierWriter(
   var partNumber: Int = 1
 
   this.flusherBufferSize =
-    if (hdfsFileInfo.isS3()) {
+    if (dfsFileInfo.isS3()) {
       conf.workerS3FlusherBufferSize
-    } else if (hdfsFileInfo.isOSS()) {
+    } else if (dfsFileInfo.isOSS()) {
       conf.workerOssFlusherBufferSize
     } else {
       conf.workerHdfsFlusherBufferSize
     }
 
   try {
-    hadoopFs.create(hdfsFileInfo.getDfsPath, true).close()
-    if (hdfsFileInfo.isS3) {
+    hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+    if (dfsFileInfo.isS3) {
       val uri = hadoopFs.getUri
       val bucketName = uri.getHost
-      val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
-      val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 
1)
+      val index = dfsFileInfo.getFilePath.indexOf(bucketName)
+      val key = dfsFileInfo.getFilePath.substring(index + bucketName.length + 
1)
 
       this.s3MultipartUploadHandler = 
TierWriterHelper.getS3MultipartUploadHandler(
         hadoopFs,
@@ -544,7 +544,7 @@ class DfsTierWriter(
         conf.s3MultiplePartUploadBaseDelay,
         conf.s3MultiplePartUploadMaxBackoff)
       s3MultipartUploadHandler.startUpload()
-    } else if (hdfsFileInfo.isOSS) {
+    } else if (dfsFileInfo.isOSS) {
       val configuration = hadoopFs.getConf
       val ossEndpoint = configuration.get("fs.oss.endpoint")
       val ossAccessKey = configuration.get("fs.oss.accessKeyId")
@@ -552,8 +552,8 @@ class DfsTierWriter(
 
       val uri = hadoopFs.getUri
       val bucketName = uri.getHost
-      val index = hdfsFileInfo.getFilePath.indexOf(bucketName)
-      val key = hdfsFileInfo.getFilePath.substring(index + bucketName.length + 
1)
+      val index = dfsFileInfo.getFilePath.indexOf(bucketName)
+      val key = dfsFileInfo.getFilePath.substring(index + bucketName.length + 
1)
 
       this.ossMultipartUploadHandler = 
TierWriterHelper.getOssMultipartUploadHandler(
         ossEndpoint,
@@ -572,7 +572,7 @@ class DfsTierWriter(
         case ex: InterruptedException =>
           throw new RuntimeException(ex)
       }
-      hadoopFs.create(hdfsFileInfo.getDfsPath, true).close()
+      hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
   }
 
   storageManager.registerDiskFilePartitionWriter(
@@ -586,9 +586,9 @@ class DfsTierWriter(
 
   override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): 
FlushTask = {
     notifier.numPendingFlushes.incrementAndGet()
-    if (hdfsFileInfo.isHdfs) {
-      new HdfsFlushTask(flushBuffer, hdfsFileInfo.getDfsPath(), notifier, 
true, source)
-    } else if (hdfsFileInfo.isOSS) {
+    if (dfsFileInfo.isHdfs) {
+      new HdfsFlushTask(flushBuffer, dfsFileInfo.getDfsPath(), notifier, true, 
source)
+    } else if (dfsFileInfo.isOSS) {
       val flushTask = new OssFlushTask(
         flushBuffer,
         notifier,
@@ -641,17 +641,19 @@ class DfsTierWriter(
   }
 
   override def closeStreams(): Unit = {
-    if (hadoopFs.exists(hdfsFileInfo.getDfsPeerWriterSuccessPath)) {
-      hadoopFs.delete(hdfsFileInfo.getDfsPath, false)
+    if (hadoopFs.exists(dfsFileInfo.getDfsPeerWriterSuccessPath)) {
+      hadoopFs.delete(dfsFileInfo.getDfsPath, false)
       deleted = true
     } else {
-      hadoopFs.create(hdfsFileInfo.getDfsWriterSuccessPath).close()
-      val indexOutputStream = hadoopFs.create(hdfsFileInfo.getDfsIndexPath)
-      
indexOutputStream.writeInt(hdfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
-      for (offset <- hdfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
-        indexOutputStream.writeLong(offset)
+      hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close()
+      if (dfsFileInfo.isReduceFileMeta) {
+        val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath)
+        
indexOutputStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
+        for (offset <- dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
+          indexOutputStream.writeLong(offset)
+        }
+        indexOutputStream.close()
       }
-      indexOutputStream.close()
     }
     if (s3MultipartUploadHandler != null) {
       s3MultipartUploadHandler.complete()
@@ -664,12 +666,12 @@ class DfsTierWriter(
   }
 
   override def notifyFileCommitted(): Unit =
-    storageManager.notifyFileInfoCommitted(shuffleKey, filename, hdfsFileInfo)
+    storageManager.notifyFileInfoCommitted(shuffleKey, filename, dfsFileInfo)
 
   override def closeResource(): Unit = {}
 
   override def cleanLocalOrDfsFiles(): Unit = {
-    hdfsFileInfo.deleteAllFiles(hadoopFs)
+    dfsFileInfo.deleteAllFiles(hadoopFs)
   }
 
   override def takeBufferInternal(): CompositeByteBuf = {
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index 8c3a47a07..95d69fc12 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -25,6 +25,8 @@ import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.mutable
 
+import org.apache.commons.lang3.StringUtils
+
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.util.{CelebornExitKind, Utils}
@@ -151,7 +153,9 @@ trait MiniClusterFeature extends Logging {
   def createWorker(map: Map[String, String], storageDir: String): Worker = {
     logInfo("start create worker for mini cluster")
     val conf = new CelebornConf()
-    conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
+    if (StringUtils.isNotEmpty(storageDir)) {
+      conf.set(CelebornConf.WORKER_STORAGE_DIRS.key, storageDir)
+    }
     conf.set(CelebornConf.WORKER_DISK_MONITOR_ENABLED.key, "false")
     conf.set(CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE.key, "256K")
     conf.set(CelebornConf.WORKER_HTTP_PORT.key, s"${selectRandomPort()}")


Reply via email to