This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 65cb36c0 [CELEBORN-83][FOLLOWUP] Fix various bugs when using HDFS as
storage. (#1065)
65cb36c0 is described below
commit 65cb36c0023d3100d15533aa51c1aa142b8ede2c
Author: Ethan Feng <[email protected]>
AuthorDate: Thu Dec 15 15:20:29 2022 +0800
[CELEBORN-83][FOLLOWUP] Fix various bugs when using HDFS as storage. (#1065)
---
README.md | 1 +
.../org/apache/celeborn/client/ShuffleClient.java | 8 +++
.../celeborn/client/read/DfsPartitionReader.java | 75 +++++++++++++++++-----
codecov.yml | 8 ++-
.../org/apache/celeborn/common/meta/FileInfo.java | 25 ++++++--
conf/log4j2.xml.template | 4 ++
.../service/deploy/worker/storage/FileWriter.java | 43 +++++++------
.../worker/storage/MapPartitionFileWriter.java | 2 +-
.../worker/storage/PartitionFilesSorter.java | 21 +++---
.../worker/storage/ReducePartitionFileWriter.java | 8 +--
.../service/deploy/worker/PushDataHandler.scala | 16 +++--
.../service/deploy/worker/storage/FlushTask.scala | 8 ++-
.../service/deploy/worker/storage/Flusher.scala | 2 +-
.../deploy/worker/storage/StorageManager.scala | 34 ++++++----
14 files changed, 177 insertions(+), 78 deletions(-)
diff --git a/README.md b/README.md
index 5f4d4f28..aefdc583 100644
--- a/README.md
+++ b/README.md
@@ -121,6 +121,7 @@ celeborn.ha.master.node.3.ratis.port 9874
celeborn.ha.master.ratis.raft.server.storage.dir /mnt/disk1/rss_ratis/
celeborn.metrics.enabled true
+# If you want to use HDFS as shuffle storage, make sure that flush buffer size
is at least 4MB.
celeborn.worker.flush.buffer.size 256k
celeborn.worker.storage.dirs /mnt/disk1/,/mnt/disk2
# If your hosts have disk raid or use lvm, set
celeborn.worker.monitor.disk.enabled to false
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index f5ea4800..a3d3fa82 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -24,6 +24,8 @@ import java.util.function.BooleanSupplier;
import io.netty.buffer.ByteBuf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.celeborn.client.read.RssInputStream;
import org.apache.celeborn.common.CelebornConf;
@@ -39,6 +41,7 @@ public abstract class ShuffleClient {
private static volatile ShuffleClient _instance;
private static volatile boolean initialized = false;
private static volatile FileSystem hdfsFs;
+ private static Logger logger = LoggerFactory.getLogger(ShuffleClient.class);
// for testing
public static void reset() {
@@ -102,6 +105,11 @@ public abstract class ShuffleClient {
synchronized (ShuffleClient.class) {
if (null == hdfsFs) {
Configuration hdfsConfiguration = new Configuration();
+ // enable fs cache to avoid too many fs instances
+ hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false");
+ logger.info(
+ "Celeborn client will ignore cluster"
+ + " settings about fs.hdfs.impl.disable.cache and set it to
false");
try {
hdfsFs = FileSystem.get(hdfsConfiguration);
} catch (IOException e) {
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 4259e1e0..e9326c32 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBuf;
@@ -53,9 +52,10 @@ public class DfsPartitionReader implements PartitionReader {
private final AtomicReference<IOException> exception = new
AtomicReference<>();
private volatile boolean closed = false;
private Thread fetchThread;
- private final FSDataInputStream hdfsInputStream;
+ private FSDataInputStream hdfsInputStream;
private int numChunks = 0;
- private final AtomicInteger currentChunkIndex = new AtomicInteger(0);
+ private int returnedChunks = 0;
+ private int currentChunkIndex = 0;
public DfsPartitionReader(
CelebornConf conf,
@@ -98,29 +98,67 @@ public class DfsPartitionReader implements PartitionReader {
ShuffleClient.getHdfsFs(conf).open(new
Path(location.getStorageInfo().getFilePath()));
chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
}
+ logger.debug(
+ "DFS {} index count:{} offsets:{}",
+ location.getStorageInfo().getFilePath(),
+ chunkOffsets.size(),
+ chunkOffsets);
if (chunkOffsets.size() > 1) {
numChunks = chunkOffsets.size() - 1;
fetchThread =
new Thread(
() -> {
try {
- while (!closed && currentChunkIndex.get() < numChunks) {
+ while (!closed && currentChunkIndex < numChunks) {
while (results.size() >= fetchMaxReqsInFlight) {
Thread.sleep(50);
}
- long offset = chunkOffsets.get(currentChunkIndex.get());
- long length = chunkOffsets.get(currentChunkIndex.get() +
1) - offset;
+ long offset = chunkOffsets.get(currentChunkIndex);
+ long length = chunkOffsets.get(currentChunkIndex + 1) -
offset;
+ logger.debug("read {} offset {} length {}",
currentChunkIndex, offset, length);
byte[] buffer = new byte[(int) length];
- hdfsInputStream.readFully(offset, buffer);
- results.add(Unpooled.wrappedBuffer(buffer));
- currentChunkIndex.incrementAndGet();
+ try {
+ hdfsInputStream.readFully(offset, buffer);
+ } catch (IOException e) {
+ logger.warn(
+ "read hdfs {} failed will retry, error detail {}",
+ location.getStorageInfo().getFilePath(),
+ e);
+ try {
+ hdfsInputStream.close();
+ hdfsInputStream =
+ ShuffleClient.getHdfsFs(conf)
+ .open(
+ new Path(
+ Utils.getSortedFilePath(
+
location.getStorageInfo().getFilePath())));
+ hdfsInputStream.readFully(offset, buffer);
+ } catch (IOException ex) {
+ logger.warn(
+ "retry read hdfs {} failed, error detail {} ",
+ location.getStorageInfo().getFilePath(),
+ e);
+ exception.set(ex);
+ break;
+ }
+ }
+ results.put(Unpooled.wrappedBuffer(buffer));
+ logger.debug("add index {} to results",
currentChunkIndex++);
}
- } catch (IOException e) {
- exception.set(e);
- } catch (InterruptedException e) {
+ } catch (Exception e) {
+ logger.warn("Fetch thread is cancelled.", e);
// cancel a task for speculative, ignore this exception
}
- });
+ logger.debug("fetch {} is done.",
location.getStorageInfo().getFilePath());
+ },
+ "Dfs-fetch-thread" + location.getStorageInfo().getFilePath());
+ fetchThread.setUncaughtExceptionHandler(
+ new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ logger.error("thread {} failed with exception {}", t, e);
+ }
+ });
fetchThread.start();
logger.debug("Start dfs read on location {}", location);
}
@@ -145,6 +183,7 @@ public class DfsPartitionReader implements PartitionReader {
throws IOException {
String indexPath =
Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
FSDataInputStream indexInputStream =
ShuffleClient.getHdfsFs(conf).open(new Path(indexPath));
+ logger.debug("read sorted index {}", indexPath);
long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new
Path(indexPath)).getLen();
// Index size won't be large, so it's safe to do the conversion.
byte[] indexBuffer = new byte[(int) indexSize];
@@ -162,17 +201,18 @@ public class DfsPartitionReader implements
PartitionReader {
@Override
public boolean hasNext() {
- return currentChunkIndex.get() < numChunks;
+ logger.debug("check has next current index: {} chunks {}", returnedChunks,
numChunks);
+ return returnedChunks < numChunks;
}
@Override
public ByteBuf next() throws IOException {
- checkException();
ByteBuf chunk = null;
try {
while (chunk == null) {
checkException();
chunk = results.poll(500, TimeUnit.MILLISECONDS);
+ logger.debug("poll result with result size: {}", results.size());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -180,6 +220,7 @@ public class DfsPartitionReader implements PartitionReader {
exception.set(ioe);
throw ioe;
}
+ returnedChunks++;
return chunk;
}
@@ -193,7 +234,9 @@ public class DfsPartitionReader implements PartitionReader {
@Override
public void close() {
closed = true;
- fetchThread.interrupt();
+ if (fetchThread != null) {
+ fetchThread.interrupt();
+ }
try {
hdfsInputStream.close();
} catch (IOException e) {
diff --git a/codecov.yml b/codecov.yml
index 483a35d3..d3fb4dec 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -21,6 +21,8 @@ coverage:
status:
project:
default:
- target: 0%
- threshold: 0%
- base: auto
\ No newline at end of file
+ target: 10%
+ patch:
+ default:
+ enabled: no
+ if_not_found: success
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
index 35ddebfa..50d3d8bb 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/FileInfo.java
@@ -18,7 +18,6 @@
package org.apache.celeborn.common.meta;
import java.io.File;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -27,12 +26,15 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.PartitionType;
import org.apache.celeborn.common.util.Utils;
public class FileInfo {
+ private static Logger logger = LoggerFactory.getLogger(FileInfo.class);
private final String filePath;
private final List<Long> chunkOffsets;
private final UserIdentifier userIdentifier;
@@ -127,12 +129,23 @@ public class FileInfo {
return userIdentifier;
}
- public void deleteAllFiles(FileSystem hdfsFs) throws IOException {
+ public void deleteAllFiles(FileSystem hdfsFs) {
if (isHdfs()) {
- hdfsFs.delete(getHdfsPath(), false);
- hdfsFs.delete(getHdfsWriterSuccessPath(), false);
- hdfsFs.delete(getHdfsIndexPath(), false);
- hdfsFs.delete(getHdfsSortedPath(), false);
+ try {
+ hdfsFs.delete(getHdfsPath(), false);
+ hdfsFs.delete(getHdfsWriterSuccessPath(), false);
+ hdfsFs.delete(getHdfsIndexPath(), false);
+ hdfsFs.delete(getHdfsSortedPath(), false);
+ } catch (Exception e) {
+ // ignore delete exceptions because some other workers might be
deleting the directory
+ logger.debug(
+ "delete hdfs file {},{},{},{} failed {}",
+ getHdfsPath(),
+ getHdfsWriterSuccessPath(),
+ getHdfsIndexPath(),
+ getHdfsSortedPath(),
+ e);
+ }
} else {
getFile().delete();
new File(getIndexPath()).delete();
diff --git a/conf/log4j2.xml.template b/conf/log4j2.xml.template
index 5a93485d..24173a2b 100644
--- a/conf/log4j2.xml.template
+++ b/conf/log4j2.xml.template
@@ -33,9 +33,13 @@
<PatternLayout pattern="%d{yy/MM/dd HH:mm:ss,SSS} %p [%t] %c{1}:
%m%n%ex"/>
</Console>
</Appenders>
+
<Loggers>
<Root level="INFO">
<AppenderRef ref="stdout"/>
</Root>
+ <Logger name="org.apache.hadoop.hdfs" level="WARN" additivity="false">
+ <Appender-ref ref="stdout" level="WARN" />
+ </Logger>
</Loggers>
</Configuration>
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 5914765c..a10eef9a 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,10 +52,9 @@ public abstract class FileWriter implements DeviceObserver {
private static final long WAIT_INTERVAL_MS = 20;
protected final FileInfo fileInfo;
- protected FileChannel channel;
- protected FSDataOutputStream stream;
- protected volatile boolean closed;
- protected volatile boolean destroyed;
+ private FileChannel channel;
+ private volatile boolean closed;
+ private volatile boolean destroyed;
protected final AtomicInteger numPendingWrites = new AtomicInteger();
protected long bytesFlushed;
@@ -105,7 +103,20 @@ public abstract class FileWriter implements DeviceObserver
{
if (!fileInfo.isHdfs()) {
channel = new FileOutputStream(fileInfo.getFilePath()).getChannel();
} else {
- stream = StorageManager.hdfsFs().create(fileInfo.getHdfsPath(), true);
+ // We open the stream and close immediately because HDFS output stream
will
+ // create a DataStreamer that is a thread.
+ // If we reuse HDFS output stream, we will exhaust the memory soon.
+ try {
+ StorageManager.hadoopFs().create(fileInfo.getHdfsPath(), true).close();
+ } catch (IOException e) {
+ try {
+ // If create file failed, wait 10 ms and retry
+ Thread.sleep(10);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ StorageManager.hadoopFs().create(fileInfo.getHdfsPath(), true).close();
+ }
}
source = workerSource;
logger.debug("FileWriter {} split threshold {} mode {}", this,
splitThreshold, splitMode);
@@ -138,8 +149,8 @@ public abstract class FileWriter implements DeviceObserver {
FlushTask task = null;
if (channel != null) {
task = new LocalFlushTask(flushBuffer, channel, notifier);
- } else if (stream != null) {
- task = new HdfsFlushTask(flushBuffer, stream, notifier);
+ } else if (fileInfo.isHdfs()) {
+ task = new HdfsFlushTask(flushBuffer, fileInfo.getHdfsPath(), notifier);
}
addTask(task);
flushBuffer = null;
@@ -248,8 +259,7 @@ public abstract class FileWriter implements DeviceObserver {
if (channel != null) {
channel.close();
}
- if (stream != null) {
- stream.close();
+ if (fileInfo.isHdfs()) {
streamClose.run();
}
} catch (IOException e) {
@@ -278,9 +288,6 @@ public abstract class FileWriter implements DeviceObserver {
if (channel != null) {
channel.close();
}
- if (stream != null) {
- stream.close();
- }
} catch (IOException e) {
logger.warn(
"Close channel failed for file {} caused by {}.",
@@ -291,14 +298,12 @@ public abstract class FileWriter implements
DeviceObserver {
if (!destroyed) {
destroyed = true;
- try {
- fileInfo.deleteAllFiles(StorageManager.hdfsFs());
- } catch (Exception e) {
- logger.warn("Exception when cleaning hdfs file {}",
fileInfo.getFilePath());
- }
+ fileInfo.deleteAllFiles(StorageManager.hadoopFs());
// unregister from DeviceMonitor
- deviceMonitor.unregisterFileWriter(this);
+ if (!fileInfo.isHdfs()) {
+ deviceMonitor.unregisterFileWriter(this);
+ }
destroyHook.run();
}
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
index 7d70f4be..84bf8503 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionFileWriter.java
@@ -75,7 +75,7 @@ public final class MapPartitionFileWriter extends FileWriter {
if (!fileInfo.isHdfs()) {
channelIndex = new
FileOutputStream(fileInfo.getIndexPath()).getChannel();
} else {
- streamIndex =
StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath(), true);
+ streamIndex =
StorageManager.hadoopFs().create(fileInfo.getHdfsIndexPath(), true);
}
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 7af454e2..6d3501d2 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -342,7 +342,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
if (isHdfs) {
// If the index file exists, it will be overwritten.
// So there is no need to check its existence.
- hdfsIndexOutput = StorageManager.hdfsFs().create(new
Path(indexFilePath));
+ hdfsIndexOutput = StorageManager.hadoopFs().create(new
Path(indexFilePath));
} else {
indexFileChannel = new FileOutputStream(indexFilePath).getChannel();
}
@@ -455,8 +455,9 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
int indexSize = 0;
try {
if (isHdfs) {
- hdfsIndexStream = StorageManager.hdfsFs().open(new
Path(indexFilePath));
- indexSize = (int) StorageManager.hdfsFs().getFileStatus(new
Path(indexFilePath)).getLen();
+ hdfsIndexStream = StorageManager.hadoopFs().open(new
Path(indexFilePath));
+ indexSize =
+ (int) StorageManager.hadoopFs().getFileStatus(new
Path(indexFilePath)).getLen();
} else {
indexStream = new FileInputStream(indexFilePath);
File indexFile = new File(indexFilePath);
@@ -520,11 +521,11 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
indexFile.delete();
}
} else {
- if (StorageManager.hdfsFs().exists(fileInfo.getHdfsSortedPath())) {
- StorageManager.hdfsFs().delete(fileInfo.getHdfsSortedPath(), false);
+ if (StorageManager.hadoopFs().exists(fileInfo.getHdfsSortedPath())) {
+ StorageManager.hadoopFs().delete(fileInfo.getHdfsSortedPath(),
false);
}
- if (StorageManager.hdfsFs().exists(fileInfo.getHdfsIndexPath())) {
- StorageManager.hdfsFs().delete(fileInfo.getHdfsIndexPath(), false);
+ if (StorageManager.hadoopFs().exists(fileInfo.getHdfsIndexPath())) {
+ StorageManager.hadoopFs().delete(fileInfo.getHdfsIndexPath(), false);
}
}
}
@@ -612,8 +613,8 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
private void initializeFiles() throws IOException {
if (isHdfs) {
- hdfsOriginInput = StorageManager.hdfsFs().open(new
Path(originFilePath));
- hdfsSortedOutput = StorageManager.hdfsFs().create(new
Path(sortedFilePath));
+ hdfsOriginInput = StorageManager.hadoopFs().open(new
Path(originFilePath));
+ hdfsSortedOutput = StorageManager.hadoopFs().create(new
Path(sortedFilePath));
} else {
originFileChannel = new FileInputStream(originFilePath).getChannel();
sortedFileChannel = new FileOutputStream(sortedFilePath).getChannel();
@@ -646,7 +647,7 @@ public class PartitionFilesSorter extends
ShuffleRecoverHelper {
private void deleteOriginFiles() throws IOException {
boolean deleteSuccess = false;
if (isHdfs) {
- deleteSuccess = StorageManager.hdfsFs().delete(new
Path(originFilePath), false);
+ deleteSuccess = StorageManager.hadoopFs().delete(new
Path(originFilePath), false);
} else {
deleteSuccess = new File(originFilePath).delete();
}
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
index 843841e5..e214df18 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionFileWriter.java
@@ -94,13 +94,13 @@ public final class ReducePartitionFileWriter extends
FileWriter {
}
},
() -> {
- if
(StorageManager.hdfsFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
- StorageManager.hdfsFs().delete(fileInfo.getHdfsPath(), false);
+ if
(StorageManager.hadoopFs().exists(fileInfo.getHdfsPeerWriterSuccessPath())) {
+ StorageManager.hadoopFs().delete(fileInfo.getHdfsPath(), false);
deleted = true;
} else {
-
StorageManager.hdfsFs().create(fileInfo.getHdfsWriterSuccessPath()).close();
+
StorageManager.hadoopFs().create(fileInfo.getHdfsWriterSuccessPath()).close();
FSDataOutputStream indexOutputStream =
- StorageManager.hdfsFs().create(fileInfo.getHdfsIndexPath());
+ StorageManager.hadoopFs().create(fileInfo.getHdfsIndexPath());
indexOutputStream.writeInt(fileInfo.getChunkOffsets().size());
for (Long offset : fileInfo.getChunkOffsets()) {
indexOutputStream.writeLong(offset);
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index b59eafd2..933f3b6b 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -37,7 +37,7 @@ import
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.PackedPartitionId
-import org.apache.celeborn.service.deploy.worker.storage.{FileWriter,
LocalFlusher, MapPartitionFileWriter, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{FileWriter,
HdfsFlusher, LocalFlusher, MapPartitionFileWriter, StorageManager}
class PushDataHandler extends BaseMessageHandler with Logging {
@@ -224,9 +224,14 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
callback.onFailure(new Exception(message, exception))
return
}
- val diskFull = workerInfo.diskInfos
- .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
- .actualUsableSpace < diskReserveSize
+ val diskFull =
+ if (fileWriter.flusher.isInstanceOf[LocalFlusher]) {
+ workerInfo.diskInfos
+ .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
+ .actualUsableSpace < diskReserveSize
+ } else {
+ false
+ }
if ((diskFull && fileWriter.getFileInfo.getFileLength >
partitionSplitMinimumSize) ||
(isMaster && fileWriter.getFileInfo.getFileLength >
fileWriter.getSplitThreshold())) {
if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
@@ -859,6 +864,9 @@ class PushDataHandler extends BaseMessageHandler with
Logging {
}
private def checkDiskFull(fileWriter: FileWriter): Boolean = {
+ if (fileWriter.flusher.isInstanceOf[HdfsFlusher]) {
+ return false
+ }
val diskFull = workerInfo.diskInfos
.get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
.actualUsableSpace < diskReserveSize
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 1d9b69e8..340e9e9d 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -20,7 +20,7 @@ package org.apache.celeborn.service.deploy.worker.storage
import java.nio.channels.FileChannel
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
-import org.apache.hadoop.fs.FSDataOutputStream
+import org.apache.hadoop.fs.{FSDataOutputStream, Path}
abstract private[worker] class FlushTask(
val buffer: CompositeByteBuf,
@@ -44,9 +44,11 @@ private[worker] class LocalFlushTask(
private[worker] class HdfsFlushTask(
buffer: CompositeByteBuf,
- fsStream: FSDataOutputStream,
+ val path: Path,
notifier: FlushNotifier) extends FlushTask(buffer, notifier) {
override def flush(): Unit = {
- fsStream.write(ByteBufUtil.getBytes(buffer))
+ val hdfsStream = StorageManager.hadoopFs.append(path)
+ hdfsStream.write(ByteBufUtil.getBytes(buffer))
+ hdfsStream.close()
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index 8a662154..b180674d 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -223,11 +223,11 @@ final private[worker] class HdfsFlusher(
hdfsFlusherThreads,
flushAvgTimeWindowSize,
avgFlushTimeSlidingWindowMinCount) with Logging {
-
override def toString: String = s"HdfsFlusher@$flusherId"
override def processIOException(e: IOException, deviceErrorType:
DiskStatus): Unit = {
stopAndCleanFlusher()
logError(s"$this write failed, reason $deviceErrorType ,exception: $e")
}
+
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 205d4c3c..54d2138a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -45,7 +45,7 @@ import
org.apache.celeborn.common.protocol.{PartitionLocation, PartitionSplitMod
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.{PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker._
-import org.apache.celeborn.service.deploy.worker.storage.StorageManager.hdfsFs
+import
org.apache.celeborn.service.deploy.worker.storage.StorageManager.hadoopFs
final private[worker] class StorageManager(conf: CelebornConf, workerSource:
AbstractSource)
extends ShuffleRecoverHelper with DeviceObserver with Logging with
MemoryPressureListener {
@@ -129,7 +129,10 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val hdfsConfiguration = new Configuration
hdfsConfiguration.set("fs.defaultFS", hdfsDir)
hdfsConfiguration.set("dfs.replication", "2")
- StorageManager.hdfsFs = FileSystem.get(hdfsConfiguration)
+ hdfsConfiguration.set("fs.hdfs.impl.disable.cache", "false")
+ logInfo("Celeborn will ignore cluster settings" +
+ " about fs.hdfs.impl.disable.cache and set it to false")
+ StorageManager.hadoopFs = FileSystem.get(hdfsConfiguration)
Some(new HdfsFlusher(
workerSource,
conf.hdfsFlusherThreads,
@@ -270,7 +273,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
if (diskInfo != null && diskInfo.status.equals(DiskStatus.HEALTHY)) {
diskInfo.dirs
} else {
- logWarning(s"Disk unavailable for $suggestedMountPoint, return all
healthy" +
+ logDebug(s"Disk unavailable for $suggestedMountPoint, return all
healthy" +
s" working dirs. diskInfo $diskInfo")
healthyWorkingDirs()
}
@@ -281,7 +284,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
if (dirs.isEmpty) {
val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir),
s"$appId/$shuffleId")
- FileSystem.mkdirs(StorageManager.hdfsFs, shuffleDir, hdfsPermission)
+ FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
val fileInfo =
new FileInfo(new Path(shuffleDir, fileName).toString,
userIdentifier, partitionType)
val hdfsWriter = partitionType match {
@@ -420,7 +423,16 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
deleteDirectory(file, diskOperators.get(diskInfo.mountPoint))
}
}
- hdfsInfos.foreach(item => item._2.deleteAllFiles(StorageManager.hdfsFs))
+ hdfsInfos.foreach(item =>
item._2.deleteAllFiles(StorageManager.hadoopFs))
+ if (!hdfsInfos.isEmpty) {
+ try {
+ StorageManager.hadoopFs.delete(
+ new Path(new Path(hdfsDir, conf.workerWorkingDir),
s"$appId/$shuffleId"),
+ true)
+ } catch {
+ case e: Exception => logWarning("Clean expired hdfs shuffle
failed.", e)
+ }
+ }
}
}
@@ -465,14 +477,14 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
}
- if (hdfsFs != null) {
+ if (hadoopFs != null) {
val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
- if (hdfsFs.exists(hdfsWorkPath)) {
- val iter = hdfsFs.listFiles(hdfsWorkPath, false)
+ if (hadoopFs.exists(hdfsWorkPath)) {
+ val iter = hadoopFs.listFiles(hdfsWorkPath, false)
while (iter.hasNext) {
val fileStatus = iter.next()
if (fileStatus.getModificationTime < expireTime) {
- hdfsFs.delete(fileStatus.getPath, true)
+ hadoopFs.delete(fileStatus.getPath, true)
}
}
}
@@ -528,7 +540,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
}
- val hdfsCleaned = hdfsFs match {
+ val hdfsCleaned = hadoopFs match {
case hdfs: FileSystem =>
val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
// hdfs path not exist when first time initialize
@@ -670,5 +682,5 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
object StorageManager {
- var hdfsFs: FileSystem = _
+ var hadoopFs: FileSystem = _
}