This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 65cb36c0 [CELEBORN-83][FOLLOWUP] Fix various bugs when using HDFS as 
storage. (#1065)
65cb36c0 is described below

commit 65cb36c0023d3100d15533aa51c1aa142b8ede2c
Author: Ethan Feng <[email protected]>
AuthorDate: Thu Dec 15 15:20:29 2022 +0800

    [CELEBORN-83][FOLLOWUP] Fix various bugs when using HDFS as storage. (#1065)
---
 README.md                                          |  1 +
 .../org/apache/celeborn/client/ShuffleClient.java  |  8 +++
 .../celeborn/client/read/DfsPartitionReader.java   | 75 +++++++++++++++++-----
 codecov.yml                                        |  8 ++-
 .../org/apache/celeborn/common/meta/FileInfo.java  | 25 ++++++--
 conf/log4j2.xml.template                           |  4 ++
 .../service/deploy/worker/storage/FileWriter.java  | 43 +++++++------
 .../worker/storage/MapPartitionFileWriter.java     |  2 +-
 .../worker/storage/PartitionFilesSorter.java       | 21 +++---
 .../worker/storage/ReducePartitionFileWriter.java  |  8 +--
 .../service/deploy/worker/PushDataHandler.scala    | 16 +++--
 .../service/deploy/worker/storage/FlushTask.scala  |  8 ++-
 .../service/deploy/worker/storage/Flusher.scala    |  2 +-
 .../deploy/worker/storage/StorageManager.scala     | 34 ++++++----
 14 files changed, 177 insertions(+), 78 deletions(-)

diff --git a/README.md b/README.md
index 5f4d4f28..aefdc583 100644
--- a/README.md
+++ b/README.md
@@ -121,6 +121,7 @@ celeborn.ha.master.node.3.ratis.port 9874
 celeborn.ha.master.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/
 
 celeborn.metrics.enabled true
+# If you want to use HDFS as shuffle storage, make sure that the flush buffer size is at least 4MB.
 celeborn.worker.flush.buffer.size 256k
 celeborn.worker.storage.dirs /mnt/disk1/,/mnt/disk2
 # If your hosts have disk raid or use lvm, set 
celeborn.worker.monitor.disk.enabled to false
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index f5ea4800..a3d3fa82 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -24,6 +24,8 @@ import java.util.function.BooleanSupplier;
 import io.netty.buffer.ByteBuf;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.client.read.RssInputStream;
 import org.apache.celeborn.common.CelebornConf;
@@ -39,6 +41,7 @@ public abstract class ShuffleClient {
   private static volatile ShuffleClient _instance;
   private static volatile boolean initialized = false;
   private static volatile FileSystem hdfsFs;
+  private static Logger logger = LoggerFactory.getLogger(ShuffleClient.class);
 
   // for testing
   public static void reset() {
@@ -102,6 +105,11 @@ public abstract class ShuffleClient {
       synchronized (ShuffleClient.class) {
         if (null == hdfsFs) {
           Configuration hdfsConfiguration = new Configuration();
+          // enable fs cache to avoid too many fs instances
+          hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false");
+          logger.info(
+              "Celeborn client will ignore cluster"
+                  + " settings about fs.hdfs.impl.disable.cache and set it to 
false");
           try {
             hdfsFs = FileSystem.get(hdfsConfiguration);
           } catch (IOException e) {
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 4259e1e0..e9326c32 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import io.netty.buffer.ByteBuf;
@@ -53,9 +52,10 @@ public class DfsPartitionReader implements PartitionReader {
   private final AtomicReference<IOException> exception = new 
AtomicReference<>();
   private volatile boolean closed = false;
   private Thread fetchThread;
-  private final FSDataInputStream hdfsInputStream;
+  private FSDataInputStream hdfsInputStream;
   private int numChunks = 0;
-  private final AtomicInteger currentChunkIndex = new AtomicInteger(0);
+  private int returnedChunks = 0;
+  private int currentChunkIndex = 0;
 
   public DfsPartitionReader(
       CelebornConf conf,
@@ -98,29 +98,67 @@ public class DfsPartitionReader implements PartitionReader {
           ShuffleClient.getHdfsFs(conf).open(new 
Path(location.getStorageInfo().getFilePath()));
       chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
     }
+    logger.debug(
+        "DFS {} index count:{} offsets:{}",
+        location.getStorageInfo().getFilePath(),
+        chunkOffsets.size(),
+        chunkOffsets);
     if (chunkOffsets.size() > 1) {
       numChunks = chunkOffsets.size() - 1;
       fetchThread =
           new Thread(
               () -> {
                 try {
-                  while (!closed && currentChunkIndex.get() < numChunks) {
+                  while (!closed && currentChunkIndex < numChunks) {
                     while (results.size() >= fetchMaxReqsInFlight) {
                       Thread.sleep(50);
                     }
-                    long offset = chunkOffsets.get(currentChunkIndex.get());
-                    long length = chunkOffsets.get(currentChunkIndex.get() + 
1) - offset;
+                    long offset = chunkOffsets.get(currentChunkIndex);
+                    long length = chunkOffsets.get(currentChunkIndex + 1) - 
offset;
+                    logger.debug("read {} offset {} length {}", 
currentChunkIndex, offset, length);
                     byte[] buffer = new byte[(int) length];
-                    hdfsInputStream.readFully(offset, buffer);
-                    results.add(Unpooled.wrappedBuffer(buffer));
-                    currentChunkIndex.incrementAndGet();
+                    try {
+                      hdfsInputStream.readFully(offset, buffer);
+                    } catch (IOException e) {
+                      logger.warn(
+                          "read hdfs {} failed will retry, error detail {}",
+                          location.getStorageInfo().getFilePath(),
+                          e);
+                      try {
+                        hdfsInputStream.close();
+                        hdfsInputStream =
+                            ShuffleClient.getHdfsFs(conf)
+                                .open(
+                                    new Path(
+                                        Utils.getSortedFilePath(
+                                            
location.getStorageInfo().getFilePath())));
+                        hdfsInputStream.readFully(offset, buffer);
+                      } catch (IOException ex) {
+                        logger.warn(
+                            "retry read hdfs {} failed, error detail {} ",
+                            location.getStorageInfo().getFilePath(),
+                            e);
+                        exception.set(ex);
+                        break;
+                      }
+                    }
+                    results.put(Unpooled.wrappedBuffer(buffer));
+                    logger.debug("add index {} to results", 
currentChunkIndex++);
                   }
-                } catch (IOException e) {
-                  exception.set(e);
-                } catch (InterruptedException e) {
+                } catch (Exception e) {
+                  logger.warn("Fetch thread is cancelled.", e);
                   // cancel a task for speculative, ignore this exception
                 }
-              });
+                logger.debug("fetch {} is done.", 
location.getStorageInfo().getFilePath());
+              },
+              "Dfs-fetch-thread" + location.getStorageInfo().getFilePath());
+      fetchThread.setUncaughtExceptionHandler(
+          new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+              logger.error("thread {} failed with exception {}", t, e);
+            }
+          });
       fetchThread.start();
       logger.debug("Start dfs read on location {}", location);
     }
@@ -145,6 +183,7 @@ public class DfsPartitionReader implements PartitionReader {
       throws IOException {
     String indexPath = 
Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
     FSDataInputStream indexInputStream = 
ShuffleClient.getHdfsFs(conf).open(new Path(indexPath));
+    logger.debug("read sorted index {}", indexPath);
     long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new 
Path(indexPath)).getLen();
     // Index size won't be large, so it's safe to do the conversion.
     byte[] indexBuffer = new byte[(int) indexSize];
@@ -162,17 +201,18 @@ public class DfsPartitionReader implements 
PartitionReader {
 
   @Override
   public boolean hasNext() {
-    return currentChunkIndex.get() < numChunks;
+    logger.debug("check has next current index: {} chunks {}", returnedChunks, 
numChunks);
+    return returnedChunks < numChunks;
   }
 
   @Override
   public ByteBuf next() throws IOException {
-    checkException();
     ByteBuf chunk = null;
     try {
       while (chunk == null) {
         checkException();
         chunk = results.poll(500, TimeUnit.MILLISECONDS);
+        logger.debug("poll result with result size: {}", results.size());
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -180,6 +220,7 @@ public class DfsPartitionReader implements PartitionReader {
       exception.set(ioe);
       throw ioe;
     }
+    returnedChunks++;
     return chunk;
   }
 
@@ -193,7 +234,9 @@ public class DfsPartitionReader implements PartitionReader {
   @Override
   public void close() {
     closed = true;
-    fetchThread.interrupt();
+    if (fetchThread != null) {
+      fetchThread.interrupt();
+    }
     try {
       hdfsInputStream.close();
     } catch (IOException e) {
diff --git a/codecov.yml b/codecov.yml
index 483a35d3..d3fb4dec 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -21,6 +21,8 @@ coverage:
   status:
     project:
       default:
-        target: 0%
-        threshold: 0%
-        base: auto
\ No newline at end of file
+        target: 10%
+    patch:
+      default:
+        enabled: no
+        if_not_found: success
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 35ddebfa..50d3d8bb 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -18,7 +18,6 @@
 package org.apache.celeborn.common.meta;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -27,12 +26,15 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.protocol.PartitionType;
 import org.apache.celeborn.common.util.Utils;
 
 public class FileInfo {
+  private static Logger logger = LoggerFactory.getLogger(FileInfo.class);
   private final String filePath;
   private final List<Long> chunkOffsets;
   private final UserIdentifier userIdentifier;
@@ -127,12 +129,23 @@ public class FileInfo {
     return userIdentifier;
   }
 
-  public void deleteAllFiles(FileSystem hdfsFs) throws IOException {
+  public void deleteAllFiles(FileSystem hdfsFs) {
     if (isHdfs()) {
-      hdfsFs.delete(getHdfsPath(), false);
-      hdfsFs.delete(getHdfsWriterSuccessPath(), false);
-      hdfsFs.delete(getHdfsIndexPath(), false);
-      hdfsFs.delete(getHdfsSortedPath(), false);
+      try {
+        hdfsFs.delete(getHdfsPath(), false);
+        hdfsFs.delete(getHdfsWriterSuccessPath(), false);
+        hdfsFs.delete(getHdfsIndexPath(), false);
+        hdfsFs.delete(getHdfsSortedPath(), false);
+      } catch (Exception e) {
+        // ignore delete exceptions because some other workers might be 
deleting the directory
+        logger.debug(
+            "delete hdfs file {},{},{},{} failed {}",
+            getHdfsPath(),
+            getHdfsWriterSuccessPath(),
+            getHdfsIndexPath(),
+            getHdfsSortedPath(),
+            e);
+      }
     } else {
       getFile().delete();
       new File(getIndexPath()).delete();
diff --git a/conf/log4j2.xml.template b/conf/log4j2.xml.template
index 5a93485d..24173a2b 100644
--- a/conf/log4j2.xml.template
+++ b/conf/log4j2.xml.template
@@ -33,9 +33,13 @@
             <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: 
%m%n%ex"/>
         </Console>
     </Appenders>
+
     <Loggers>
         <Root level="INFO">
             <AppenderRef ref="stdout"/>
         </Root>
+        <Logger name="org.apache.hadoop.hdfs" level="WARN" additivity="false">
+            <Appender-ref ref="stdout" level="WARN" />
+        </Logger>
     </Loggers>
 </Configuration>
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 5914765c..a10eef9a 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,10 +52,9 @@ public abstract class FileWriter implements DeviceObserver {
   private static final long WAIT_INTERVAL_MS = 20;
 
   protected final FileInfo fileInfo;
-  protected FileChannel channel;
-  protected FSDataOutputStream stream;
-  protected volatile boolean closed;
-  protected volatile boolean destroyed;
+  private FileChannel channel;
+  private volatile boolean closed;
+  private volatile boolean destroyed;
 
   protected final AtomicInteger numPendingWrites = new AtomicInteger();
   protected long bytesFlushed;
@@ -105,7 +103,20 @@ public abstract class FileWriter implements DeviceObserver 
{
     if (!fileInfo.isHdfs()) {
       channel = new FileOutputStream(fileInfo.getFilePath()).getChannel();
     } else {
-      stream = StorageManager.hdfsFs().create(fileInfo.getHdfsPath(), true);
+      // We open the stream and close it immediately because an HDFS output stream
+      // creates a DataStreamer, which is a thread.
+      // If we kept the HDFS output stream open for reuse, we would soon exhaust the memory.
+      try {
+        StorageManager.hadoopFs().create(fileInfo.getHdfsPath(), true).close();
+      } catch (IOException e) {
+        try {
+          // If creating the file failed, wait 10 ms and retry
+          Thread.sleep(10);
+        } catch (InterruptedException ex) {
+          throw new RuntimeException(ex);
+        }
+        StorageManager.hadoopFs().create(fileInfo.getHdfsPath(), true).close();
+      }
     }
     source = workerSource;
     logger.debug("FileWriter {} split threshold {} mode {}", this, 
splitThreshold, splitMode);
@@ -138,8 +149,8 @@ public abstract class FileWriter implements DeviceObserver {
     FlushTask task = null;
     if (channel != null) {
       task = new LocalFlushTask(flushBuffer, channel, notifier);
-    } else if (stream != null) {
-      task = new HdfsFlushTask(flushBuffer, stream, notifier);
+    } else if (fileInfo.isHdfs()) {
+      task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), notifier);
     }
     addTask(task);
     flushBuffer = null;
@@ -248,8 +259,7 @@ public abstract class FileWriter implements DeviceObserver {
         if (channel != null) {
           channel.close();
         }
-        if (stream != null) {
-          stream.close();
+        if (fileInfo.isHdfs()) {
           streamClose.run();
         }
       } catch (IOException e) {
@@ -278,9 +288,6 @@ public abstract class FileWriter implements DeviceObserver {
         if (channel != null) {
           channel.close();
         }
-        if (stream != null) {
-          stream.close();
-        }
       } catch (IOException e) {
         logger.warn(
             "Close channel failed for file {} caused by {}.",
@@ -291,14 +298,12 @@ public abstract class FileWriter implements 
DeviceObserver {
 
     if (!destroyed) {
       destroyed = true;
-      try {
-        fileInfo.deleteAllFiles(StorageManager.hdfsFs());
-      } catch (Exception e) {
-        logger.warn("Exception when cleaning hdfs file {}", 
fileInfo.getFilePath());
-      }
+      fileInfo.deleteAllFiles(StorageManager.hadoopFs());
 
       // unregister from DeviceMonitor
-      deviceMonitor.unregisterFileWriter(this);
+      if (!fileInfo.isHdfs()) {
+        deviceMonitor.unregisterFileWriter(this);
+      }
       destroyHook.run();
     }
   }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
index 7d70f4be..84bf8503 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
@@ -75,7 +75,7 @@ public final class MapPartitionFileWriter extends FileWriter {
     if (!fileInfo.isHdfs()) {
       channelIndex = new 
FileOutputStream(fileInfo.getIndexPath()).getChannel();
     } else {
-      streamIndex = 
StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath(), true);
+      streamIndex = 
StorageManager.hadoopFs().create(fileInfo.getHdfsIndexPath(), true);
     }
   }
 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 7af454e2..6d3501d2 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -342,7 +342,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     if (isHdfs) {
       // If the index file exists, it will be overwritten.
       // So there is no need to check its existence.
-      hdfsIndexOutput = StorageManager.hdfsFs().create(new 
Path(indexFilePath));
+      hdfsIndexOutput = StorageManager.hadoopFs().create(new 
Path(indexFilePath));
     } else {
       indexFileChannel = new FileOutputStream(indexFilePath).getChannel();
     }
@@ -455,8 +455,9 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       int indexSize = 0;
       try {
         if (isHdfs) {
-          hdfsIndexStream = StorageManager.hdfsFs().open(new 
Path(indexFilePath));
-          indexSize = (int) StorageManager.hdfsFs().getFileStatus(new 
Path(indexFilePath)).getLen();
+          hdfsIndexStream = StorageManager.hadoopFs().open(new 
Path(indexFilePath));
+          indexSize =
+              (int) StorageManager.hadoopFs().getFileStatus(new 
Path(indexFilePath)).getLen();
         } else {
           indexStream = new FileInputStream(indexFilePath);
           File indexFile = new File(indexFilePath);
@@ -520,11 +521,11 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
           indexFile.delete();
         }
       } else {
-        if (StorageManager.hdfsFs().exists(fileInfo.getHdfsSortedPath())) {
-          StorageManager.hdfsFs().delete(fileInfo.getHdfsSortedPath(), false);
+        if (StorageManager.hadoopFs().exists(fileInfo.getHdfsSortedPath())) {
+          StorageManager.hadoopFs().delete(fileInfo.getHdfsSortedPath(), 
false);
         }
-        if (StorageManager.hdfsFs().exists(fileInfo.getHdfsIndexPath())) {
-          StorageManager.hdfsFs().delete(fileInfo.getHdfsIndexPath(), false);
+        if (StorageManager.hadoopFs().exists(fileInfo.getHdfsIndexPath())) {
+          StorageManager.hadoopFs().delete(fileInfo.getHdfsIndexPath(), false);
         }
       }
     }
@@ -612,8 +613,8 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
     private void initializeFiles() throws IOException {
       if (isHdfs) {
-        hdfsOriginInput = StorageManager.hdfsFs().open(new 
Path(originFilePath));
-        hdfsSortedOutput = StorageManager.hdfsFs().create(new 
Path(sortedFilePath));
+        hdfsOriginInput = StorageManager.hadoopFs().open(new 
Path(originFilePath));
+        hdfsSortedOutput = StorageManager.hadoopFs().create(new 
Path(sortedFilePath));
       } else {
         originFileChannel = new FileInputStream(originFilePath).getChannel();
         sortedFileChannel = new FileOutputStream(sortedFilePath).getChannel();
@@ -646,7 +647,7 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
     private void deleteOriginFiles() throws IOException {
       boolean deleteSuccess = false;
       if (isHdfs) {
-        deleteSuccess = StorageManager.hdfsFs().delete(new 
Path(originFilePath), false);
+        deleteSuccess = StorageManager.hadoopFs().delete(new 
Path(originFilePath), false);
       } else {
         deleteSuccess = new File(originFilePath).delete();
       }
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
index 843841e5..e214df18 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
@@ -94,13 +94,13 @@ public final class ReducePartitionFileWriter extends 
FileWriter {
           }
         },
         () -> {
-          if 
(StorageManager.hdfsFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
-            StorageManager.hdfsFs().delete(fileInfo.getHdfsPath(), false);
+          if 
(StorageManager.hadoopFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
+            StorageManager.hadoopFs().delete(fileInfo.getHdfsPath(), false);
             deleted = true;
           } else {
-            
StorageManager.hdfsFs().create(fileInfo.getHdfsWriterSuccessPath()).close();
+            
StorageManager.hadoopFs().create(fileInfo.getHdfsWriterSuccessPath()).close();
             FSDataOutputStream indexOutputStream =
-                StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath());
+                StorageManager.hadoopFs().create(fileInfo.getHdfsIndexPath());
             indexOutputStream.writeInt(fileInfo.getChunkOffsets().size());
             for (Long offset : fileInfo.getChunkOffsets()) {
               indexOutputStream.writeLong(offset);
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index b59eafd2..933f3b6b 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -37,7 +37,7 @@ import 
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.unsafe.Platform
 import org.apache.celeborn.common.util.PackedPartitionId
-import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, 
LocalFlusher, MapPartitionFileWriter, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, 
HdfsFlusher, LocalFlusher, MapPartitionFileWriter, StorageManager}
 
 class PushDataHandler extends BaseMessageHandler with Logging {
 
@@ -224,9 +224,14 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       callback.onFailure(new Exception(message, exception))
       return
     }
-    val diskFull = workerInfo.diskInfos
-      .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
-      .actualUsableSpace < diskReserveSize
+    val diskFull =
+      if (fileWriter.flusher.isInstanceOf[LocalFlusher]) {
+        workerInfo.diskInfos
+          .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
+          .actualUsableSpace < diskReserveSize
+      } else {
+        false
+      }
     if ((diskFull && fileWriter.getFileInfo.getFileLength > 
partitionSplitMinimumSize) ||
       (isMaster && fileWriter.getFileInfo.getFileLength > 
fileWriter.getSplitThreshold())) {
       if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
@@ -859,6 +864,9 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
   }
 
   private def checkDiskFull(fileWriter: FileWriter): Boolean = {
+    if (fileWriter.flusher.isInstanceOf[HdfsFlusher]) {
+      return false
+    }
     val diskFull = workerInfo.diskInfos
       .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
       .actualUsableSpace < diskReserveSize
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 1d9b69e8..340e9e9d 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.worker.storage
 import java.nio.channels.FileChannel
 
 import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
-import org.apache.hadoop.fs.FSDataOutputStream
+import org.apache.hadoop.fs.{FSDataOutputStream, Path}
 
 abstract private[worker] class FlushTask(
     val buffer: CompositeByteBuf,
@@ -44,9 +44,11 @@ private[worker] class LocalFlushTask(
 
 private[worker] class HdfsFlushTask(
     buffer: CompositeByteBuf,
-    fsStream: FSDataOutputStream,
+    val path: Path,
     notifier: FlushNotifier) extends FlushTask(buffer, notifier) {
   override def flush(): Unit = {
-    fsStream.write(ByteBufUtil.getBytes(buffer))
+    val hdfsStream = StorageManager.hadoopFs.append(path)
+    hdfsStream.write(ByteBufUtil.getBytes(buffer))
+    hdfsStream.close()
   }
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 8a662154..b180674d 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -223,11 +223,11 @@ final private[worker] class HdfsFlusher(
     hdfsFlusherThreads,
     flushAvgTimeWindowSize,
     avgFlushTimeSlidingWindowMinCount) with Logging {
-
   override def toString: String = s"HdfsFlusher@$flusherId"
 
   override def processIOException(e: IOException, deviceErrorType: 
DiskStatus): Unit = {
     stopAndCleanFlusher()
     logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
   }
+
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 205d4c3c..54d2138a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -45,7 +45,7 @@ import 
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
 import org.apache.celeborn.common.quota.ResourceConsumption
 import org.apache.celeborn.common.util.{PbSerDeUtils, ThreadUtils, Utils}
 import org.apache.celeborn.service.deploy.worker._
-import org.apache.celeborn.service.deploy.worker.storage.StorageManager.hdfsFs
+import 
org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
 
 final private[worker] class StorageManager(conf: CelebornConf, workerSource: 
AbstractSource)
   extends ShuffleRecoverHelper with DeviceObserver with Logging with 
MemoryPressureListener {
@@ -129,7 +129,10 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       val hdfsConfiguration = new Configuration
       hdfsConfiguration.set("fs.defaultFS", hdfsDir)
       hdfsConfiguration.set("dfs.replication", "2")
-      StorageManager.hdfsFs = FileSystem.get(hdfsConfiguration)
+      hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false")
+      logInfo("Celeborn will ignore cluster settings" +
+        " about fs.hdfs.impl.disable.cache and set it to false")
+      StorageManager.hadoopFs = FileSystem.get(hdfsConfiguration)
       Some(new HdfsFlusher(
         workerSource,
         conf.hdfsFlusherThreads,
@@ -270,7 +273,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
           diskInfo.dirs
         } else {
-          logWarning(s"Disk unavailable for $suggestedMountPoint, return all 
healthy" +
+          logDebug(s"Disk unavailable for $suggestedMountPoint, return all 
healthy" +
             s" working dirs. diskInfo $diskInfo")
           healthyWorkingDirs()
         }
@@ -281,7 +284,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       if (dirs.isEmpty) {
         val shuffleDir =
           new Path(new Path(hdfsDir, conf.workerWorkingDir), 
s"$appId/$shuffleId")
-        FileSystem.mkdirs(StorageManager.hdfsFs, shuffleDir, hdfsPermission)
+        FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
         val fileInfo =
           new FileInfo(new Path(shuffleDir, fileName).toString, 
userIdentifier, partitionType)
         val hdfsWriter = partitionType match {
@@ -420,7 +423,16 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           deleteDirectory(file, diskOperators.get(diskInfo.mountPoint))
         }
       }
-      hdfsInfos.foreach(item => item._2.deleteAllFiles(StorageManager.hdfsFs))
+      hdfsInfos.foreach(item => 
item._2.deleteAllFiles(StorageManager.hadoopFs))
+      if (!hdfsInfos.isEmpty) {
+        try {
+          StorageManager.hadoopFs.delete(
+            new Path(new Path(hdfsDir, conf.workerWorkingDir), 
s"$appId/$shuffleId"),
+            true)
+        } catch {
+          case e: Exception => logWarning("Clean expired hdfs shuffle 
failed.", e)
+        }
+      }
     }
   }
 
@@ -465,14 +477,14 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       }
     }
 
-    if (hdfsFs != null) {
+    if (hadoopFs != null) {
       val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
-      if (hdfsFs.exists(hdfsWorkPath)) {
-        val iter = hdfsFs.listFiles(hdfsWorkPath, false)
+      if (hadoopFs.exists(hdfsWorkPath)) {
+        val iter = hadoopFs.listFiles(hdfsWorkPath, false)
         while (iter.hasNext) {
           val fileStatus = iter.next()
           if (fileStatus.getModificationTime < expireTime) {
-            hdfsFs.delete(fileStatus.getPath, true)
+            hadoopFs.delete(fileStatus.getPath, true)
           }
         }
       }
@@ -528,7 +540,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           }
         }
 
-      val hdfsCleaned = hdfsFs match {
+      val hdfsCleaned = hadoopFs match {
         case hdfs: FileSystem =>
           val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
           // hdfs path not exist when first time initialize
@@ -670,5 +682,5 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
 }
 
 object StorageManager {
-  var hdfsFs: FileSystem = _
+  var hadoopFs: FileSystem = _
 }

Reply via email to