This is an automated email from the ASF dual-hosted git repository. ethanfeng pushed a commit to branch CELEBORN-83-FOLLOWUP in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit f2703dfa087e4f35ab7d02b04de592b2b886b5ed Author: mingji <[email protected]> AuthorDate: Fri Dec 9 16:44:44 2022 +0800 [CELEBORN-83][FOLLOWUP] Fix various bugs when using HDFS as storage. --- README.md | 1 + .../org/apache/celeborn/client/ShuffleClient.java | 1 + .../celeborn/client/read/DfsPartitionReader.java | 16 ++++++++++++--- .../org/apache/celeborn/common/meta/FileInfo.java | 15 ++++++++------ .../org/apache/celeborn/common/CelebornConf.scala | 10 ++++++++++ conf/log4j2.xml.template | 4 ++++ .../service/deploy/worker/storage/FileWriter.java | 23 ++++++++-------------- .../service/deploy/worker/PushDataHandler.scala | 16 +++++++++++---- .../service/deploy/worker/storage/FlushTask.scala | 8 +++++--- .../service/deploy/worker/storage/Flusher.scala | 10 +++++++--- .../deploy/worker/storage/StorageManager.scala | 15 ++++++++++++-- 11 files changed, 83 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 5f4d4f28..aefdc583 100644 --- a/README.md +++ b/README.md @@ -121,6 +121,7 @@ celeborn.ha.master.node.3.ratis.port 9874 celeborn.ha.master.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/ celeborn.metrics.enabled true +# If you use HDFS as shuffle storage, set celeborn.worker.flush.buffer.size to at least 4MB (the 256k example below is too small for HDFS). 
celeborn.worker.flush.buffer.size 256k celeborn.worker.storage.dirs /mnt/disk1/,/mnt/disk2 # If your hosts have disk raid or use lvm, set celeborn.worker.monitor.disk.enabled to false diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java index b9fcb9e1..876047a5 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java @@ -100,6 +100,7 @@ public abstract class ShuffleClient implements Cloneable { synchronized (ShuffleClient.class) { if (null == hdfsFs) { Configuration hdfsConfiguration = new Configuration(); + hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false"); try { hdfsFs = FileSystem.get(hdfsConfiguration); } catch (IOException e) { diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java index 4259e1e0..3ac4a181 100644 --- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java +++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java @@ -98,6 +98,12 @@ public class DfsPartitionReader implements PartitionReader { ShuffleClient.getHdfsFs(conf).open(new Path(location.getStorageInfo().getFilePath())); chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location)); } + StringBuilder builder = new StringBuilder(); + for (Long offset : chunkOffsets) { + builder.append(offset + " "); + } + logger.info( + "DFS " + location.getStorageInfo().getFilePath() + "index count:" + chunkOffsets.size() + " offsets:" + builder.toString()); if (chunkOffsets.size() > 1) { numChunks = chunkOffsets.size() - 1; fetchThread = @@ -110,17 +116,21 @@ public class DfsPartitionReader implements PartitionReader { } long offset = chunkOffsets.get(currentChunkIndex.get()); long length = chunkOffsets.get(currentChunkIndex.get() + 1) - 
offset; + logger.info("read " + currentChunkIndex.get() + " offset " + offset + " length" + length); byte[] buffer = new byte[(int) length]; hdfsInputStream.readFully(offset, buffer); results.add(Unpooled.wrappedBuffer(buffer)); - currentChunkIndex.incrementAndGet(); + int fetchedIndex = currentChunkIndex.incrementAndGet(); + logger.info("fetch chunkIndex" + fetchedIndex); } } catch (IOException e) { exception.set(e); } catch (InterruptedException e) { + logger.warn("Fetch thread is cancelled."); // cancel a task for speculative, ignore this exception } - }); + }, + "Dfs-fetch-thread" + location.getFileName()); fetchThread.start(); logger.debug("Start dfs read on location {}", location); } @@ -162,7 +172,7 @@ public class DfsPartitionReader implements PartitionReader { @Override public boolean hasNext() { - return currentChunkIndex.get() < numChunks; + return currentChunkIndex.get() < numChunks || !results.isEmpty(); } @Override diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java index 6a815e3e..9ec2df3f 100644 --- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java +++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java @@ -18,7 +18,6 @@ package org.apache.celeborn.common.meta; import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -126,12 +125,16 @@ public class FileInfo { return userIdentifier; } - public void deleteAllFiles(FileSystem hdfsFs) throws IOException { + public void deleteAllFiles(FileSystem hdfsFs) { if (isHdfs()) { - hdfsFs.delete(getHdfsPath(), false); - hdfsFs.delete(getHdfsWriterSuccessPath(), false); - hdfsFs.delete(getHdfsIndexPath(), false); - hdfsFs.delete(getHdfsSortedPath(), false); + try { + hdfsFs.delete(getHdfsPath(), false); + hdfsFs.delete(getHdfsWriterSuccessPath(), false); + hdfsFs.delete(getHdfsIndexPath(), false); + 
hdfsFs.delete(getHdfsSortedPath(), false); + } catch (Exception e) { + // ignore delete exceptions because some other workers might be deleting the directory + } } else { getFile().delete(); new File(getIndexPath()).delete(); diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 64ccfd7c..73b44019 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -719,6 +719,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def avgFlushTimeSlidingWindowSize: Int = get(WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_SIZE) def avgFlushTimeSlidingWindowMinCount: Int = get(WORKER_FLUSHER_AVGFLUSHTIME_SLIDINGWINDOW_MINCOUNT) + def hdfsStreamCacheSize: Int = get(WORKER_FLUSHER_HDFS_STREAM_CACHE_SIZE) def diskReserveSize: Long = get(WORKER_DISK_RESERVE_SIZE) def diskMonitorEnabled: Boolean = get(WORKER_DISK_MONITOR_ENABLED) def diskMonitorCheckList: Seq[String] = get(WORKER_DISK_MONITOR_CHECKLIST) @@ -1956,6 +1957,15 @@ object CelebornConf extends Logging { .intConf .createWithDefault(1000) + val WORKER_FLUSHER_HDFS_STREAM_CACHE_SIZE: ConfigEntry[Int] = + buildConf("celeborn.worker.flusher.hdfsStream.cacheSize") + .categories("worker") + .doc("Max number of cached HDFS output stream size.") + .version("0.2.0") + .internal + .intConf + .createWithDefault(100) + val SLOTS_ASSIGN_LOADAWARE_DISKGROUP_NUM: ConfigEntry[Int] = buildConf("celeborn.slots.assign.loadAware.numDiskGroups") .withAlternative("rss.disk.groups") diff --git a/conf/log4j2.xml.template b/conf/log4j2.xml.template index 5a93485d..4ed4e8a7 100644 --- a/conf/log4j2.xml.template +++ b/conf/log4j2.xml.template @@ -33,9 +33,13 @@ <PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}: %m%n%ex"/> </Console> </Appenders> + <Loggers> <Root level="INFO"> <AppenderRef ref="stdout"/> </Root> 
+ <Logger name="org.apache.hadoop.hdfs" level="WARN" additivity="false"> + <Appender-ref ref="stdout" level="WARN" /> + </Logger> </Loggers> </Configuration> diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java index 175da298..799ee466 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java @@ -54,7 +54,6 @@ public final class FileWriter implements DeviceObserver { private final FileInfo fileInfo; private FileChannel channel; - private FSDataOutputStream stream; private volatile boolean closed; private volatile boolean destroyed; @@ -127,7 +126,7 @@ public final class FileWriter implements DeviceObserver { if (!fileInfo.isHdfs()) { channel = new FileOutputStream(fileInfo.getFilePath()).getChannel(); } else { - stream = StorageManager.hdfsFs().create(fileInfo.getHdfsPath(), true); + StorageManager.hdfsFs().create(fileInfo.getHdfsPath(), true).close(); } source = workerSource; logger.debug("FileWriter {} split threshold {} mode {}", this, splitThreshold, splitMode); @@ -160,8 +159,8 @@ public final class FileWriter implements DeviceObserver { FlushTask task = null; if (channel != null) { task = new LocalFlushTask(flushBuffer, channel, notifier); - } else if (stream != null) { - task = new HdfsFlushTask(flushBuffer, stream, notifier); + } else if (fileInfo.isHdfs()) { + task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), notifier); } addTask(task); flushBuffer = null; @@ -281,8 +280,7 @@ public final class FileWriter implements DeviceObserver { if (channel != null) { channel.close(); } - if (stream != null) { - stream.close(); + if (fileInfo.isHdfs()) { if (StorageManager.hdfsFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) { StorageManager.hdfsFs().delete(fileInfo.getHdfsPath(), 
false); deleted = true; @@ -321,9 +319,6 @@ public final class FileWriter implements DeviceObserver { if (channel != null) { channel.close(); } - if (stream != null) { - stream.close(); - } } catch (IOException e) { logger.warn( "Close channel failed for file {} caused by {}.", @@ -334,14 +329,12 @@ public final class FileWriter implements DeviceObserver { if (!destroyed) { destroyed = true; - try { - fileInfo.deleteAllFiles(StorageManager.hdfsFs()); - } catch (Exception e) { - logger.warn("Exception when cleaning hdfs file {}", fileInfo.getFilePath()); - } + fileInfo.deleteAllFiles(StorageManager.hdfsFs()); // unregister from DeviceMonitor - deviceMonitor.unregisterFileWriter(this); + if (!fileInfo.isHdfs()) { + deviceMonitor.unregisterFileWriter(this); + } destroyHook.run(); } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index aba8809b..37054b4e 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -37,7 +37,7 @@ import org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod import org.apache.celeborn.common.protocol.message.StatusCode import org.apache.celeborn.common.unsafe.Platform import org.apache.celeborn.common.util.PackedPartitionId -import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, LocalFlusher, StorageManager} +import org.apache.celeborn.service.deploy.worker.storage.{FileWriter, HdfsFlusher, LocalFlusher, StorageManager} class PushDataHandler extends BaseMessageHandler with Logging { @@ -217,9 +217,14 @@ class PushDataHandler extends BaseMessageHandler with Logging { callback.onFailure(new Exception(message, exception)) return } - val diskFull = workerInfo.diskInfos - .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint) - 
.actualUsableSpace < diskReserveSize + val diskFull = + if (fileWriter.flusher.isInstanceOf[LocalFlusher]) { + workerInfo.diskInfos + .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint) + .actualUsableSpace < diskReserveSize + } else { + false + } if ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) || (isMaster && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold())) { if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) { @@ -766,6 +771,9 @@ class PushDataHandler extends BaseMessageHandler with Logging { } private def checkDiskFull(fileWriter: FileWriter): Boolean = { + if (fileWriter.flusher.isInstanceOf[HdfsFlusher]) { + return false + } val diskFull = workerInfo.diskInfos .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint) .actualUsableSpace < diskReserveSize diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala index 1d9b69e8..eedaeff8 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala @@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.worker.storage import java.nio.channels.FileChannel import io.netty.buffer.{ByteBufUtil, CompositeByteBuf} -import org.apache.hadoop.fs.FSDataOutputStream +import org.apache.hadoop.fs.{FSDataOutputStream, Path} abstract private[worker] class FlushTask( val buffer: CompositeByteBuf, @@ -44,9 +44,11 @@ private[worker] class LocalFlushTask( private[worker] class HdfsFlushTask( buffer: CompositeByteBuf, - fsStream: FSDataOutputStream, + val path: Path, notifier: FlushNotifier) extends FlushTask(buffer, notifier) { override def flush(): Unit = { - fsStream.write(ByteBufUtil.getBytes(buffer)) + val hdfsStream = StorageManager.hdfsFs.append(path) + 
hdfsStream.write(ByteBufUtil.getBytes(buffer)) + hdfsStream.close() } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala index 8a662154..74269620 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala @@ -19,13 +19,15 @@ package org.apache.celeborn.service.deploy.worker.storage import java.io.IOException import java.nio.channels.ClosedByInterruptException -import java.util.concurrent.{LinkedBlockingQueue, TimeUnit} +import java.util.concurrent.{Callable, LinkedBlockingQueue, TimeUnit} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLongArray, LongAdder} import scala.collection.JavaConverters._ import scala.util.Random +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification} import io.netty.buffer.{CompositeByteBuf, Unpooled} +import org.apache.hadoop.fs.FSDataOutputStream import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.DiskStatus @@ -212,22 +214,24 @@ private[worker] class LocalFlusher( override def toString(): String = { s"LocalFlusher@$flusherId-$mountPoint" } + } final private[worker] class HdfsFlusher( workerSource: AbstractSource, hdfsFlusherThreads: Int, flushAvgTimeWindowSize: Int, - avgFlushTimeSlidingWindowMinCount: Int) extends Flusher( + avgFlushTimeSlidingWindowMinCount: Int, + hdfsStreamCacheSize: Int) extends Flusher( workerSource, hdfsFlusherThreads, flushAvgTimeWindowSize, avgFlushTimeSlidingWindowMinCount) with Logging { - override def toString: String = s"HdfsFlusher@$flusherId" override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = { stopAndCleanFlusher() logError(s"$this write failed, reason $deviceErrorType ,exception: $e") } + } diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index e6adfc3f..4a269a70 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -129,12 +129,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val hdfsConfiguration = new Configuration hdfsConfiguration.set("fs.defaultFS", hdfsDir) hdfsConfiguration.set("dfs.replication", "2") + hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false") StorageManager.hdfsFs = FileSystem.get(hdfsConfiguration) Some(new HdfsFlusher( workerSource, conf.hdfsFlusherThreads, conf.avgFlushTimeSlidingWindowSize, - conf.avgFlushTimeSlidingWindowMinCount)) + conf.avgFlushTimeSlidingWindowMinCount, + conf.hdfsStreamCacheSize)) } else { None } @@ -270,7 +272,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) { diskInfo.dirs } else { - logWarning(s"Disk unavailable for $suggestedMountPoint, return all healthy" + + logDebug(s"Disk unavailable for $suggestedMountPoint, return all healthy" + s" working dirs. diskInfo $diskInfo") healthyWorkingDirs() } @@ -399,6 +401,15 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs } } hdfsInfos.foreach(item => item._2.deleteAllFiles(StorageManager.hdfsFs)) + if (!hdfsInfos.isEmpty) { + try { + StorageManager.hdfsFs.delete( + new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId"), + true) + } catch { + case e: Exception => logWarning("Clean expired hdfs shuffle failed.", e) + } + } } }
