This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch CELEBORN-83-FOLLOWUP
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit f2703dfa087e4f35ab7d02b04de592b2b886b5ed
Author: mingji <[email protected]>
AuthorDate: Fri Dec 9 16:44:44 2022 +0800

    [CELEBORN-83][FOLLOWUP] Fix various bugs when using HDFS as storage.
---
 README.md                                          |  1 +
 .../org/apache/celeborn/client/ShuffleClient.java  |  1 +
 .../celeborn/client/read/DfsPartitionReader.java   | 16 ++++++++++++---
 .../org/apache/celeborn/common/meta/FileInfo.java  | 15 ++++++++------
 .../org/apache/celeborn/common/CelebornConf.scala  | 10 ++++++++++
 conf/log4j2.xml.template                           |  4 ++++
 .../service/deploy/worker/storage/FileWriter.java  | 23 ++++++++--------------
 .../service/deploy/worker/PushDataHandler.scala    | 16 +++++++++++----
 .../service/deploy/worker/storage/FlushTask.scala  |  8 +++++---
 .../service/deploy/worker/storage/Flusher.scala    | 10 +++++++---
 .../deploy/worker/storage/StorageManager.scala     | 15 ++++++++++++--
 11 files changed, 83 insertions(+), 36 deletions(-)

diff --git a/README.md b/README.md
index 5f4d4f28..aefdc583 100644
--- a/README.md
+++ b/README.md
@@ -121,6 +121,7 @@ celeborn.ha.master.node.3.ratis.port 9874
 celeborn.ha.master.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/
 
 celeborn.metrics.enabled true
+# If you want to use HDFS as shuffle storage, make sure that the flush buffer 
size is at least 4MB.
 celeborn.worker.flush.buffer.size 256k
 celeborn.worker.storage.dirs /mnt/disk1/,/mnt/disk2
 # If your hosts have disk raid or use lvm, set 
celeborn.worker.monitor.disk.enabled to false
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index b9fcb9e1..876047a5 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -100,6 +100,7 @@ public abstract class ShuffleClient implements Cloneable {
       synchronized (ShuffleClient.class) {
         if (null == hdfsFs) {
           Configuration hdfsConfiguration = new Configuration();
+          hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false");
           try {
             hdfsFs = FileSystem.get(hdfsConfiguration);
           } catch (IOException e) {
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 4259e1e0..3ac4a181 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -98,6 +98,12 @@ public class DfsPartitionReader implements PartitionReader {
           ShuffleClient.getHdfsFs(conf).open(new 
Path(location.getStorageInfo().getFilePath()));
       chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
     }
+    StringBuilder builder = new StringBuilder();
+    for (Long offset : chunkOffsets) {
+      builder.append(offset + " ");
+    }
+    logger.info(
+            "DFS " + location.getStorageInfo().getFilePath() + "index count:" 
+ chunkOffsets.size() + " offsets:" + builder.toString());
     if (chunkOffsets.size() > 1) {
       numChunks = chunkOffsets.size() - 1;
       fetchThread =
@@ -110,17 +116,21 @@ public class DfsPartitionReader implements 
PartitionReader {
                     }
                     long offset = chunkOffsets.get(currentChunkIndex.get());
                     long length = chunkOffsets.get(currentChunkIndex.get() + 
1) - offset;
+                    logger.info("read " + currentChunkIndex.get() + " offset " 
+ offset + " length" + length);
                     byte[] buffer = new byte[(int) length];
                     hdfsInputStream.readFully(offset, buffer);
                     results.add(Unpooled.wrappedBuffer(buffer));
-                    currentChunkIndex.incrementAndGet();
+                    int fetchedIndex = currentChunkIndex.incrementAndGet();
+                    logger.info("fetch chunkIndex" + fetchedIndex);
                   }
                 } catch (IOException e) {
                   exception.set(e);
                 } catch (InterruptedException e) {
+                  logger.warn("Fetch thread is cancelled.");
                  // task canceled due to speculative execution; ignore this exception
                 }
-              });
+              },
+              "Dfs-fetch-thread" + location.getFileName());
       fetchThread.start();
       logger.debug("Start dfs read on location {}", location);
     }
@@ -162,7 +172,7 @@ public class DfsPartitionReader implements PartitionReader {
 
   @Override
   public boolean hasNext() {
-    return currentChunkIndex.get() < numChunks;
+    return currentChunkIndex.get() < numChunks || !results.isEmpty();
   }
 
   @Override
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java 
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 6a815e3e..9ec2df3f 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -18,7 +18,6 @@
 package org.apache.celeborn.common.meta;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -126,12 +125,16 @@ public class FileInfo {
     return userIdentifier;
   }
 
-  public void deleteAllFiles(FileSystem hdfsFs) throws IOException {
+  public void deleteAllFiles(FileSystem hdfsFs) {
     if (isHdfs()) {
-      hdfsFs.delete(getHdfsPath(), false);
-      hdfsFs.delete(getHdfsWriterSuccessPath(), false);
-      hdfsFs.delete(getHdfsIndexPath(), false);
-      hdfsFs.delete(getHdfsSortedPath(), false);
+      try {
+        hdfsFs.delete(getHdfsPath(), false);
+        hdfsFs.delete(getHdfsWriterSuccessPath(), false);
+        hdfsFs.delete(getHdfsIndexPath(), false);
+        hdfsFs.delete(getHdfsSortedPath(), false);
+      } catch (Exception e) {
+        // ignore delete exceptions because some other workers might be 
deleting the directory
+      }
     } else {
       getFile().delete();
       new File(getIndexPath()).delete();
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 64ccfd7c..73b44019 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -719,6 +719,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   def avgFlushTimeSlidingWindowSize: Int = 
get(WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_SIZE)
   def avgFlushTimeSlidingWindowMinCount: Int =
     get(WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_MINCOUNT)
+  def hdfsStreamCacheSize: Int = get(WORKER_FLUSHER_HDFS_STREAM_CACHE_SIZE)
   def diskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE)
   def diskMonitorEnabled: Boolean = get(WORKER_DISK_MONITOR_ENABLED)
   def diskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST)
@@ -1956,6 +1957,15 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(1000)
 
+  val WORKER_FLUSHER_HDFS_STREAM_CACHE_SIZE: ConfigEntry[Int] =
+    buildConf("celeborn.worker.flusher.hdfsStream.cacheSize")
+      .categories("worker")
+      .doc("Max number of cached HDFS output streams.")
+      .version("0.2.0")
+      .internal
+      .intConf
+      .createWithDefault(100)
+
   val SLOTS_ASSIGN_LOADAWARE_DISKGROUP_NUM: ConfigEntry[Int] =
     buildConf("celeborn.slots.assign.loadAware.numDiskGroups")
       .withAlternative("rss.disk.groups")
diff --git a/conf/log4j2.xml.template b/conf/log4j2.xml.template
index 5a93485d..4ed4e8a7 100644
--- a/conf/log4j2.xml.template
+++ b/conf/log4j2.xml.template
@@ -33,9 +33,13 @@
             <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: 
%m%n%ex"/>
         </Console>
     </Appenders>
+
     <Loggers>
         <Root level="INFO">
             <AppenderRef ref="stdout"/>
         </Root>
+        <Logger name="org.apache.hadoop.hdfs" level="WARN" additivity="false">
+                    <Appender-ref ref="stdout" level="WARN" />
+        </Logger>
     </Loggers>
 </Configuration>
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 175da298..799ee466 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -54,7 +54,6 @@ public final class FileWriter implements DeviceObserver {
 
   private final FileInfo fileInfo;
   private FileChannel channel;
-  private FSDataOutputStream stream;
   private volatile boolean closed;
   private volatile boolean destroyed;
 
@@ -127,7 +126,7 @@ public final class FileWriter implements DeviceObserver {
     if (!fileInfo.isHdfs()) {
       channel = new FileOutputStream(fileInfo.getFilePath()).getChannel();
     } else {
-      stream = StorageManager.hdfsFs().create(fileInfo.getHdfsPath(), true);
+      StorageManager.hdfsFs().create(fileInfo.getHdfsPath(), true).close();
     }
     source = workerSource;
     logger.debug("FileWriter {} split threshold {} mode {}", this, 
splitThreshold, splitMode);
@@ -160,8 +159,8 @@ public final class FileWriter implements DeviceObserver {
     FlushTask task = null;
     if (channel != null) {
       task = new LocalFlushTask(flushBuffer, channel, notifier);
-    } else if (stream != null) {
-      task = new HdfsFlushTask(flushBuffer, stream, notifier);
+    } else if (fileInfo.isHdfs()) {
+      task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), notifier);
     }
     addTask(task);
     flushBuffer = null;
@@ -281,8 +280,7 @@ public final class FileWriter implements DeviceObserver {
         if (channel != null) {
           channel.close();
         }
-        if (stream != null) {
-          stream.close();
+        if (fileInfo.isHdfs()) {
           if 
(StorageManager.hdfsFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
             StorageManager.hdfsFs().delete(fileInfo.getHdfsPath(), false);
             deleted = true;
@@ -321,9 +319,6 @@ public final class FileWriter implements DeviceObserver {
         if (channel != null) {
           channel.close();
         }
-        if (stream != null) {
-          stream.close();
-        }
       } catch (IOException e) {
         logger.warn(
             "Close channel failed for file {} caused by {}.",
@@ -334,14 +329,12 @@ public final class FileWriter implements DeviceObserver {
 
     if (!destroyed) {
       destroyed = true;
-      try {
-        fileInfo.deleteAllFiles(StorageManager.hdfsFs());
-      } catch (Exception e) {
-        logger.warn("Exception when cleaning hdfs file {}", 
fileInfo.getFilePath());
-      }
+      fileInfo.deleteAllFiles(StorageManager.hdfsFs());
 
       // unregister from DeviceMonitor
-      deviceMonitor.unregisterFileWriter(this);
+      if (!fileInfo.isHdfs()) {
+        deviceMonitor.unregisterFileWriter(this);
+      }
       destroyHook.run();
     }
   }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index aba8809b..37054b4e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -37,7 +37,7 @@ import 
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
 import org.apache.celeborn.common.protocol.message.StatusCode
 import org.apache.celeborn.common.unsafe.Platform
 import org.apache.celeborn.common.util.PackedPartitionId
-import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, 
LocalFlusher, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, 
HdfsFlusher, LocalFlusher, StorageManager}
 
 class PushDataHandler extends BaseMessageHandler with Logging {
 
@@ -217,9 +217,14 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
       callback.onFailure(new Exception(message, exception))
       return
     }
-    val diskFull = workerInfo.diskInfos
-      .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
-      .actualUsableSpace < diskReserveSize
+    val diskFull =
+      if (fileWriter.flusher.isInstanceOf[LocalFlusher]) {
+        workerInfo.diskInfos
+          .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
+          .actualUsableSpace < diskReserveSize
+      } else {
+        false
+      }
     if ((diskFull && fileWriter.getFileInfo.getFileLength > 
partitionSplitMinimumSize) ||
       (isMaster && fileWriter.getFileInfo.getFileLength > 
fileWriter.getSplitThreshold())) {
       if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
@@ -766,6 +771,9 @@ class PushDataHandler extends BaseMessageHandler with 
Logging {
   }
 
   private def checkDiskFull(fileWriter: FileWriter): Boolean = {
+    if (fileWriter.flusher.isInstanceOf[HdfsFlusher]) {
+      return false
+    }
     val diskFull = workerInfo.diskInfos
       .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
       .actualUsableSpace < diskReserveSize
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 1d9b69e8..eedaeff8 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.worker.storage
 import java.nio.channels.FileChannel
 
 import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
-import org.apache.hadoop.fs.FSDataOutputStream
+import org.apache.hadoop.fs.{FSDataOutputStream, Path}
 
 abstract private[worker] class FlushTask(
     val buffer: CompositeByteBuf,
@@ -44,9 +44,11 @@ private[worker] class LocalFlushTask(
 
 private[worker] class HdfsFlushTask(
     buffer: CompositeByteBuf,
-    fsStream: FSDataOutputStream,
+    val path: Path,
     notifier: FlushNotifier) extends FlushTask(buffer, notifier) {
   override def flush(): Unit = {
-    fsStream.write(ByteBufUtil.getBytes(buffer))
+    val hdfsStream = StorageManager.hdfsFs.append(path)
+    hdfsStream.write(ByteBufUtil.getBytes(buffer))
+    hdfsStream.close()
   }
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 8a662154..74269620 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -19,13 +19,15 @@ package org.apache.celeborn.service.deploy.worker.storage
 
 import java.io.IOException
 import java.nio.channels.ClosedByInterruptException
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
+import java.util.concurrent.{Callable, LinkedBlockingQueue, TimeUnit}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLongArray, LongAdder}
 
 import scala.collection.JavaConverters._
 import scala.util.Random
 
+import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, 
RemovalNotification}
 import io.netty.buffer.{CompositeByteBuf, Unpooled}
+import org.apache.hadoop.fs.FSDataOutputStream
 
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.DiskStatus
@@ -212,22 +214,24 @@ private[worker] class LocalFlusher(
   override def toString(): String = {
     s"LocalFlusher@$flusherId-$mountPoint"
   }
+
 }
 
 final private[worker] class HdfsFlusher(
     workerSource: AbstractSource,
     hdfsFlusherThreads: Int,
     flushAvgTimeWindowSize: Int,
-    avgFlushTimeSlidingWindowMinCount: Int) extends Flusher(
+    avgFlushTimeSlidingWindowMinCount: Int,
+    hdfsStreamCacheSize: Int) extends Flusher(
     workerSource,
     hdfsFlusherThreads,
     flushAvgTimeWindowSize,
     avgFlushTimeSlidingWindowMinCount) with Logging {
-
   override def toString: String = s"HdfsFlusher@$flusherId"
 
   override def processIOException(e: IOException, deviceErrorType: 
DiskStatus): Unit = {
     stopAndCleanFlusher()
     logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
   }
+
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index e6adfc3f..4a269a70 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -129,12 +129,14 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       val hdfsConfiguration = new Configuration
       hdfsConfiguration.set("fs.defaultFS", hdfsDir)
       hdfsConfiguration.set("dfs.replication", "2")
+      hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false")
       StorageManager.hdfsFs = FileSystem.get(hdfsConfiguration)
       Some(new HdfsFlusher(
         workerSource,
         conf.hdfsFlusherThreads,
         conf.avgFlushTimeSlidingWindowSize,
-        conf.avgFlushTimeSlidingWindowMinCount))
+        conf.avgFlushTimeSlidingWindowMinCount,
+        conf.hdfsStreamCacheSize))
     } else {
       None
     }
@@ -270,7 +272,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
           diskInfo.dirs
         } else {
-          logWarning(s"Disk unavailable for $suggestedMountPoint, return all 
healthy" +
+          logDebug(s"Disk unavailable for $suggestedMountPoint, return all 
healthy" +
             s" working dirs. diskInfo $diskInfo")
           healthyWorkingDirs()
         }
@@ -399,6 +401,15 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         }
       }
       hdfsInfos.foreach(item => item._2.deleteAllFiles(StorageManager.hdfsFs))
+      if (!hdfsInfos.isEmpty) {
+        try {
+          StorageManager.hdfsFs.delete(
+            new Path(new Path(hdfsDir, conf.workerWorkingDir), 
s"$appId/$shuffleId"),
+            true)
+        } catch {
+          case e: Exception => logWarning("Clean expired hdfs shuffle 
failed.", e)
+        }
+      }
     }
   }
 

Reply via email to