This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7a596bbed [CELEBORN-1469] Support writing shuffle data to OSS(S3 only)
7a596bbed is described below
commit 7a596bbed1f94ef0ad983b51b9f4ebe41053075e
Author: zhaohehuhu <[email protected]>
AuthorDate: Wed Jul 24 11:59:15 2024 +0800
[CELEBORN-1469] Support writing shuffle data to OSS(S3 only)
### What changes were proposed in this pull request?
Add support for writing shuffle data to Amazon S3 via the Hadoop S3A filesystem.
### Why are the changes needed?
Currently, Celeborn cannot sink shuffle data directly to Amazon S3, which is a
limitation when moving on-premises clusters to AWS and using S3 as the sink for
shuffle data.
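For illustration, a minimal sketch of enabling the new S3 backend with the
configuration keys added by this patch (bucket, endpoint, and credentials below
are placeholders, not defaults):

```scala
import org.apache.celeborn.common.CelebornConf

// All keys below are introduced by this change; values are illustrative only.
val conf = new CelebornConf()
  .set("celeborn.storage.availableTypes", "S3")
  .set("celeborn.storage.s3.dir", "s3a://my-bucket/celeborn")        // hypothetical bucket
  .set("celeborn.storage.s3.endpoint", "s3.us-east-1.amazonaws.com") // hypothetical endpoint
  .set("celeborn.storage.s3.access.key", "<access-key>")
  .set("celeborn.storage.s3.secret.key", "<secret-key>")

// hasS3Storage (added below) requires both the active S3 type and an s3a:// dir.
assert(conf.hasS3Storage)
```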
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Closes #2579 from zhaohehuhu/dev-0619.
Authored-by: zhaohehuhu <[email protected]>
Signed-off-by: mingji <[email protected]>
---
LICENSE-binary | 2 +
NOTICE-binary | 4 +
build/make-distribution.sh | 10 +-
.../org/apache/celeborn/client/ShuffleClient.java | 18 +--
.../celeborn/client/read/DfsPartitionReader.java | 54 +++++----
common/pom.xml | 20 ++++
.../apache/celeborn/common/meta/DiskFileInfo.java | 40 ++++---
.../celeborn/common/protocol/StorageInfo.java | 21 +++-
common/src/main/proto/TransportMessages.proto | 2 +-
.../org/apache/celeborn/common/CelebornConf.scala | 82 ++++++++++++-
.../apache/celeborn/common/meta/WorkerInfo.scala | 62 +++++-----
.../common/protocol/message/ControlMessages.scala | 10 +-
.../celeborn/common/util/CelebornHadoopUtils.scala | 36 +++++-
.../org/apache/celeborn/common/util/Utils.scala | 7 +-
dev/deps/dependencies-server | 2 +
docs/configuration/client.md | 4 +
docs/configuration/master.md | 5 +
docs/configuration/worker.md | 6 +
master/pom.xml | 20 ++++
.../service/deploy/master/SlotsAllocator.java | 20 +++-
.../master/clustermeta/AbstractMetaManager.java | 8 +-
.../celeborn/service/deploy/master/Master.scala | 49 +++++---
.../src/main/openapi3/worker_rest_v1.yaml | 1 +
pom.xml | 20 +++-
project/CelebornBuild.scala | 12 +-
.../server/common/http/api/v1/ApiUtils.scala | 2 +
.../deploy/worker/memory/MemoryManager.java | 2 +-
.../worker/storage/MapPartitionDataWriter.java | 45 +++----
.../deploy/worker/storage/PartitionDataWriter.java | 45 ++++---
.../worker/storage/PartitionFilesSorter.java | 102 +++++++++-------
.../worker/storage/ReducePartitionDataWriter.java | 13 +-
.../service/deploy/worker/Controller.scala | 2 +-
.../service/deploy/worker/FetchHandler.scala | 6 +
.../service/deploy/worker/PushDataHandler.scala | 5 +-
.../celeborn/service/deploy/worker/Worker.scala | 2 +-
.../service/deploy/worker/storage/FlushTask.scala | 35 +++++-
.../service/deploy/worker/storage/Flusher.scala | 19 +++
.../deploy/worker/storage/StorageManager.scala | 131 ++++++++++++++++-----
.../deploy/worker/storage/StoragePolicy.scala | 2 +-
.../MemoryReducePartitionDataWriterSuiteJ.java | 2 +-
40 files changed, 685 insertions(+), 243 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index 221f632dd..eabd3db4a 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -263,6 +263,7 @@ org.apache.commons:commons-crypto
org.apache.commons:commons-lang3
org.apache.hadoop:hadoop-client-api
org.apache.hadoop:hadoop-client-runtime
+org.apache.hadoop:hadoop-aws
org.apache.ibatis:mybatis
org.apache.logging.log4j:log4j-1.2-api
org.apache.logging.log4j:log4j-api
@@ -307,6 +308,7 @@ org.slf4j:jcl-over-slf4j
org.webjars:swagger-ui
org.xerial.snappy:snappy-java
org.yaml:snakeyaml
+com.amazonaws:aws-java-sdk-bundle
------------------------------------------------------------------------------------
This product bundles various third-party components under other open source
licenses.
diff --git a/NOTICE-binary b/NOTICE-binary
index e4f0de0af..9942e1440 100644
--- a/NOTICE-binary
+++ b/NOTICE-binary
@@ -202,3 +202,7 @@ Copyright (c) 2022 Luke Hutchison
mimepool
Copyright (c) 2018, 2022 Oracle and/or its affiliates.
+
+
+aws-java-sdk
+Copyright 2010-2024 Amazon.com, Inc. or its affiliates.
\ No newline at end of file
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index e5e39ec6e..1f093bd12 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -27,6 +27,7 @@ RELEASE="false"
MVN="$PROJECT_DIR/build/mvn"
SBT="$PROJECT_DIR/build/sbt"
SBT_ENABLED="false"
+HADOOP_AWS_ENABLED="false"
function exit_with_usage {
echo "make-distribution.sh - tool for making binary distributions of
Celeborn"
@@ -62,6 +63,11 @@ while (( "$#" )); do
echo "Error: $1 is not supported"
exit_with_usage
;;
+ -P*)
+ if [[ "$1" == *"hadoop-aws"* ]]; then
+ HADOOP_AWS_ENABLED="true"
+ fi
+ ;;
-*)
break
;;
@@ -256,7 +262,9 @@ function sbt_build_service {
echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
echo "Build flags: $@" >> "$DIST_DIR/RELEASE"
-
+ if [[ "$HADOOP_AWS_ENABLED" == "true" ]]; then
+ export SBT_MAVEN_PROFILES="hadoop-aws"
+ fi
BUILD_COMMAND=("$SBT" clean package)
# Actually build the jar
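With the change above, any Maven-style `-P` flag containing `hadoop-aws` also
switches the SBT build onto the new profile via `SBT_MAVEN_PROFILES`, so a
distribution bundling hadoop-aws and the AWS SDK can be produced with an
invocation along the lines of `./build/make-distribution.sh -Phadoop-aws`
(illustrative; combine with the project's usual build flags).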
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index 0ea484deb..0738368ff 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -19,6 +19,7 @@ package org.apache.celeborn.client;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
@@ -34,6 +35,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.PbStreamHandler;
+import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.rpc.RpcEndpointRef;
import org.apache.celeborn.common.util.CelebornHadoopUtils;
import org.apache.celeborn.common.util.ExceptionMaker;
@@ -47,7 +49,7 @@ public abstract class ShuffleClient {
private static Logger logger = LoggerFactory.getLogger(ShuffleClient.class);
private static volatile ShuffleClient _instance;
private static volatile boolean initialized = false;
- private static volatile FileSystem hdfsFs;
+ private static volatile Map<StorageInfo.Type, FileSystem> hadoopFs;
private static LongAdder totalReadCounter = new LongAdder();
private static LongAdder localShuffleReadCounter = new LongAdder();
@@ -55,7 +57,7 @@ public abstract class ShuffleClient {
public static void reset() {
_instance = null;
initialized = false;
- hdfsFs = null;
+ hadoopFs = null;
}
protected ShuffleClient() {}
@@ -101,19 +103,19 @@ public abstract class ShuffleClient {
return _instance;
}
- public static FileSystem getHdfsFs(CelebornConf conf) {
- if (null == hdfsFs) {
+  public static Map<StorageInfo.Type, FileSystem> getHadoopFs(CelebornConf conf) {
+ if (null == hadoopFs) {
synchronized (ShuffleClient.class) {
- if (null == hdfsFs) {
+ if (null == hadoopFs) {
try {
- hdfsFs = CelebornHadoopUtils.getHadoopFS(conf);
+ hadoopFs = CelebornHadoopUtils.getHadoopFS(conf);
} catch (Exception e) {
- logger.error("Celeborn initialize HDFS failed.", e);
+ logger.error("Celeborn initialize DFS failed.", e);
}
}
}
}
- return hdfsFs;
+ return hadoopFs;
}
public static void incrementLocalReadCounter() {
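The previously cached single HDFS `FileSystem` becomes a lazily initialized map
keyed by storage type, still guarded by double-checked locking. A sketch of how
a caller would pick a filesystem out of that map (mirroring the selection
`DfsPartitionReader` performs below; `fsFor` is an illustrative helper, not
part of the patch):

```scala
import org.apache.hadoop.fs.FileSystem

import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.StorageInfo

// Illustrative helper: anything that is not S3 falls back to the HDFS entry.
def fsFor(conf: CelebornConf, storageType: StorageInfo.Type): FileSystem = {
  val all = ShuffleClient.getHadoopFs(conf) // java.util.Map[StorageInfo.Type, FileSystem]
  if (storageType == StorageInfo.Type.S3) all.get(StorageInfo.Type.S3)
  else all.get(StorageInfo.Type.HDFS)
}
```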
diff --git a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
index 712bd82e4..b69cf580f 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java
@@ -30,6 +30,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@ import org.apache.celeborn.common.protocol.PartitionLocation;
import org.apache.celeborn.common.protocol.PbBufferStreamEnd;
import org.apache.celeborn.common.protocol.PbOpenStream;
import org.apache.celeborn.common.protocol.PbStreamHandler;
+import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.protocol.StreamType;
import org.apache.celeborn.common.util.ShuffleBlockInfoUtils;
import org.apache.celeborn.common.util.ThreadUtils;
@@ -60,7 +62,7 @@ public class DfsPartitionReader implements PartitionReader {
private volatile boolean closed = false;
private ExecutorService fetchThread;
private boolean fetchThreadStarted;
- private FSDataInputStream hdfsInputStream;
+ private FSDataInputStream dfsInputStream;
private int numChunks = 0;
private int returnedChunks = 0;
private int currentChunkIndex = 0;
@@ -68,6 +70,7 @@ public class DfsPartitionReader implements PartitionReader {
private TransportClient client;
private PbStreamHandler streamHandler;
private MetricsCallback metricsCallback;
+ private FileSystem hadoopFs;
public DfsPartitionReader(
CelebornConf conf,
@@ -85,6 +88,12 @@ public class DfsPartitionReader implements PartitionReader {
this.metricsCallback = metricsCallback;
this.location = location;
+ if (location.getStorageInfo() != null
+ && location.getStorageInfo().getType() == StorageInfo.Type.S3) {
+ this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.S3);
+ } else {
+      this.hadoopFs = ShuffleClient.getHadoopFs(conf).get(StorageInfo.Type.HDFS);
+ }
if (endMapIndex != Integer.MAX_VALUE) {
long fetchTimeoutMs = conf.clientFetchTimeoutMs();
@@ -105,18 +114,17 @@ public class DfsPartitionReader implements PartitionReader {
// Parse this message to ensure sort is done.
} catch (IOException | InterruptedException e) {
throw new IOException(
- "read shuffle file from HDFS failed, filePath: "
+ "read shuffle file from DFS failed, filePath: "
+ location.getStorageInfo().getFilePath(),
e);
}
-      hdfsInputStream =
-          ShuffleClient.getHdfsFs(conf)
-              .open(new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
+
+      dfsInputStream =
+          hadoopFs.open(new Path(Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
       chunkOffsets.addAll(
           getChunkOffsetsFromSortedIndex(conf, location, startMapIndex, endMapIndex));
} else {
-      hdfsInputStream =
-          ShuffleClient.getHdfsFs(conf).open(new Path(location.getStorageInfo().getFilePath()));
+      dfsInputStream = hadoopFs.open(new Path(location.getStorageInfo().getFilePath()));
chunkOffsets.addAll(getChunkOffsetsFromUnsortedIndex(conf, location));
}
logger.debug(
@@ -138,8 +146,7 @@ public class DfsPartitionReader implements PartitionReader {
throws IOException {
List<Long> offsets;
try (FSDataInputStream indexInputStream =
-        ShuffleClient.getHdfsFs(conf)
-            .open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
+        hadoopFs.open(new Path(Utils.getIndexFilePath(location.getStorageInfo().getFilePath())))) {
offsets = new ArrayList<>();
int offsetCount = indexInputStream.readInt();
for (int i = 0; i < offsetCount; i++) {
@@ -154,10 +161,9 @@ public class DfsPartitionReader implements PartitionReader {
throws IOException {
String indexPath =
Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
List<Long> offsets;
- try (FSDataInputStream indexInputStream =
- ShuffleClient.getHdfsFs(conf).open(new Path(indexPath))) {
+    try (FSDataInputStream indexInputStream = hadoopFs.open(new Path(indexPath))) {
logger.debug("read sorted index {}", indexPath);
-      long indexSize = ShuffleClient.getHdfsFs(conf).getFileStatus(new Path(indexPath)).getLen();
+ long indexSize = hadoopFs.getFileStatus(new Path(indexPath)).getLen();
// Index size won't be large, so it's safe to do the conversion.
byte[] indexBuffer = new byte[(int) indexSize];
indexInputStream.readFully(0L, indexBuffer);
@@ -196,24 +202,22 @@ public class DfsPartitionReader implements PartitionReader {
logger.debug("read {} offset {} length {}", currentChunkIndex,
offset, length);
byte[] buffer = new byte[(int) length];
try {
- hdfsInputStream.readFully(offset, buffer);
+ dfsInputStream.readFully(offset, buffer);
} catch (IOException e) {
logger.warn(
- "read HDFS {} failed will retry, error detail {}",
+ "read DFS {} failed will retry, error detail {}",
location.getStorageInfo().getFilePath(),
e);
try {
-          hdfsInputStream.close();
-          hdfsInputStream =
-              ShuffleClient.getHdfsFs(conf)
-                  .open(
-                      new Path(
-                          Utils.getSortedFilePath(
-                              location.getStorageInfo().getFilePath())));
-          hdfsInputStream.readFully(offset, buffer);
+          dfsInputStream.close();
+          dfsInputStream =
+              hadoopFs.open(
+                  new Path(
+                      Utils.getSortedFilePath(location.getStorageInfo().getFilePath())));
+          dfsInputStream.readFully(offset, buffer);
} catch (IOException ex) {
logger.warn(
- "retry read HDFS {} failed, error detail {} ",
+ "retry read DFS {} failed, error detail {} ",
location.getStorageInfo().getFilePath(),
e);
exception.set(ex);
@@ -261,9 +265,9 @@ public class DfsPartitionReader implements PartitionReader {
fetchThread.shutdownNow();
}
try {
- hdfsInputStream.close();
+ dfsInputStream.close();
} catch (IOException e) {
- logger.warn("close HDFS input stream failed.", e);
+ logger.warn("close DFS input stream failed.", e);
}
if (results.size() > 0) {
results.forEach(ReferenceCounted::release);
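For context, the index files read above store an int chunk count followed by
that many long offsets. A minimal sketch of the unsorted-index read path under
that assumption (`fs` and `indexPath` stand in for the per-storage-type
filesystem and the shuffle file's index path):

```scala
import java.util
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

// Reads the chunk offsets the reader uses to slice the shuffle file into chunks.
def readChunkOffsets(fs: FileSystem, indexPath: String): util.List[java.lang.Long] = {
  val offsets = new util.ArrayList[java.lang.Long]()
  var in: FSDataInputStream = null
  try {
    in = fs.open(new Path(indexPath))
    val offsetCount = in.readInt()
    var i = 0
    while (i < offsetCount) {
      offsets.add(in.readLong())
      i += 1
    }
  } finally {
    if (in != null) in.close()
  }
  offsets
}
```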
diff --git a/common/pom.xml b/common/pom.xml
index 29baba769..9db43a17b 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -204,5 +204,25 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>hadoop-aws</id>
+ <activation>
+ <property>
+ <name>hadoop-aws-deps</name>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bundle</artifactId>
+ <version>${aws.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
</project>
diff --git a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
index ce99192c6..51978ac2e 100644
--- a/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/meta/DiskFileInfo.java
@@ -96,41 +96,41 @@ public class DiskFileInfo extends FileInfo {
return Utils.getIndexFilePath(filePath);
}
- public Path getHdfsPath() {
+ public Path getDfsPath() {
return new Path(filePath);
}
- public Path getHdfsIndexPath() {
+ public Path getDfsIndexPath() {
return new Path(Utils.getIndexFilePath(filePath));
}
- public Path getHdfsSortedPath() {
+ public Path getDfsSortedPath() {
return new Path(Utils.getSortedFilePath(filePath));
}
- public Path getHdfsWriterSuccessPath() {
+ public Path getDfsWriterSuccessPath() {
return new Path(Utils.getWriteSuccessFilePath(filePath));
}
- public Path getHdfsPeerWriterSuccessPath() {
+ public Path getDfsPeerWriterSuccessPath() {
     return new Path(Utils.getWriteSuccessFilePath(Utils.getPeerPath(filePath)));
}
- public void deleteAllFiles(FileSystem hdfsFs) {
- if (isHdfs()) {
+ public void deleteAllFiles(FileSystem dfsFs) {
+ if (isDFS()) {
try {
- hdfsFs.delete(getHdfsPath(), false);
- hdfsFs.delete(getHdfsWriterSuccessPath(), false);
- hdfsFs.delete(getHdfsIndexPath(), false);
- hdfsFs.delete(getHdfsSortedPath(), false);
+ dfsFs.delete(getDfsPath(), false);
+ dfsFs.delete(getDfsWriterSuccessPath(), false);
+ dfsFs.delete(getDfsIndexPath(), false);
+ dfsFs.delete(getDfsSortedPath(), false);
} catch (Exception e) {
         // ignore delete exceptions because some other workers might be deleting the directory
logger.debug(
- "delete HDFS file {},{},{},{} failed {}",
- getHdfsPath(),
- getHdfsWriterSuccessPath(),
- getHdfsIndexPath(),
- getHdfsSortedPath(),
+ "delete DFS file {},{},{},{} failed {}",
+ getDfsPath(),
+ getDfsWriterSuccessPath(),
+ getDfsIndexPath(),
+ getDfsSortedPath(),
e);
}
} else {
@@ -151,4 +151,12 @@ public class DiskFileInfo extends FileInfo {
public boolean isHdfs() {
return Utils.isHdfsPath(filePath);
}
+
+ public boolean isS3() {
+ return Utils.isS3Path(filePath);
+ }
+
+ public boolean isDFS() {
+ return Utils.isS3Path(filePath) || Utils.isHdfsPath(filePath);
+ }
}
diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
index 621edb774..28cb65256 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -26,7 +26,9 @@ public class StorageInfo implements Serializable {
HDD(1),
SSD(2),
HDFS(3),
- OSS(4);
+ OSS(4),
+ S3(5);
+
private final int value;
Type(int value) {
@@ -54,6 +56,7 @@ public class StorageInfo implements Serializable {
public static final int LOCAL_DISK_MASK = 0b10;
public static final int HDFS_MASK = 0b100;
public static final int OSS_MASK = 0b1000;
+ public static final int S3_MASK = 0b10000;
public static final int ALL_TYPES_AVAILABLE_MASK = 0;
// Default storage Type is MEMORY.
@@ -162,15 +165,28 @@ public class StorageInfo implements Serializable {
return StorageInfo.HDFSOnly(availableStorageTypes);
}
+ public static boolean S3Only(int availableStorageTypes) {
+ return availableStorageTypes == S3_MASK;
+ }
+
public static boolean OSSAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & OSS_MASK) > 0;
}
+ public static boolean S3Available(int availableStorageTypes) {
+ return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
+ || (availableStorageTypes & S3_MASK) > 0;
+ }
+
public boolean OSSAvailable() {
return StorageInfo.OSSAvailable(availableStorageTypes);
}
+ public boolean S3Available() {
+ return StorageInfo.S3Available(availableStorageTypes);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -232,6 +248,9 @@ public class StorageInfo implements Serializable {
case OSS:
ava = ava | OSS_MASK;
break;
+ case S3:
+ ava = ava | S3_MASK;
+ break;
}
}
return ava;
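The availability flags are a bitmask, with 0 meaning every storage type is
available. A quick illustration of the new S3 bit, using only the constants and
predicates defined above:

```scala
import org.apache.celeborn.common.protocol.StorageInfo

val s3Only = StorageInfo.S3_MASK                                // 0b10000
val s3AndLocal = StorageInfo.S3_MASK | StorageInfo.LOCAL_DISK_MASK

assert(StorageInfo.S3Only(s3Only))
assert(!StorageInfo.S3Only(s3AndLocal))                         // another bit is set
assert(StorageInfo.S3Available(s3AndLocal))
assert(StorageInfo.S3Available(StorageInfo.ALL_TYPES_AVAILABLE_MASK)) // 0 => all types
```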
diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index f8de50486..2b8432c44 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -70,7 +70,7 @@ enum MessageType {
PARTITION_SPLIT = 47;
REGISTER_MAP_PARTITION_TASK = 48;
HEARTBEAT_FROM_APPLICATION_RESPONSE = 49;
- CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT = 50;
+ CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT = 50;
OPEN_STREAM = 51;
STREAM_HANDLER = 52;
CHECK_WORKERS_AVAILABLE = 53;
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 9f05c96bf..ff21a311f 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
import scala.concurrent.duration._
import scala.util.Try
+import org.apache.celeborn.common.CelebornConf.{MASTER_INTERNAL_ENDPOINTS, S3_ACCESS_KEY, S3_DIR, S3_SECRET_KEY}
 import org.apache.celeborn.common.authentication.AnonymousAuthenticationProviderImpl
 import org.apache.celeborn.common.identity.{DefaultIdentityProvider, IdentityProvider}
import org.apache.celeborn.common.internal.Logging
@@ -641,6 +642,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
}
   def hasHDFSStorage: Boolean =
     get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) && get(HDFS_DIR).isDefined
+  def hasS3Storage: Boolean =
+    get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.S3.name()) && get(S3_DIR).isDefined
   def masterSlotAssignLoadAwareDiskGroupNum: Int = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM)
   def masterSlotAssignLoadAwareDiskGroupGradient: Double = get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_GRADIENT)
@@ -886,6 +889,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
   def clientCommitFilesIgnoreExcludedWorkers: Boolean = get(CLIENT_COMMIT_IGNORE_EXCLUDED_WORKERS)
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
+ def dfsExpireDirsTimeoutMS: Long = get(DFS_EXPIRE_DIRS_TIMEOUT)
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
   def applicationUnregisterEnabled: Boolean = get(APPLICATION_UNREGISTER_ENABLED)
@@ -1093,7 +1097,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
(dir, maxCapacity, flushThread, diskType)
}
}.getOrElse {
- if (!hasHDFSStorage) {
+ if (!hasHDFSStorage && !hasS3Storage) {
val prefix = workerStorageBaseDirPrefix
val number = workerStorageBaseDirNumber
(1 to number).map { i =>
@@ -1108,6 +1112,24 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def partitionSplitMinimumSize: Long = get(WORKER_PARTITION_SPLIT_MIN_SIZE)
def partitionSplitMaximumSize: Long = get(WORKER_PARTITION_SPLIT_MAX_SIZE)
+ def s3AccessKey: String = get(S3_ACCESS_KEY).getOrElse("")
+
+ def s3SecretKey: String = get(S3_SECRET_KEY).getOrElse("")
+
+ def s3Endpoint: String = get(S3_ENDPOINT).getOrElse("")
+
+ def s3Dir: String = {
+ get(S3_DIR).map {
+ s3Dir =>
+ if (!Utils.isS3Path(s3Dir)) {
+          log.error(s"${S3_DIR.key} configuration is wrong $s3Dir. Disable S3 support.")
+ ""
+ } else {
+ s3Dir
+ }
+ }.getOrElse("")
+ }
+
def hdfsDir: String = {
get(HDFS_DIR).map {
hdfsDir =>
@@ -1177,10 +1199,12 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// //////////////////////////////////////////////////////
def workerFlusherBufferSize: Long = get(WORKER_FLUSHER_BUFFER_SIZE)
def workerHdfsFlusherBufferSize: Long = get(WORKER_HDFS_FLUSHER_BUFFER_SIZE)
+ def workerS3FlusherBufferSize: Long = get(WORKER_S3_FLUSHER_BUFFER_SIZE)
def workerWriterCloseTimeoutMs: Long = get(WORKER_WRITER_CLOSE_TIMEOUT)
def workerHddFlusherThreads: Int = get(WORKER_FLUSHER_HDD_THREADS)
def workerSsdFlusherThreads: Int = get(WORKER_FLUSHER_SSD_THREADS)
def workerHdfsFlusherThreads: Int = get(WORKER_FLUSHER_HDFS_THREADS)
+ def workerS3FlusherThreads: Int = get(WORKER_FLUSHER_S3_THREADS)
   def workerCreateWriterMaxAttempts: Int = get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
// //////////////////////////////////////////////////////
@@ -2134,6 +2158,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")
+ val DFS_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.master.dfs.expireDirs.timeout")
+ .categories("master")
+ .version("0.6.0")
+      .doc("The timeout for expired dirs to be deleted on S3 or HDFS.")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("1h")
+
val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.heartbeat.worker.timeout")
.withAlternative("celeborn.worker.heartbeat.timeout")
@@ -2801,6 +2833,38 @@ object CelebornConf extends Logging {
.stringConf
.createOptional
+ val S3_DIR: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.dir")
+ .categories("worker", "master", "client")
+ .version("0.6.0")
+ .doc("S3 base directory for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val S3_SECRET_KEY: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.secret.key")
+ .categories("worker", "master", "client")
+ .version("0.6.0")
+ .doc("S3 secret key for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val S3_ACCESS_KEY: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.access.key")
+ .categories("worker", "master", "client")
+ .version("0.6.0")
+ .doc("S3 access key for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val S3_ENDPOINT: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.endpoint")
+ .categories("worker", "master", "client")
+ .version("0.6.0")
+ .doc("S3 endpoint for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
val WORKER_DISK_RESERVE_SIZE: ConfigEntry[Long] =
buildConf("celeborn.worker.storage.disk.reserve.size")
.withAlternative("celeborn.worker.disk.reserve.size")
@@ -3219,6 +3283,14 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("4m")
+ val WORKER_S3_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
+ buildConf("celeborn.worker.flusher.s3.buffer.size")
+ .categories("worker")
+ .version("0.6.0")
+      .doc("Size of buffer used by an S3 flusher.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("4m")
+
val WORKER_WRITER_CLOSE_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.writer.close.timeout")
.categories("worker")
@@ -3259,6 +3331,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(8)
+ val WORKER_FLUSHER_S3_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.worker.flusher.s3.threads")
+ .categories("worker")
+ .doc("Flusher's thread count used for write data to S3.")
+ .version("0.6.0")
+ .intConf
+ .createWithDefault(8)
+
val WORKER_FLUSHER_SHUTDOWN_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.worker.flusher.shutdownTimeout")
.categories("worker")
diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index a0eedc3f9..bc801b7c8 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -46,7 +46,8 @@ class WorkerInfo(
var lastHeartbeat: Long = 0
var workerStatus = WorkerStatus.normalWorkerStatus()
val diskInfos =
-    if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, DiskInfo](_diskInfos) else null
+    if (_diskInfos != null) JavaUtils.newConcurrentHashMap[String, DiskInfo](_diskInfos)
+    else null
val userResourceConsumption =
if (_userResourceConsumption != null)
       JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption](_userResourceConsumption)
@@ -198,40 +199,41 @@ class WorkerInfo(
   def updateThenGetDiskInfos(
       newDiskInfos: java.util.Map[String, DiskInfo],
-      estimatedPartitionSize: Option[Long] = None): util.Map[String, DiskInfo] = this.synchronized {
-    import scala.collection.JavaConverters._
-    for (newDisk <- newDiskInfos.values().asScala) {
-      val mountPoint: String = newDisk.mountPoint
-      val curDisk = diskInfos.get(mountPoint)
-      if (curDisk != null) {
-        curDisk.actualUsableSpace = newDisk.actualUsableSpace
-        curDisk.totalSpace = newDisk.totalSpace
-        // Update master's diskinfo activeslots to worker's value
-        curDisk.activeSlots = newDisk.activeSlots
-        curDisk.avgFlushTime = newDisk.avgFlushTime
-        curDisk.avgFetchTime = newDisk.avgFetchTime
-        if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS) {
-          curDisk.maxSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
-        }
-        curDisk.setStatus(newDisk.status)
-      } else {
-        if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS) {
-          newDisk.maxSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
+      estimatedPartitionSize: Option[Long] = None): util.Map[String, DiskInfo] =
+    this.synchronized {
+      import scala.collection.JavaConverters._
+      for (newDisk <- newDiskInfos.values().asScala) {
+        val mountPoint: String = newDisk.mountPoint
+        val curDisk = diskInfos.get(mountPoint)
+        if (curDisk != null) {
+          curDisk.actualUsableSpace = newDisk.actualUsableSpace
+          curDisk.totalSpace = newDisk.totalSpace
+          // Update master's diskinfo activeslots to worker's value
+          curDisk.activeSlots = newDisk.activeSlots
+          curDisk.avgFlushTime = newDisk.avgFlushTime
+          curDisk.avgFetchTime = newDisk.avgFetchTime
+          if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS && curDisk.storageType != StorageInfo.Type.S3) {
+            curDisk.maxSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
+          }
+          curDisk.setStatus(newDisk.status)
+        } else {
+          if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS && newDisk.storageType != StorageInfo.Type.S3) {
+            newDisk.maxSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
+          }
+          diskInfos.put(mountPoint, newDisk)
         }
-        diskInfos.put(mountPoint, newDisk)
       }
-    }
-    val nonExistsMountPoints: java.util.Set[String] = new util.HashSet[String]
-    nonExistsMountPoints.addAll(diskInfos.keySet)
-    nonExistsMountPoints.removeAll(newDiskInfos.keySet)
-    if (!nonExistsMountPoints.isEmpty) {
-      for (nonExistsMountPoint <- nonExistsMountPoints.asScala) {
-        diskInfos.remove(nonExistsMountPoint)
+      val nonExistsMountPoints: java.util.Set[String] = new util.HashSet[String]
+      nonExistsMountPoints.addAll(diskInfos.keySet)
+      nonExistsMountPoints.removeAll(newDiskInfos.keySet)
+      if (!nonExistsMountPoints.isEmpty) {
+        for (nonExistsMountPoint <- nonExistsMountPoints.asScala) {
+          diskInfos.remove(nonExistsMountPoint)
+        }
       }
+      JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos)
     }
-    JavaUtils.newConcurrentHashMap[String, DiskInfo](diskInfos)
-  }
def updateThenGetUserResourceConsumption(resourceConsumptions: util.Map[
UserIdentifier,
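The rule encoded above: DFS-backed disk entries (HDFS or S3) are treated as
effectively unbounded, so `maxSlots` is only derived from usable space for
genuinely local disks. A condensed sketch of that rule (illustrative helper,
not part of the patch):

```scala
import org.apache.celeborn.common.protocol.StorageInfo

// Returns None for DFS-backed storage, whose capacity is not slot-limited.
def estimatedMaxSlots(
    usableSpace: Long,
    estimatedPartitionSize: Long,
    storageType: StorageInfo.Type): Option[Long] =
  if (storageType == StorageInfo.Type.HDFS || storageType == StorageInfo.Type.S3) None
  else Some(usableSpace / estimatedPartitionSize)
```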
diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 94d4a22a8..c94744bb4 100644
--- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -68,7 +68,7 @@ object ControlMessages extends Logging {
case object CheckForWorkerUnavailableInfoTimeout extends Message
- case object CheckForHDFSExpiredDirsTimeout extends Message
+ case object CheckForDFSExpiredDirsTimeout extends Message
case object RemoveExpiredShuffle extends Message
@@ -509,8 +509,8 @@ object ControlMessages extends Logging {
case CheckForApplicationTimeOut =>
new TransportMessage(MessageType.CHECK_APPLICATION_TIMEOUT, null)
-    case CheckForHDFSExpiredDirsTimeout =>
-      new TransportMessage(MessageType.CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT, null)
+    case CheckForDFSExpiredDirsTimeout =>
+      new TransportMessage(MessageType.CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT, null)
case RemoveExpiredShuffle =>
new TransportMessage(MessageType.REMOVE_EXPIRED_SHUFFLE, null)
@@ -1263,8 +1263,8 @@ object ControlMessages extends Logging {
case CHECK_APPLICATION_TIMEOUT_VALUE =>
CheckForApplicationTimeOut
- case CHECK_FOR_HDFS_EXPIRED_DIRS_TIMEOUT_VALUE =>
- CheckForHDFSExpiredDirsTimeout
+ case CHECK_FOR_DFS_EXPIRED_DIRS_TIMEOUT_VALUE =>
+ CheckForDFSExpiredDirsTimeout
case WORKER_LOST_VALUE =>
PbWorkerLost.parseFrom(message.getPayload)
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
index ae07187e7..cf15709a7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/CelebornHadoopUtils.scala
@@ -20,6 +20,8 @@ package org.apache.celeborn.common.util
import java.io.{File, IOException}
import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.JavaConverters._
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
@@ -27,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.CelebornException
import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.StorageInfo
object CelebornHadoopUtils extends Logging {
private var logPrinted = new AtomicBoolean(false)
@@ -46,6 +49,20 @@ object CelebornHadoopUtils extends Logging {
"prefix 'celeborn.hadoop.', e.g.
'celeborn.hadoop.dfs.replication=3'")
}
}
+
+ if (conf.s3Dir.nonEmpty) {
+      if (conf.s3AccessKey.isEmpty || conf.s3SecretKey.isEmpty || conf.s3Endpoint.isEmpty) {
+        throw new CelebornException(
+          "S3 storage is enabled but s3AccessKey, s3SecretKey, or s3Endpoint is not set")
+      }
+ }
+ hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+ hadoopConf.set(
+ "fs.s3a.aws.credentials.provider",
+ "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
+ hadoopConf.set("fs.s3a.access.key", conf.s3AccessKey)
+ hadoopConf.set("fs.s3a.secret.key", conf.s3SecretKey)
+ hadoopConf.set("fs.s3a.endpoint", conf.s3Endpoint)
+ }
appendSparkHadoopConfigs(conf, hadoopConf)
hadoopConf
}
@@ -57,22 +74,31 @@ object CelebornHadoopUtils extends Logging {
}
}
- def getHadoopFS(conf: CelebornConf): FileSystem = {
+  def getHadoopFS(conf: CelebornConf): java.util.Map[StorageInfo.Type, FileSystem] = {
val hadoopConf = newConfiguration(conf)
initKerberos(conf, hadoopConf)
- new Path(conf.hdfsDir).getFileSystem(hadoopConf)
+ val hadoopFs = new java.util.HashMap[StorageInfo.Type, FileSystem]()
+ if (conf.hasHDFSStorage) {
+ val hdfsDir = new Path(conf.hdfsDir)
+ hadoopFs.put(StorageInfo.Type.HDFS, hdfsDir.getFileSystem(hadoopConf))
+ }
+ if (conf.hasS3Storage) {
+ val s3Dir = new Path(conf.s3Dir)
+ hadoopFs.put(StorageInfo.Type.S3, s3Dir.getFileSystem(hadoopConf))
+ }
+ hadoopFs
}
-  def deleteHDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive: Boolean): Unit = {
+  def deleteDFSPathOrLogError(hadoopFs: FileSystem, path: Path, recursive: Boolean): Unit = {
try {
val startTime = System.currentTimeMillis()
hadoopFs.delete(path, recursive)
logInfo(
- s"Delete HDFS ${path}(recursive=$recursive) costs " +
+ s"Delete DFS ${path}(recursive=$recursive) costs " +
Utils.msDurationToString(System.currentTimeMillis() - startTime))
} catch {
case e: IOException =>
-        logError(s"Failed to delete HDFS ${path}(recursive=$recursive) due to: ", e)
+        logError(s"Failed to delete DFS ${path}(recursive=$recursive) due to: ", e)
}
}
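A condensed sketch of the Hadoop configuration wiring `newConfiguration` now
performs when `celeborn.storage.s3.dir` is set: route `s3a://` paths through
hadoop-aws with static credentials (credential sources and the endpoint here
are placeholders):

```scala
import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set(
  "fs.s3a.aws.credentials.provider",
  "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoopConf.set("fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))     // illustrative
hadoopConf.set("fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", "")) // illustrative
hadoopConf.set("fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")                     // assumed endpoint
```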
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 494c77ab9..d390561d7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1119,7 +1119,8 @@ object Utils extends Logging {
val SORTED_SUFFIX = ".sorted"
val INDEX_SUFFIX = ".index"
val SUFFIX_HDFS_WRITE_SUCCESS = ".success"
- val COMPATIBLE_HDFS_REGEX = "^[a-zA-Z0-9]+://.*"
+ val COMPATIBLE_HDFS_REGEX = "^(?!s3a://)[a-zA-Z0-9]+://.*"
+ val S3_REGEX = "^s3[a]?://([a-z0-9][a-z0-9-]{1,61}[a-z0-9])(/.*)?$"
val UNKNOWN_APP_SHUFFLE_ID = -1
@@ -1127,6 +1128,10 @@ object Utils extends Logging {
path.matches(COMPATIBLE_HDFS_REGEX)
}
+ def isS3Path(path: String): Boolean = {
+ path.matches(S3_REGEX)
+ }
+
def getSortedFilePath(path: String): String = {
path + SORTED_SUFFIX
}
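The negative lookahead keeps `s3a://` URIs out of the HDFS-compatible bucket so
they are classified as S3 instead. A few REPL-style checks of the updated
classification:

```scala
import org.apache.celeborn.common.util.Utils

assert(Utils.isHdfsPath("hdfs://nn:9000/celeborn"))
assert(!Utils.isHdfsPath("s3a://my-bucket/celeborn")) // excluded by (?!s3a://)
assert(Utils.isS3Path("s3a://my-bucket/celeborn"))
assert(Utils.isS3Path("s3://my-bucket"))
assert(!Utils.isS3Path("file:///tmp/celeborn"))
```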
diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server
index 4e305005e..521e9872d 100644
--- a/dev/deps/dependencies-server
+++ b/dev/deps/dependencies-server
@@ -19,6 +19,7 @@ HikariCP/4.0.3//HikariCP-4.0.3.jar
RoaringBitmap/1.0.6//RoaringBitmap-1.0.6.jar
aopalliance-repackaged/2.6.1//aopalliance-repackaged-2.6.1.jar
ap-loader-all/3.0-8//ap-loader-all-3.0-8.jar
+aws-java-sdk-bundle/1.12.367//aws-java-sdk-bundle-1.12.367.jar
classgraph/4.8.138//classgraph-4.8.138.jar
commons-cli/1.5.0//commons-cli-1.5.0.jar
commons-crypto/1.0.0//commons-crypto-1.0.0.jar
@@ -27,6 +28,7 @@ commons-lang3/3.13.0//commons-lang3-3.13.0.jar
commons-logging/1.1.3//commons-logging-1.1.3.jar
failureaccess/1.0.2//failureaccess-1.0.2.jar
guava/33.1.0-jre//guava-33.1.0-jre.jar
+hadoop-aws/3.3.6//hadoop-aws-3.3.6.jar
hadoop-client-api/3.3.6//hadoop-client-api-3.3.6.jar
hadoop-client-runtime/3.3.6//hadoop-client-runtime-3.3.6.jar
hk2-api/2.6.1//hk2-api-2.6.1.jar
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 947a79db6..eda8edce2 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -122,4 +122,8 @@ license: |
| celeborn.quota.identity.user-specific.userName | default | false | User name if celeborn.quota.identity.provider is org.apache.celeborn.common.identity.DefaultIdentityProvider. | 0.3.0 | |
| celeborn.storage.availableTypes | HDD | false | Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 | celeborn.storage.activeTypes |
| celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | |
+| celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
<!--end-include-->
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 5c5bd8ac2..11d2e09f3 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -34,6 +34,7 @@ license: |
| celeborn.dynamicConfig.store.fs.path | <undefined> | false | The path of dynamic config file for fs store backend. The file format should be yaml. The default path is `${CELEBORN_CONF_DIR}/dynamicConfig.yaml`. | 0.5.0 | |
| celeborn.internal.port.enabled | false | false | Whether to create a internal port on Masters/Workers for inter-Masters/Workers communication. This is beneficial when SASL authentication is enforced for all interactions between clients and Celeborn Services, but the services can exchange messages without being subject to SASL authentication. | 0.5.0 | |
| celeborn.logConf.enabled | false | false | When `true`, log the CelebornConf for debugging purposes. | 0.5.0 | |
+| celeborn.master.dfs.expireDirs.timeout | 1h | false | The timeout for expired dirs to be deleted on S3 or HDFS. | 0.6.0 | |
| celeborn.master.estimatedPartitionSize.initialSize | 64mb | false | Initial partition size for estimation, it will change according to runtime stats. | 0.3.0 | celeborn.shuffle.initialEstimatedPartitionSize |
| celeborn.master.estimatedPartitionSize.maxSize | <undefined> | false | Max partition size for estimation. Default value should be celeborn.worker.shuffle.partitionSplit.max * 2. | 0.4.1 | |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | false | Ignore partition size smaller than this configuration of partition size for estimation. | 0.3.0 | celeborn.shuffle.minPartitionSizeToEstimate |
@@ -74,4 +75,8 @@ license: |
| celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | |
| celeborn.storage.hdfs.kerberos.keytab | <undefined> | false | Kerberos keytab file path for HDFS storage connection. | 0.3.2 | |
| celeborn.storage.hdfs.kerberos.principal | <undefined> | false | Kerberos principal for HDFS storage connection. | 0.3.2 | |
+| celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
<!--end-include-->
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 2b6ec3df8..ed3d15454 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -44,6 +44,10 @@ license: |
| celeborn.storage.hdfs.dir | <undefined> | false | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | |
| celeborn.storage.hdfs.kerberos.keytab | <undefined> | false | Kerberos keytab file path for HDFS storage connection. | 0.3.2 | |
| celeborn.storage.hdfs.kerberos.principal | <undefined> | false | Kerberos principal for HDFS storage connection. | 0.3.2 | |
+| celeborn.storage.s3.access.key | <undefined> | false | S3 access key for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.dir | <undefined> | false | S3 base directory for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.endpoint | <undefined> | false | S3 endpoint for Celeborn to store shuffle data. | 0.6.0 | |
+| celeborn.storage.s3.secret.key | <undefined> | false | S3 secret key for Celeborn to store shuffle data. | 0.6.0 | |
| celeborn.worker.activeConnection.max | <undefined> | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | |
| celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 | |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | false | Threads count for read buffer per mount point. | 0.3.0 | |
@@ -74,6 +78,8 @@ license: |
| celeborn.worker.flusher.hdd.threads | 1 | false | Flusher's thread count per disk used for write data to HDD disks. | 0.2.0 | |
| celeborn.worker.flusher.hdfs.buffer.size | 4m | false | Size of buffer used by a HDFS flusher. | 0.3.0 | |
| celeborn.worker.flusher.hdfs.threads | 8 | false | Flusher's thread count used for write data to HDFS. | 0.2.0 | |
+| celeborn.worker.flusher.s3.buffer.size | 4m | false | Size of buffer used by an S3 flusher. | 0.6.0 | |
+| celeborn.worker.flusher.s3.threads | 8 | false | Flusher's thread count used for write data to S3. | 0.6.0 | |
| celeborn.worker.flusher.shutdownTimeout | 3s | false | Timeout for a flusher to shutdown. | 0.2.0 | |
| celeborn.worker.flusher.ssd.threads | 16 | false | Flusher's thread count per disk used for write data to SSD disks. | 0.2.0 | |
| celeborn.worker.flusher.threads | 16 | false | Flusher's thread count per disk for unknown-type disks. | 0.2.0 | |
diff --git a/master/pom.xml b/master/pom.xml
index abe061473..a2573e277 100644
--- a/master/pom.xml
+++ b/master/pom.xml
@@ -158,6 +158,26 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>hadoop-aws</id>
+ <activation>
+ <property>
+ <name>hadoop-aws-deps</name>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bundle</artifactId>
+ <version>${aws.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
<profile>
<id>hadoop-2</id>
<activation>
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index ced728b2f..eda5a3a2a 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -73,7 +73,8 @@ public class SlotsAllocator {
     for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
       if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
         if (StorageInfo.localDiskAvailable(availableStorageTypes)
-            && diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS) {
+            && diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS
+            && diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3) {
           usableDisks.add(
               new UsableDiskInfo(
                   diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
@@ -82,6 +83,11 @@ public class SlotsAllocator {
           usableDisks.add(
               new UsableDiskInfo(
                   diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
+        } else if (StorageInfo.S3Available(availableStorageTypes)
+            && diskInfoEntry.getValue().storageType() == StorageInfo.Type.S3) {
+          usableDisks.add(
+              new UsableDiskInfo(
+                  diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
}
}
}
@@ -123,6 +129,10 @@ public class SlotsAllocator {
       return offerSlotsRoundRobin(
           workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes);
     }
+    if (StorageInfo.S3Only(availableStorageTypes)) {
+      return offerSlotsRoundRobin(
+          workers, partitionIds, shouldReplicate, shouldRackAware, availableStorageTypes);
+    }
List<DiskInfo> usableDisks = new ArrayList<>();
Map<DiskInfo, WorkerInfo> diskToWorkerMap = new HashMap<>();
@@ -141,7 +151,8 @@ public class SlotsAllocator {
? Option.empty()
: Option.apply(diskReserveRatio.get()))
&& diskInfo.status().equals(DiskStatus.HEALTHY)
- && diskInfo.storageType() != StorageInfo.Type.HDFS) {
+ && diskInfo.storageType() != StorageInfo.Type.HDFS
+ && diskInfo.storageType() != StorageInfo.Type.S3) {
usableDisks.add(diskInfo);
}
}));
@@ -198,6 +209,8 @@ public class SlotsAllocator {
DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo;
if (selectedDiskInfo.storageType() == StorageInfo.Type.HDFS) {
         storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes);
+      } else if (selectedDiskInfo.storageType() == StorageInfo.Type.S3) {
+        storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes);
} else {
storageInfo =
new StorageInfo(
@@ -211,6 +224,7 @@ public class SlotsAllocator {
DiskInfo[] diskInfos =
selectedWorker.diskInfos().values().stream()
.filter(p -> p.storageType() != StorageInfo.Type.HDFS)
+ .filter(p -> p.storageType() != StorageInfo.Type.S3)
.collect(Collectors.toList())
.toArray(new DiskInfo[0]);
storageInfo =
@@ -219,6 +233,8 @@ public class SlotsAllocator {
diskInfos[diskIndex].storageType(),
availableStorageTypes);
         workerDiskIndex.put(selectedWorker, (diskIndex + 1) % diskInfos.length);
+ } else if (StorageInfo.S3Available(availableStorageTypes)) {
+        storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes);
       } else {
         storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes);
}
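S3-backed storage, like HDFS, has no per-disk slot bookkeeping, so the
allocator falls through to round-robin assignment and synthesizes a
`StorageInfo` for it. A sketch of the resulting fallback order when no local
disk is selected (illustrative helper, not part of the patch):

```scala
import org.apache.celeborn.common.protocol.StorageInfo

// Prefer S3 when its bit is available, otherwise fall back to HDFS.
def dfsStorageInfo(availableStorageTypes: Int): StorageInfo =
  if (StorageInfo.S3Available(availableStorageTypes))
    new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes)
  else
    new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes)
```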
diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index f3ab34b67..371e89162 100644
--- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -212,10 +212,14 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
     long healthyDiskNum =
         disks.values().stream().filter(s -> s.status().equals(DiskStatus.HEALTHY)).count();
     if (!excludedWorkers.contains(worker)
-        && (((disks.isEmpty() || healthyDiskNum <= 0) && !conf.hasHDFSStorage()) || highWorkload)) {
+        && (((disks.isEmpty() || healthyDiskNum <= 0)
+                && (!conf.hasHDFSStorage())
+                && (!conf.hasS3Storage()))
+            || highWorkload)) {
       LOG.debug("Worker: {} num total slots is 0, add to excluded list", worker);
       excludedWorkers.add(worker);
-    } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage()) && !highWorkload) {
+    } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage() || conf.hasS3Storage())
+        && !highWorkload) {
       // only unblack if numSlots larger than 0
       excludedWorkers.remove(worker);
     }
diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 2ce5fc83f..1e99956e9 100644
--- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -165,6 +165,7 @@ private[celeborn] class Master(
private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
private var checkForUnavailableWorkerTimeOutTask: ScheduledFuture[_] = _
private var checkForHDFSRemnantDirsTimeOutTask: ScheduledFuture[_] = _
+ private var checkForS3RemnantDirsTimeOutTask: ScheduledFuture[_] = _
   private val nonEagerHandler = ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64)
// Config constants
@@ -172,8 +173,9 @@ private[celeborn] class Master(
private val appHeartbeatTimeoutMs = conf.appHeartbeatTimeoutMs
   private val workerUnavailableInfoExpireTimeoutMs = conf.workerUnavailableInfoExpireTimeout
- private val hdfsExpireDirsTimeoutMS = conf.hdfsExpireDirsTimeoutMS
+ private val dfsExpireDirsTimeoutMS = conf.dfsExpireDirsTimeoutMS
private val hasHDFSStorage = conf.hasHDFSStorage
+ private val hasS3Storage = conf.hasS3Storage
private val quotaManager = new QuotaManager(conf, configService)
   private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
@@ -211,7 +213,7 @@ private[celeborn] class Master(
TimeUnit.MILLISECONDS)
private val slotsAssignPolicy = conf.masterSlotAssignPolicy
- private var hadoopFs: FileSystem = _
+ private var hadoopFs: util.Map[StorageInfo.Type, FileSystem] = _
masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () =>
statusSystem.registeredShuffle.size
}
@@ -319,15 +321,15 @@ private[celeborn] class Master(
workerUnavailableInfoExpireTimeoutMs / 2,
TimeUnit.MILLISECONDS)
-    if (hasHDFSStorage) {
-      checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
+    if (hasHDFSStorage || hasS3Storage) {
+      checkForS3RemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
- self.send(CheckForHDFSExpiredDirsTimeout)
+ self.send(CheckForDFSExpiredDirsTimeout)
}
},
- hdfsExpireDirsTimeoutMS,
- hdfsExpireDirsTimeoutMS,
+ dfsExpireDirsTimeoutMS,
+ dfsExpireDirsTimeoutMS,
TimeUnit.MILLISECONDS)
}
@@ -350,6 +352,9 @@ private[celeborn] class Master(
if (checkForHDFSRemnantDirsTimeOutTask != null) {
checkForHDFSRemnantDirsTimeOutTask.cancel(true)
}
+ if (checkForS3RemnantDirsTimeOutTask != null) {
+ checkForS3RemnantDirsTimeOutTask.cancel(true)
+ }
forwardMessageThread.shutdownNow()
rackResolver.stop()
if (authEnabled) {
@@ -380,8 +385,8 @@ private[celeborn] class Master(
executeWithLeaderChecker(null, timeoutWorkerUnavailableInfos())
case CheckForApplicationTimeOut =>
executeWithLeaderChecker(null, timeoutDeadApplications())
- case CheckForHDFSExpiredDirsTimeout =>
- executeWithLeaderChecker(null, checkAndCleanExpiredAppDirsOnHDFS())
+ case CheckForDFSExpiredDirsTimeout =>
+ executeWithLeaderChecker(null, checkAndCleanExpiredAppDirsOnDFS())
case pb: PbWorkerLost =>
val host = pb.getHost
val rpcPort = pb.getRpcPort
@@ -984,38 +989,44 @@ private[celeborn] class Master(
workersAssignedToApp.remove(appId)
statusSystem.handleAppLost(appId, requestId)
logInfo(s"Removed application $appId")
- if (hasHDFSStorage) {
- checkAndCleanExpiredAppDirsOnHDFS(appId)
+ if (hasHDFSStorage || hasS3Storage) {
+ checkAndCleanExpiredAppDirsOnDFS(appId)
}
context.reply(ApplicationLostResponse(StatusCode.SUCCESS))
}
})
}
-  private def checkAndCleanExpiredAppDirsOnHDFS(expiredDir: String = ""): Unit = {
+  private def checkAndCleanExpiredAppDirsOnDFS(expiredDir: String = ""): Unit = {
if (hadoopFs == null) {
try {
hadoopFs = CelebornHadoopUtils.getHadoopFS(conf)
} catch {
case e: Exception =>
- logError("Celeborn initialize HDFS failed.", e)
+ logError("Celeborn initialize DFS failed.", e)
throw e
}
}
- val hdfsWorkPath = new Path(conf.hdfsDir, conf.workerWorkingDir)
- if (hadoopFs.exists(hdfsWorkPath)) {
+ if (hasHDFSStorage) processDir(conf.hdfsDir, expiredDir)
+ if (hasS3Storage) processDir(conf.s3Dir, expiredDir)
+ }
+
+ private def processDir(dfsDir: String, expiredDir: String): Unit = {
+ val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir)
+ hadoopFs.asScala.map(_._2).filter(_.exists(dfsWorkPath)).foreach { fs =>
if (expiredDir.nonEmpty) {
- val dirToDelete = new Path(hdfsWorkPath, expiredDir)
+ val dirToDelete = new Path(dfsWorkPath, expiredDir)
// delete specific app dir on application lost
-        CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, dirToDelete, true)
+        CelebornHadoopUtils.deleteDFSPathOrLogError(fs, dirToDelete, true)
} else {
- val iter = hadoopFs.listStatusIterator(hdfsWorkPath)
+ val iter = fs.listStatusIterator(dfsWorkPath)
while (iter.hasNext && isMasterActive == 1) {
val fileStatus = iter.next()
           if (!statusSystem.appHeartbeatTime.containsKey(fileStatus.getPath.getName)) {
-            CelebornHadoopUtils.deleteHDFSPathOrLogError(hadoopFs, fileStatus.getPath, true)
+            CelebornHadoopUtils.deleteDFSPathOrLogError(fs, fileStatus.getPath, true)
           }
}
+
}
}
}
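A condensed sketch of the cleanup loop in `processDir` above: scan
`<dfsDir>/<workerWorkingDir>` on each registered filesystem and recursively
delete app directories with no live heartbeat (`isExpired` stands in for the
`statusSystem.appHeartbeatTime` lookup):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

def cleanExpiredAppDirs(fs: FileSystem, workPath: Path, isExpired: String => Boolean): Unit = {
  if (fs.exists(workPath)) {
    val iter = fs.listStatusIterator(workPath)
    while (iter.hasNext) {
      val status = iter.next()
      if (isExpired(status.getPath.getName)) {
        fs.delete(status.getPath, true) // recursive delete of the whole app directory
      }
    }
  }
}
```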
diff --git a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
index e854f0a2c..3db792a54 100644
--- a/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
+++ b/openapi/openapi-client/src/main/openapi3/worker_rest_v1.yaml
@@ -541,6 +541,7 @@ components:
- SSD
- HDFS
- OSS
+ - S3
mapIdBitMap:
type: string
description: The map id bitmap hint.
diff --git a/pom.xml b/pom.xml
index 85b136aae..ee31504f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,6 @@
<!-- use hadoop-3 as default -->
<hadoop.version>3.3.6</hadoop.version>
-
<!--
If you change codahale.metrics.version, you also need to change
the link to metrics.dropwizard.io in docs/monitoring.md.
@@ -1283,6 +1282,25 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>hadoop-aws</id>
+ <properties>
+ <hadoop-aws-deps>true</hadoop-aws-deps>
+ <aws.version>1.12.367</aws.version>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bundle</artifactId>
+ <version>${aws.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
<profile>
<id>spark-2.4</id>
<modules>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 903833192..d31854eea 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -49,6 +49,7 @@ object Dependencies {
val findbugsVersion = "1.3.9"
val guavaVersion = "33.1.0-jre"
val hadoopVersion = "3.3.6"
+ val awsVersion = "1.12.367"
val junitInterfaceVersion = "0.13.3"
// don't forget update `junitInterfaceVersion` when we upgrade junit
val junitVersion = "4.13.2"
@@ -110,6 +111,8 @@ object Dependencies {
ExclusionRule("jline", "jline"),
ExclusionRule("log4j", "log4j"),
ExclusionRule("org.slf4j", "slf4j-log4j12"))
+ val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion
+ val awsClient = "com.amazonaws" % "aws-java-sdk-bundle" % awsVersion
   val ioDropwizardMetricsCore = "io.dropwizard.metrics" % "metrics-core" % metricsVersion
   val ioDropwizardMetricsGraphite = "io.dropwizard.metrics" % "metrics-graphite" % metricsVersion excludeAll (
ExclusionRule("com.rabbitmq", "amqp-client"))
@@ -442,6 +445,13 @@ object Utils {
}
object CelebornCommon {
+
+  lazy val hadoopAwsDependencies = if(profiles.exists(_.startsWith("hadoop-aws"))){
+ Seq(Dependencies.hadoopAws, Dependencies.awsClient)
+ } else {
+ Seq.empty
+ }
+
lazy val common = Project("celeborn-common", file("common"))
.settings (
commonSettings,
@@ -478,7 +488,7 @@ object CelebornCommon {
// SSL support
Dependencies.bouncycastleBcprovJdk18on,
Dependencies.bouncycastleBcpkixJdk18on
- ) ++ commonUnitTestDependencies,
+ ) ++ commonUnitTestDependencies ++ hadoopAwsDependencies,
Compile / sourceGenerators += Def.task {
       val file = (Compile / sourceManaged).value / "org" / "apache" / "celeborn" / "package.scala"
diff --git a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
index 0b2802035..a4e4d2bca 100644
--- a/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
+++ b/service/src/main/scala/org/apache/celeborn/server/common/http/api/v1/ApiUtils.scala
@@ -124,6 +124,8 @@ object ApiUtils {
locationData.storage(StorageEnum.HDFS)
case StorageInfo.Type.OSS =>
locationData.storage(StorageEnum.OSS)
+ case StorageInfo.Type.S3 =>
+ locationData.storage(StorageEnum.S3)
}
Option(partitionLocation.getMapIdBitMap).map(_.toString).foreach(locationData.mapIdBitMap)
locationData
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
index c8b6af9af..368e3485a 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java
@@ -228,7 +228,7 @@ public class MemoryManager {
this.storageManager = storageManager;
if (memoryFileStorageThreshold > 0
&& storageManager != null
- && storageManager.localOrHdfsStorageAvailable()) {
+ && storageManager.localOrDfsStorageAvailable()) {
ScheduledExecutorService memoryFileStorageService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("memory-file-storage-checker");
memoryFileStorageService.scheduleWithFixedDelay(
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
index 4eab8f0b1..d6be27d28 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.meta.MapFileMeta;
import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.util.FileChannelUtils;
import org.apache.celeborn.common.util.Utils;
@@ -53,6 +55,8 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
private FileChannel indexChannel;
private volatile boolean isRegionFinished = true;
+ private FileSystem hadoopFs;
+
public MapPartitionDataWriter(
StorageManager storageManager,
AbstractSource workerSource,
@@ -63,11 +67,14 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
super(storageManager, workerSource, conf, deviceMonitor, writerContext, false);
Preconditions.checkState(diskFileInfo != null);
- if (!diskFileInfo.isHdfs()) {
+ if (!diskFileInfo.isDFS()) {
indexChannel = FileChannelUtils.createWritableFileChannel(diskFileInfo.getIndexPath());
} else {
+ StorageInfo.Type storageType =
+ diskFileInfo.isS3() ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+ this.hadoopFs = StorageManager.hadoopFs().get(storageType);
try {
- StorageManager.hadoopFs().create(diskFileInfo.getHdfsIndexPath(), true).close();
+ hadoopFs.create(diskFileInfo.getDfsIndexPath(), true).close();
} catch (IOException e) {
try {
// If create file failed, wait 10 ms and retry
@@ -75,7 +82,7 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
- StorageManager.hadoopFs().create(diskFileInfo.getHdfsIndexPath(), true).close();
+ hadoopFs.create(diskFileInfo.getDfsIndexPath(), true).close();
}
}
}
@@ -120,12 +127,12 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
return super.close(
this::flushIndex,
() -> {
- if (diskFileInfo.isHdfs()) {
- if (StorageManager.hadoopFs().exists(diskFileInfo.getHdfsPeerWriterSuccessPath())) {
- StorageManager.hadoopFs().delete(diskFileInfo.getHdfsPath(), false);
+ if (diskFileInfo.isDFS()) {
+ if (hadoopFs.exists(diskFileInfo.getDfsPeerWriterSuccessPath())) {
+ hadoopFs.delete(diskFileInfo.getDfsPath(), false);
deleted = true;
} else {
- StorageManager.hadoopFs().create(diskFileInfo.getHdfsWriterSuccessPath()).close();
+ hadoopFs.create(diskFileInfo.getDfsWriterSuccessPath()).close();
}
}
},
@@ -133,16 +140,15 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
if (indexChannel != null) {
indexChannel.close();
}
- if (diskFileInfo.isHdfs()) {
- if (StorageManager.hadoopFs()
- .exists(
- new Path(
- Utils.getWriteSuccessFilePath(
- Utils.getPeerPath(diskFileInfo.getIndexPath()))))) {
- StorageManager.hadoopFs().delete(diskFileInfo.getHdfsIndexPath(), false);
+ if (diskFileInfo.isDFS()) {
+ if (hadoopFs.exists(
+ new Path(
+ Utils.getWriteSuccessFilePath(
+ Utils.getPeerPath(diskFileInfo.getIndexPath()))))) {
+ hadoopFs.delete(diskFileInfo.getDfsIndexPath(), false);
deleted = true;
} else {
- StorageManager.hadoopFs()
+ hadoopFs
.create(new Path(Utils.getWriteSuccessFilePath(diskFileInfo.getIndexPath())))
.close();
}
@@ -255,11 +261,10 @@ public final class MapPartitionDataWriter extends PartitionDataWriter {
while (indexBuffer.hasRemaining()) {
indexChannel.write(indexBuffer);
}
- } else if (diskFileInfo.isHdfs()) {
- FSDataOutputStream hdfsStream =
- StorageManager.hadoopFs().append(diskFileInfo.getHdfsIndexPath());
- hdfsStream.write(indexBuffer.array());
- hdfsStream.close();
+ } else if (diskFileInfo.isDFS()) {
+ FSDataOutputStream dfsStream = hadoopFs.append(diskFileInfo.getDfsIndexPath());
+ dfsStream.write(indexBuffer.array());
+ dfsStream.close();
}
}
indexBuffer.clear();
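The recurring pattern in this file is to resolve a FileSystem per storage type instead of assuming HDFS. A condensed Scala sketch of that lookup (the standalone form is hypothetical; the names come from the patch):

    // Pick the FileSystem matching where this file lives.
    val storageType = if (diskFileInfo.isS3) StorageInfo.Type.S3 else StorageInfo.Type.HDFS
    val fs: FileSystem = StorageManager.hadoopFs.get(storageType)
    // Create-then-close: keeping a DFS output stream open pins a DataStreamer thread.
    fs.create(diskFileInfo.getDfsIndexPath, true).close()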
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index f40f8ee66..9cae453a6 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -32,6 +32,7 @@ import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hadoop.fs.FileSystem;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,12 +102,15 @@ public abstract class PartitionDataWriter implements DeviceObserver {
private final PartitionDataWriterContext writerContext;
private final long localFlusherBufferSize;
private final long hdfsFlusherBufferSize;
+
+ private final long s3FlusherBufferSize;
private Exception exception = null;
private boolean metricsCollectCriticalEnabled;
private long chunkSize;
-
private UserBufferInfo userBufferInfo = null;
+ protected FileSystem hadoopFs;
+
public PartitionDataWriter(
StorageManager storageManager,
AbstractSource workerSource,
@@ -128,6 +132,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
this.writerContext = writerContext;
this.localFlusherBufferSize = conf.workerFlusherBufferSize();
this.hdfsFlusherBufferSize = conf.workerHdfsFlusherBufferSize();
+ this.s3FlusherBufferSize = conf.workerS3FlusherBufferSize();
this.metricsCollectCriticalEnabled = conf.metricsCollectCriticalEnabled();
this.chunkSize = conf.shuffleChunkSize();
@@ -166,16 +171,19 @@ public abstract class PartitionDataWriter implements DeviceObserver {
}
public void initFileChannelsForDiskFile() throws IOException {
- if (!this.diskFileInfo.isHdfs()) {
+ if (!this.diskFileInfo.isDFS()) {
this.flusherBufferSize = localFlusherBufferSize;
channel =
FileChannelUtils.createWritableFileChannel(this.diskFileInfo.getFilePath());
} else {
- this.flusherBufferSize = hdfsFlusherBufferSize;
- // We open the stream and close immediately because HDFS output stream will
+ StorageInfo.Type storageType =
+ diskFileInfo.isS3() ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+ this.hadoopFs = StorageManager.hadoopFs().get(storageType);
+ this.flusherBufferSize = diskFileInfo.isS3() ? s3FlusherBufferSize : hdfsFlusherBufferSize;
+ // We open the stream and close immediately because DFS output stream will
// create a DataStreamer that is a thread.
- // If we reuse HDFS output stream, we will exhaust the memory soon.
+ // If we reuse DFS output stream, we will exhaust the memory soon.
try {
- StorageManager.hadoopFs().create(this.diskFileInfo.getHdfsPath(), true).close();
+ hadoopFs.create(this.diskFileInfo.getDfsPath(), true).close();
} catch (IOException e) {
try {
// If create file failed, wait 10 ms and retry
@@ -183,7 +191,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
- StorageManager.hadoopFs().create(this.diskFileInfo.getHdfsPath(), true).close();
+ hadoopFs.create(this.diskFileInfo.getDfsPath(), true).close();
}
}
}
@@ -220,7 +228,9 @@ public abstract class PartitionDataWriter implements DeviceObserver {
if (channel != null) {
task = new LocalFlushTask(flushBuffer, channel, notifier, false);
} else if (diskFileInfo.isHdfs()) {
- task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), notifier, false);
+ task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, false);
+ } else if (diskFileInfo.isS3()) {
+ task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, false);
}
MemoryManager.instance().releaseMemoryFileStorage(numBytes);
MemoryManager.instance().incrementDiskBuffer(numBytes);
@@ -246,7 +256,9 @@ public abstract class PartitionDataWriter implements DeviceObserver {
if (channel != null) {
task = new LocalFlushTask(flushBuffer, channel, notifier, true);
} else if (diskFileInfo.isHdfs()) {
- task = new HdfsFlushTask(flushBuffer, diskFileInfo.getHdfsPath(), notifier, true);
+ task = new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, true);
+ } else if (diskFileInfo.isS3()) {
+ task = new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath(), notifier, true);
}
}
}
@@ -270,7 +282,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
if (!isMemoryShuffleFile.get()) {
return false;
} else {
- return !storageManager.localOrHdfsStorageAvailable()
+ return !storageManager.localOrDfsStorageAvailable()
&& (memoryFileInfo.getFileLength() > memoryFileStorageMaxFileSize
|| !MemoryManager.instance().memoryFileStorageAvailable());
}
@@ -325,7 +337,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
}
} else {
if (flushBufferReadableBytes > memoryFileStorageMaxFileSize
- && storageManager.localOrHdfsStorageAvailable()) {
+ && storageManager.localOrDfsStorageAvailable()) {
logger.debug(
"{} Evict, memory buffer is {}",
writerContext.getPartitionLocation().getFileName(),
@@ -376,9 +388,11 @@ public abstract class PartitionDataWriter implements DeviceObserver {
public StorageInfo getStorageInfo() {
if (diskFileInfo != null) {
- if (diskFileInfo.isHdfs()) {
+ if (diskFileInfo.isDFS()) {
if (deleted) {
return null;
+ } else if (diskFileInfo.isS3()) {
+ return new StorageInfo(StorageInfo.Type.S3, true, diskFileInfo.getFilePath());
} else {
return new StorageInfo(StorageInfo.Type.HDFS, true, diskFileInfo.getFilePath());
}
@@ -442,7 +456,7 @@ public abstract class PartitionDataWriter implements DeviceObserver {
finalClose.run();
// unregister from DeviceMonitor
- if (diskFileInfo != null && !diskFileInfo.isHdfs()) {
+ if (diskFileInfo != null && !this.diskFileInfo.isDFS()) {
logger.debug("file info {} unregister from device monitor",
diskFileInfo);
deviceMonitor.unregisterFileWriter(this);
}
@@ -513,10 +527,9 @@ public abstract class PartitionDataWriter implements DeviceObserver {
if (!destroyed) {
destroyed = true;
if (diskFileInfo != null) {
- diskFileInfo.deleteAllFiles(StorageManager.hadoopFs());
-
+ diskFileInfo.deleteAllFiles(hadoopFs);
// unregister from DeviceMonitor
- if (!diskFileInfo.isHdfs()) {
+ if (!diskFileInfo.isDFS()) {
deviceMonitor.unregisterFileWriter(this);
}
}
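Flush-task selection in this writer is now a three-way dispatch. A hedged Scala sketch of the same logic (fields are the writer's own; it assumes the only remaining non-local, non-HDFS case is S3):

    val task: FlushTask =
      if (channel != null) new LocalFlushTask(flushBuffer, channel, notifier, false)
      else if (diskFileInfo.isHdfs) new HdfsFlushTask(flushBuffer, diskFileInfo.getDfsPath, notifier, false)
      else new S3FlushTask(flushBuffer, diskFileInfo.getDfsPath, notifier, false)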
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 4cbd28b46..81bc44eb7 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -45,6 +45,7 @@ import io.netty.buffer.CompositeByteBuf;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.meta.*;
import org.apache.celeborn.common.metrics.source.AbstractSource;
+import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.*;
import org.apache.celeborn.common.util.ShuffleBlockInfoUtils.ShuffleBlockInfo;
@@ -252,8 +254,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
throw new IOException(
"Sort scheduler thread is interrupted means worker is shutting down.", e);
} catch (IOException e) {
- logger.error("File sorter access HDFS failed.", e);
- throw new IOException("File sorter access HDFS failed.", e);
+ logger.error("File sorter access DFS failed.", e);
+ throw new IOException("File sorter access DFS failed.", e);
}
}
}
@@ -473,14 +475,17 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
}
protected void writeIndex(
- Map<Integer, List<ShuffleBlockInfo>> indexMap, String indexFilePath, boolean isHdfs)
+ Map<Integer, List<ShuffleBlockInfo>> indexMap, String indexFilePath, boolean isDfs)
throws IOException {
- FSDataOutputStream hdfsIndexOutput = null;
+ FSDataOutputStream dfsIndexOutput = null;
FileChannel indexFileChannel = null;
- if (isHdfs) {
+ if (isDfs) {
+ boolean isS3 = Utils.isS3Path(indexFilePath);
+ StorageInfo.Type storageType = isS3 ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+ FileSystem hadoopFs = StorageManager.hadoopFs().get(storageType);
// If the index file exists, it will be overwritten.
// So there is no need to check its existence.
- hdfsIndexOutput = StorageManager.hadoopFs().create(new Path(indexFilePath));
+ dfsIndexOutput = hadoopFs.create(new Path(indexFilePath));
} else {
indexFileChannel =
FileChannelUtils.createWritableFileChannel(indexFilePath);
}
@@ -505,12 +510,12 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
}
indexBuf.flip();
- if (isHdfs) {
+ if (isDfs) {
// Direct byte buffer has no array, so can not invoke indexBuf.array() here.
byte[] tmpBuf = new byte[indexSize];
indexBuf.get(tmpBuf);
- hdfsIndexOutput.write(tmpBuf);
- hdfsIndexOutput.close();
+ dfsIndexOutput.write(tmpBuf);
+ dfsIndexOutput.close();
} else {
while (indexBuf.hasRemaining()) {
indexFileChannel.write(indexBuf);
@@ -588,25 +593,25 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
fileId,
() -> {
FileChannel indexChannel = null;
- FSDataInputStream hdfsIndexStream = null;
- boolean isHdfs = Utils.isHdfsPath(indexFilePath);
+ FSDataInputStream dfsIndexStream = null;
+ boolean isDfs = Utils.isHdfsPath(indexFilePath) || Utils.isS3Path(indexFilePath);
+ boolean isS3 = Utils.isS3Path(indexFilePath);
int indexSize;
try {
- if (isHdfs) {
- hdfsIndexStream = StorageManager.hadoopFs().open(new Path(indexFilePath));
- indexSize =
- (int)
- StorageManager.hadoopFs()
- .getFileStatus(new Path(indexFilePath))
- .getLen();
+ if (isDfs) {
+ StorageInfo.Type storageType =
+ isS3 ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+ FileSystem hadoopFs = StorageManager.hadoopFs().get(storageType);
+ dfsIndexStream = hadoopFs.open(new Path(indexFilePath));
+ indexSize = (int) hadoopFs.getFileStatus(new Path(indexFilePath)).getLen();
} else {
indexChannel =
FileChannelUtils.openReadableFileChannel(indexFilePath);
File indexFile = new File(indexFilePath);
indexSize = (int) indexFile.length();
}
ByteBuffer indexBuf = ByteBuffer.allocate(indexSize);
- if (isHdfs) {
- readStreamFully(hdfsIndexStream, indexBuf, indexFilePath);
+ if (isDfs) {
+ readStreamFully(dfsIndexStream, indexBuf, indexFilePath);
} else {
readChannelFully(indexChannel, indexBuf, indexFilePath);
}
@@ -623,7 +628,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
throw new IOException("Read sorted shuffle file index failed.", e);
} finally {
IOUtils.closeQuietly(indexChannel, null);
- IOUtils.closeQuietly(hdfsIndexStream, null);
+ IOUtils.closeQuietly(dfsIndexStream, null);
}
});
} catch (ExecutionException e) {
@@ -645,25 +650,30 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
private final String fileId;
private final String shuffleKey;
private final boolean isHdfs;
+ private final boolean isS3;
+ private final boolean isDfs;
private final boolean isPrefetch;
private final FileInfo originFileInfo;
- private FSDataInputStream hdfsOriginInput = null;
- private FSDataOutputStream hdfsSortedOutput = null;
+ private FSDataInputStream dfsOriginInput = null;
+ private FSDataOutputStream dfsSortedOutput = null;
private FileChannel originFileChannel = null;
private FileChannel sortedFileChannel = null;
+ private FileSystem hadoopFs;
FileSorter(DiskFileInfo fileInfo, String fileId, String shuffleKey) throws IOException {
this.originFileInfo = fileInfo;
this.originFilePath = fileInfo.getFilePath();
this.sortedFilePath = Utils.getSortedFilePath(originFilePath);
this.isHdfs = fileInfo.isHdfs();
- this.isPrefetch = !isHdfs && prefetchEnabled;
+ this.isS3 = fileInfo.isS3();
+ this.isDfs = isHdfs || isS3;
+ this.isPrefetch = !isDfs && prefetchEnabled;
this.originFileLen = fileInfo.getFileLength();
this.fileId = fileId;
this.shuffleKey = shuffleKey;
this.indexFilePath = Utils.getIndexFilePath(originFilePath);
- if (!isHdfs) {
+ if (!isDfs) {
File sortedFile = new File(this.sortedFilePath);
if (sortedFile.exists()) {
sortedFile.delete();
@@ -673,11 +683,14 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
indexFile.delete();
}
} else {
- if (StorageManager.hadoopFs().exists(fileInfo.getHdfsSortedPath())) {
- StorageManager.hadoopFs().delete(fileInfo.getHdfsSortedPath(), false);
+ boolean isS3 = Utils.isS3Path(indexFilePath);
+ StorageInfo.Type storageType = isS3 ? StorageInfo.Type.S3 : StorageInfo.Type.HDFS;
+ this.hadoopFs = StorageManager.hadoopFs().get(storageType);
+ if (hadoopFs.exists(fileInfo.getDfsSortedPath())) {
+ hadoopFs.delete(fileInfo.getDfsSortedPath(), false);
}
- if (StorageManager.hadoopFs().exists(fileInfo.getHdfsIndexPath())) {
- StorageManager.hadoopFs().delete(fileInfo.getHdfsIndexPath(), false);
+ if (hadoopFs.exists(fileInfo.getDfsIndexPath())) {
+ hadoopFs.delete(fileInfo.getDfsIndexPath(), false);
}
}
}
@@ -748,7 +761,7 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
sortedBlockInfoMap.put(mapId, sortedShuffleBlocks);
}
- writeIndex(sortedBlockInfoMap, indexFilePath, isHdfs);
+ writeIndex(sortedBlockInfoMap, indexFilePath, isDfs);
updateSortedShuffleFiles(shuffleKey, fileId, originFileLen);
originFileInfo.getReduceFileMeta().setSorted();
cleaner.add(this);
@@ -775,10 +788,9 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
}
private void initializeFiles() throws IOException {
- if (isHdfs) {
- hdfsOriginInput = StorageManager.hadoopFs().open(new Path(originFilePath));
- hdfsSortedOutput =
- StorageManager.hadoopFs().create(new Path(sortedFilePath), true, 256 * 1024);
+ if (isDfs) {
+ dfsOriginInput = hadoopFs.open(new Path(originFilePath));
+ dfsSortedOutput = hadoopFs.create(new Path(sortedFilePath), true, 256 * 1024);
} else {
originFileChannel = FileChannelUtils.openReadableFileChannel(originFilePath);
sortedFileChannel = FileChannelUtils.createWritableFileChannel(sortedFilePath);
@@ -786,23 +798,23 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
}
private void closeFiles() {
- IOUtils.closeQuietly(hdfsOriginInput, null);
- IOUtils.closeQuietly(hdfsSortedOutput, null);
+ IOUtils.closeQuietly(dfsOriginInput, null);
+ IOUtils.closeQuietly(dfsSortedOutput, null);
IOUtils.closeQuietly(originFileChannel, null);
IOUtils.closeQuietly(sortedFileChannel, null);
}
private void readBufferFully(ByteBuffer buffer) throws IOException {
- if (isHdfs) {
- readStreamFully(hdfsOriginInput, buffer, originFilePath);
+ if (isDfs) {
+ readStreamFully(dfsOriginInput, buffer, originFilePath);
} else {
readChannelFully(originFileChannel, buffer, originFilePath);
}
}
private long transferBlock(long offset, long length) throws IOException {
- if (isHdfs) {
- return transferStreamFully(hdfsOriginInput, hdfsSortedOutput, offset, length);
+ if (isDfs) {
+ return transferStreamFully(dfsOriginInput, dfsSortedOutput, offset, length);
} else {
return transferChannelFully(originFileChannel, sortedFileChannel, offset, length);
}
@@ -810,8 +822,8 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
public void deleteOriginFiles() throws IOException {
boolean deleteSuccess;
- if (isHdfs) {
- deleteSuccess = StorageManager.hadoopFs().delete(new Path(originFilePath), false);
+ if (isDfs) {
+ deleteSuccess = hadoopFs.delete(new Path(originFilePath), false);
} else {
deleteSuccess = new File(originFilePath).delete();
}
@@ -849,9 +861,9 @@ public class PartitionFilesSorter extends ShuffleRecoverHelper {
}
private void readBufferBySize(ByteBuffer buffer, int toRead) throws IOException {
- if (isHdfs) {
- // HDFS does not need to prefetch.
- hdfsOriginInput.seek(toRead + hdfsOriginInput.getPos());
+ if (isDfs) {
+ // DFS does not need to prefetch.
+ dfsOriginInput.seek(toRead + dfsOriginInput.getPos());
} else if (prefetchEnabled) {
buffer.clear();
readChannelBySize(originFileChannel, buffer, originFilePath, toRead);
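Because the sorter often holds only a path string, storage type is recovered from the path itself. A hedged sketch of that detection (the s3a:// scheme is an assumption about what Utils.isS3Path matches):

    val indexFilePath = "s3a://bucket/celeborn/app-1/0/part-0.index" // illustrative path
    val isS3 = Utils.isS3Path(indexFilePath)
    val isDfs = Utils.isHdfsPath(indexFilePath) || isS3
    val storageType = if (isS3) StorageInfo.Type.S3 else StorageInfo.Type.HDFS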
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java
index 20910ab88..0c1980e05 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java
@@ -88,17 +88,14 @@ public final class ReducePartitionDataWriter extends PartitionDataWriter {
},
() -> {
if (diskFileInfo != null) {
- if (diskFileInfo.isHdfs()) {
- if (StorageManager.hadoopFs()
- .exists(diskFileInfo.getHdfsPeerWriterSuccessPath())) {
- StorageManager.hadoopFs().delete(diskFileInfo.getHdfsPath(), false);
+ if (diskFileInfo.isDFS()) {
+ if (hadoopFs.exists(diskFileInfo.getDfsPeerWriterSuccessPath())) {
+ hadoopFs.delete(diskFileInfo.getDfsPath(), false);
deleted = true;
} else {
- StorageManager.hadoopFs()
- .create(diskFileInfo.getHdfsWriterSuccessPath())
- .close();
+ hadoopFs.create(diskFileInfo.getDfsWriterSuccessPath()).close();
FSDataOutputStream indexOutputStream =
- StorageManager.hadoopFs().create(diskFileInfo.getHdfsIndexPath());
+ hadoopFs.create(diskFileInfo.getDfsIndexPath());
indexOutputStream.writeInt((diskFileInfo.getReduceFileMeta()).getChunkOffsets().size());
for (Long offset : (diskFileInfo.getReduceFileMeta()).getChunkOffsets()) {
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 8deac0905..e59014120 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -164,7 +164,7 @@ private[deploy] class Controller(
return
}
- if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage) {
+ if (storageManager.healthyWorkingDirs().size <= 0 && !conf.hasHDFSStorage && !conf.hasS3Storage) {
val msg = "Local storage has no available dirs!"
logError(s"[handleReserveSlots] $msg")
context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, msg))
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 934a4e967..dc4f27bdf 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -274,6 +274,12 @@ class FetchHandler(
shuffleKey,
fileName)
makeStreamHandler(streamId, numChunks = 0)
+ case info: DiskFileInfo if info.isS3 =>
+ chunkStreamManager.registerStream(
+ streamId,
+ shuffleKey,
+ fileName)
+ makeStreamHandler(streamId, numChunks = 0)
case _ =>
val managedBuffer = fileInfo match {
case df: DiskFileInfo =>
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 0fe387f58..11b8871e9 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -45,7 +45,7 @@ import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.unsafe.Platform
import org.apache.celeborn.common.util.{DiskUtils, ExceptionUtils, Utils}
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
-import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, StorageManager}
+import org.apache.celeborn.service.deploy.worker.storage.{HdfsFlusher, LocalFlusher, MapPartitionDataWriter, PartitionDataWriter, S3Flusher, StorageManager}
class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler with Logging {
@@ -1185,7 +1185,8 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
}
private def checkDiskFull(fileWriter: PartitionDataWriter): Boolean = {
- if (fileWriter.flusher == null || fileWriter.flusher.isInstanceOf[HdfsFlusher]) {
+ if (fileWriter.flusher == null || fileWriter.flusher.isInstanceOf[HdfsFlusher] || fileWriter.flusher.isInstanceOf[S3Flusher]) {
return false
}
val mountPoint = fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 2f8767383..ae15dbc73 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -457,7 +457,7 @@ private[celeborn] class Worker(
val diskInfos =
workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map {
disk =>
disk.mountPoint -> disk
- }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo
+ }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo ++ storageManager.s3DiskInfo
workerStatusManager.checkIfNeedTransitionStatus()
val response = masterClient.askSync[HeartbeatFromWorkerResponse](
HeartbeatFromWorker(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 607e47ddc..df0d63be3 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -21,6 +21,9 @@ import java.nio.channels.FileChannel
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.IOUtils
+
+import org.apache.celeborn.common.protocol.StorageInfo.Type
abstract private[worker] class FlushTask(
val buffer: CompositeByteBuf,
@@ -50,8 +53,38 @@ private[worker] class HdfsFlushTask(
notifier: FlushNotifier,
keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) {
override def flush(): Unit = {
- val hdfsStream = StorageManager.hadoopFs.append(path, 256 * 1024)
+ val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
+ val hdfsStream = hadoopFs.append(path, 256 * 1024)
hdfsStream.write(ByteBufUtil.getBytes(buffer))
hdfsStream.close()
}
}
+
+private[worker] class S3FlushTask(
+ buffer: CompositeByteBuf,
+ val path: Path,
+ notifier: FlushNotifier,
+ keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) {
+ override def flush(): Unit = {
+ val hadoopFs = StorageManager.hadoopFs.get(Type.S3)
+ if (hadoopFs.exists(path)) {
+ val conf = hadoopFs.getConf
+ val tempPath = new Path(path.getParent, path.getName + ".tmp")
+ val outputStream = hadoopFs.create(tempPath, true, 256 * 1024)
+ val inputStream = hadoopFs.open(path)
+ try {
+ IOUtils.copyBytes(inputStream, outputStream, conf, false)
+ } finally {
+ inputStream.close()
+ }
+ outputStream.write(ByteBufUtil.getBytes(buffer))
+ outputStream.close()
+ hadoopFs.delete(path, false)
+ hadoopFs.rename(tempPath, path)
+ } else {
+ val s3Stream = hadoopFs.create(path, true, 256 * 1024)
+ s3Stream.write(ByteBufUtil.getBytes(buffer))
+ s3Stream.close()
+ }
+ }
+}
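Why S3FlushTask differs from HdfsFlushTask: Hadoop's S3A FileSystem does not support append(), so appending is emulated by copying the existing object into a .tmp sibling, writing the new bytes to it, then deleting the original and renaming the temp back. Each flush against an existing object therefore rewrites it end to end, which is why buffered, infrequent flushes matter more on S3 than on HDFS. A hedged usage sketch (the no-arg FlushNotifier construction and the s3a path are assumptions):

    val notifier = new FlushNotifier
    val task = new S3FlushTask(flushBuffer, new Path("s3a://bucket/celeborn/app-1/0/part-0"), notifier, false)
    task.flush() // full rewrite when the object already exists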
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
index cfc94e962..30194aa83 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/Flusher.scala
@@ -198,3 +198,22 @@ final private[worker] class HdfsFlusher(
override def toString: String = s"HdfsFlusher@$flusherId"
}
+
+final private[worker] class S3Flusher(
+ workerSource: AbstractSource,
+ s3FlusherThreads: Int,
+ allocator: PooledByteBufAllocator,
+ maxComponents: Int) extends Flusher(
+ workerSource,
+ s3FlusherThreads,
+ allocator,
+ maxComponents,
+ null,
+ "S3") with Logging {
+
+ override def processIOException(e: IOException, deviceErrorType: DiskStatus): Unit = {
+ logError(s"$this write failed, reason $deviceErrorType, exception: $e")
+ }
+
+ override def toString: String = s"s3Flusher@$flusherId"
+}
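S3Flusher mirrors HdfsFlusher: no local device sits behind it (hence the null passed where local flushers supply disk information, an inference from the signature above), so an IOException is only logged instead of triggering device isolation. Its construction in StorageManager, shown later in this patch, is:

    new S3Flusher(
      workerSource,
      conf.workerS3FlusherThreads,
      storageBufferAllocator,
      conf.workerPushMaxComponents)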
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 34579327a..1bfca9b64 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -57,6 +57,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val workingDirWriters =
JavaUtils.newConcurrentHashMap[File, ConcurrentHashMap[String, PartitionDataWriter]]()
val hdfsWriters = JavaUtils.newConcurrentHashMap[String, PartitionDataWriter]()
+ val s3Writers = JavaUtils.newConcurrentHashMap[String, PartitionDataWriter]()
val memoryWriters = JavaUtils.newConcurrentHashMap[MemoryFileInfo, PartitionDataWriter]()
// (shuffleKey->(filename->DiskFileInfo))
private val diskFileInfos =
@@ -67,6 +68,8 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val hasHDFSStorage = conf.hasHDFSStorage
+ val hasS3Storage = conf.hasS3Storage
+
val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
val storagePolicy = new StoragePolicy(conf, this, workerSource)
@@ -77,7 +80,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
(new File(workdir, conf.workerWorkingDir), maxSpace, flusherThread,
storageType)
}
- if (workingDirInfos.size <= 0 && !hasHDFSStorage) {
+ if (workingDirInfos.size <= 0 && !hasHDFSStorage && !hasS3Storage) {
throw new IOException("Empty working directory configuration!")
}
@@ -89,6 +92,11 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
Option(new DiskInfo("HDFS", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.HDFS))
else None
+ val s3DiskInfo =
+ if (conf.hasS3Storage)
+ Option(new DiskInfo("S3", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.S3))
+ else None
+
def disksSnapshot(): List[DiskInfo] = {
diskInfos.synchronized {
val disks = new util.ArrayList[DiskInfo](diskInfos.values())
@@ -149,6 +157,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
deviceMonitor.startCheck()
val hdfsDir = conf.hdfsDir
+ val s3Dir = conf.s3Dir
val hdfsPermission = new FsPermission("755")
val (hdfsFlusher, _totalHdfsFlusherThread) =
if (hasHDFSStorage) {
@@ -171,12 +180,36 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
(None, 0)
}
- def totalFlusherThread: Int = _totalLocalFlusherThread + _totalHdfsFlusherThread
+ val (s3Flusher, _totalS3FlusherThread) =
+ if (hasS3Storage) {
+ logInfo(s"Initialize S3 support with path $s3Dir")
+ try {
+ StorageManager.hadoopFs = CelebornHadoopUtils.getHadoopFS(conf)
+ } catch {
+ case e: Exception =>
+ logError("Celeborn initialize S3 failed.", e)
+ throw e
+ }
+ (
+ Some(new S3Flusher(
+ workerSource,
+ conf.workerS3FlusherThreads,
+ storageBufferAllocator,
+ conf.workerPushMaxComponents)),
+ conf.workerS3FlusherThreads)
+ } else {
+ (None, 0)
+ }
+
+ def totalFlusherThread: Int =
+ _totalLocalFlusherThread + _totalHdfsFlusherThread + _totalS3FlusherThread
+
val activeTypes = conf.availableStorageTypes
- def localOrHdfsStorageAvailable(): Boolean = {
- StorageInfo.HDFSAvailable(activeTypes) || StorageInfo.localDiskAvailable(
- activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty
+ def localOrDfsStorageAvailable(): Boolean = {
+ StorageInfo.OSSAvailable(activeTypes) || StorageInfo.HDFSAvailable(
+ activeTypes) || StorageInfo.localDiskAvailable(
+ activeTypes) || hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty
}
override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit =
this.synchronized {
@@ -383,7 +416,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
rangeReadFilter: Boolean,
userIdentifier: UserIdentifier,
partitionSplitEnabled: Boolean): PartitionDataWriter = {
- if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage) {
+ if (healthyWorkingDirs().size <= 0 && !hasHDFSStorage && !hasS3Storage) {
throw new IOException("No available working dirs!")
}
val partitionDataWriterContext = new PartitionDataWriterContext(
@@ -445,6 +478,10 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
hdfsWriters.put(fileInfo.getFilePath, writer)
return
}
+ if (writer.getDiskFileInfo.isS3) {
+ s3Writers.put(fileInfo.getFilePath, writer)
+ return
+ }
deviceMonitor.registerFileWriter(writer)
workingDirWriters.computeIfAbsent(workingDir, workingDirWriterListFunc).put(
fileInfo.getFilePath,
@@ -506,17 +543,25 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
def cleanFileInternal(shuffleKey: String, fileInfo: FileInfo): Boolean = {
if (fileInfo == null) return false
- var isHdfsExpired = false
+ var isDfsExpired = false
fileInfo match {
case info: DiskFileInfo =>
if (info.isHdfs) {
- isHdfsExpired = true
+ isDfsExpired = true
val hdfsFileWriter = hdfsWriters.get(info.getFilePath)
if (hdfsFileWriter != null) {
hdfsFileWriter.destroy(new IOException(
s"Destroy FileWriter $hdfsFileWriter caused by shuffle
$shuffleKey expired."))
hdfsWriters.remove(info.getFilePath)
}
+ } else if (info.isS3) {
+ isDfsExpired = true
+ val s3FileWriter = s3Writers.get(info.getFilePath)
+ if (s3FileWriter != null) {
+ s3FileWriter.destroy(new IOException(
+ s"Destroy FileWriter $s3FileWriter caused by shuffle $shuffleKey
expired."))
+ s3Writers.remove(info.getFilePath)
+ }
} else {
val workingDir =
info.getFile.getParentFile.getParentFile.getParentFile
@@ -537,7 +582,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
case _ =>
}
- isHdfsExpired
+ isDfsExpired
}
def cleanupExpiredShuffleKey(
@@ -547,12 +592,14 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
logInfo(s"Cleanup expired shuffle $shuffleKey.")
if (diskFileInfos.containsKey(shuffleKey)) {
val removedFileInfos = diskFileInfos.remove(shuffleKey)
- var isHdfsExpired = false
+ var isDfsExpired = false
+ var isHdfs = false
if (removedFileInfos != null) {
removedFileInfos.asScala.foreach {
case (_, fileInfo) =>
if (cleanFileInternal(shuffleKey, fileInfo)) {
- isHdfsExpired = true
+ isDfsExpired = true
+ isHdfs = fileInfo.isHdfs
}
}
}
@@ -565,13 +612,16 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
deleteDirectory(file, diskOperators.get(diskInfo.mountPoint))
}
}
- if (isHdfsExpired) {
+ if (isDfsExpired) {
try {
- StorageManager.hadoopFs.delete(
- new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId"),
+ val dir = if (hasHDFSStorage && isHdfs) hdfsDir else s3Dir
+ val storageInfo =
+ if (hasHDFSStorage && isHdfs) StorageInfo.Type.HDFS else StorageInfo.Type.S3
+ StorageManager.hadoopFs.get(storageInfo).delete(
+ new Path(new Path(dir, conf.workerWorkingDir), s"$appId/$shuffleId"),
true)
} catch {
- case e: Exception => logWarning("Clean expired HDFS shuffle
failed.", e)
+ case e: Exception => logWarning("Clean expired DFS shuffle
failed.", e)
}
}
if (workerGracefulShutdown) {
@@ -671,12 +721,13 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
}
- val hdfsCleaned = hadoopFs match {
- case hdfs: FileSystem =>
- val hdfsWorkPath = new Path(hdfsDir, conf.workerWorkingDir)
- // HDFS path not exist when first time initialize
- if (hdfs.exists(hdfsWorkPath)) {
- !hdfs.listFiles(hdfsWorkPath, false).hasNext
+ val dfsCleaned = hadoopFs match {
+ case dfs: FileSystem =>
+ val dfsDir = if (hasHDFSStorage) hdfsDir else s3Dir
+ val dfsWorkPath = new Path(dfsDir, conf.workerWorkingDir)
+ // DFS path not exist when first time initialize
+ if (dfs.exists(dfsWorkPath)) {
+ !dfs.listFiles(dfsWorkPath, false).hasNext
} else {
true
}
@@ -684,7 +735,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
true
}
- if (localCleaned && hdfsCleaned) {
+ if (localCleaned && dfsCleaned) {
return true
}
retryTimes += 1
@@ -763,6 +814,11 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
u.flushOnMemoryPressure()
}
})
+ s3Writers.forEach(new BiConsumer[String, PartitionDataWriter] {
+ override def accept(t: String, u: PartitionDataWriter): Unit = {
+ u.flushOnMemoryPressure()
+ }
+ })
}
override def onPause(moduleName: String): Unit = {}
@@ -838,7 +894,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
fileInfos: List[DiskFileInfo],
subResourceConsumptions: util.Map[String, ResourceConsumption] = null)
: ResourceConsumption = {
- val diskFileInfos = fileInfos.filter(!_.isHdfs)
+ val diskFileInfos = fileInfos.filter(!_.isDFS)
val hdfsFileInfos = fileInfos.filter(_.isHdfs)
ResourceConsumption(
diskFileInfos.map(_.getFileLength).sum,
@@ -884,7 +940,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
null,
null,
null)
- } else if (location.getStorageInfo.localDiskAvailable() || location.getStorageInfo.HDFSAvailable()) {
+ } else if (location.getStorageInfo.localDiskAvailable() || location.getStorageInfo.HDFSAvailable() || location.getStorageInfo.S3Available()) {
logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
val createDiskFileResult = createDiskFile(
location,
@@ -953,14 +1009,17 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
s" working dirs. diskInfo $diskInfo")
healthyWorkingDirs()
}
- if (dirs.isEmpty && hdfsFlusher.isEmpty) {
+ if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty) {
throw new IOException(s"No available disks! suggested mountPoint
$suggestedMountPoint")
}
if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
- FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
+ FileSystem.mkdirs(
+ StorageManager.hadoopFs.get(StorageInfo.Type.HDFS),
+ shuffleDir,
+ hdfsPermission)
val hdfsFilePath = new Path(shuffleDir, fileName).toString
val hdfsFileInfo = new DiskFileInfo(
userIdentifier,
@@ -972,6 +1031,24 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
fileName,
hdfsFileInfo)
return (hdfsFlusher.get, hdfsFileInfo, null)
+ } else if (dirs.isEmpty && location.getStorageInfo.S3Available()) {
+ val shuffleDir =
+ new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId")
+ FileSystem.mkdirs(
+ StorageManager.hadoopFs.get(StorageInfo.Type.S3),
+ shuffleDir,
+ hdfsPermission)
+ val s3FilePath = new Path(shuffleDir, fileName).toString
+ val s3FileInfo = new DiskFileInfo(
+ userIdentifier,
+ partitionSplitEnabled,
+ new ReduceFileMeta(conf.shuffleChunkSize),
+ s3FilePath,
+ StorageInfo.Type.S3)
+ diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
+ fileName,
+ s3FileInfo)
+ return (s3Flusher.get, s3FileInfo, null)
} else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) {
val dir = dirs(getNextIndex % dirs.size)
val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints)
@@ -1037,5 +1114,5 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
object StorageManager {
- var hadoopFs: FileSystem = _
+ var hadoopFs: util.Map[StorageInfo.Type, FileSystem] = _
}
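The pivotal structural change sits in this last hunk: StorageManager.hadoopFs becomes a map keyed by storage type, populated via CelebornHadoopUtils.getHadoopFS(conf), so HDFS and S3 filesystems can coexist on one worker. Every call site shown earlier resolves through it; a condensed sketch (imports assumed):

    val hdfsFs: FileSystem = StorageManager.hadoopFs.get(StorageInfo.Type.HDFS)
    val s3Fs: FileSystem = StorageManager.hadoopFs.get(StorageInfo.Type.S3)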
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 228063d81..20c56ee8c 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -61,7 +61,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
partitionDataWriterContext.isPartitionSplitEnabled)
partitionDataWriterContext.setStorageType(storageInfoType)
new CelebornMemoryFile(conf, source, memoryFileInfo, storageInfoType)
- case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS =>
+ case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
val (flusher, diskFileInfo, workingDir) =
storageManager.createDiskFile(
location,
diff --git a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
index cf71200dc..e2d9f6a41 100644
--- a/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
+++ b/worker/src/test/java/org/apache/celeborn/service/deploy/worker/storage/memory/MemoryReducePartitionDataWriterSuiteJ.java
@@ -110,7 +110,7 @@ public class MemoryReducePartitionDataWriterSuiteJ {
storageManager = Mockito.mock(StorageManager.class);
AtomicLong evictCount = new AtomicLong();
Mockito.when(storageManager.evictedFileCount()).thenAnswer(a -> evictCount);
- Mockito.when(storageManager.localOrHdfsStorageAvailable()).thenAnswer(a -> true);
+ Mockito.when(storageManager.localOrDfsStorageAvailable()).thenAnswer(a -> true);
Mockito.when(storageManager.storageBufferAllocator()).thenAnswer(a -> allocator);
MemoryManager.initialize(conf, storageManager);
}