FMX commented on code in PR #2579:
URL: https://github.com/apache/celeborn/pull/2579#discussion_r1678873110
##########
pom.xml:
##########
@@ -71,6 +71,7 @@
<!-- use hadoop-3 as default -->
<hadoop.version>3.3.6</hadoop.version>
+ <aws.version>1.12.367</aws.version>
Review Comment:
This version can be moved to the hadoop-aws profile.
##########
client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java:
##########
@@ -85,6 +87,13 @@ public DfsPartitionReader(
this.metricsCallback = metricsCallback;
this.location = location;
+ FileSystem hadoopFs = null;
Review Comment:
A DFS partition reader reads only one partition location, so the
hadoopFs can be cached as a field of this class. This eliminates the
unnecessary condition blocks.
##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala:
##########
@@ -89,9 +92,14 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
Option(new DiskInfo("HDFS", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.HDFS))
else None
- def disksSnapshot(): List[DiskInfo] = {
+ val s3DiskInfo =
+ if (conf.hasS3Storage)
+ Option(new S3DiskInfo("S3", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.S3))
Review Comment:
Is there any particular reason why S3 cannot use DiskInfo?
##########
common/src/main/java/org/apache/celeborn/common/meta/DiskInfoBase.scala:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.common.meta
+
+import java.io.File
+import java.util
+
+import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.StorageInfo
+import org.apache.celeborn.common.util.Utils
+import scala.collection.JavaConverters._
+
+abstract class DiskInfoBase(
Review Comment:
DiskInfo can be used in S3 scenarios. What is the necessity of this
abstraction?
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java:
##########
@@ -63,19 +65,25 @@ public MapPartitionDataWriter(
super(storageManager, workerSource, conf, deviceMonitor, writerContext,
false);
Preconditions.checkState(diskFileInfo != null);
- if (!diskFileInfo.isHdfs()) {
+ if (!diskFileInfo.isDFS()) {
indexChannel =
FileChannelUtils.createWritableFileChannel(diskFileInfo.getIndexPath());
} else {
+ FileSystem hadoopFs = null;
Review Comment:
A map partition data writer is connected to one partition location, which
means that its location is fixed. So cache the fs and avoid the unnecessary
condition checks.
##########
master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala:
##########
@@ -323,14 +326,26 @@ private[celeborn] class Master(
checkForHDFSRemnantDirsTimeOutTask =
forwardMessageThread.scheduleWithFixedDelay(
new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
- self.send(CheckForHDFSExpiredDirsTimeout)
+ self.send(CheckForDFSExpiredDirsTimeout)
}
},
hdfsExpireDirsTimeoutMS,
hdfsExpireDirsTimeoutMS,
TimeUnit.MILLISECONDS)
}
+ if (hasS3Storage) {
Review Comment:
This block can be merged with the previous one.
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -588,25 +597,28 @@ public DiskFileInfo resolve(
fileId,
() -> {
FileChannel indexChannel = null;
- FSDataInputStream hdfsIndexStream = null;
- boolean isHdfs = Utils.isHdfsPath(indexFilePath);
+ FSDataInputStream dfsIndexStream = null;
+ boolean isDfs = Utils.isHdfsPath(indexFilePath) ||
Utils.isS3Path(indexFilePath);
+ boolean isS3 = Utils.isS3Path(indexFilePath);
int indexSize;
try {
- if (isHdfs) {
- hdfsIndexStream = StorageManager.hadoopFs().open(new
Path(indexFilePath));
- indexSize =
- (int)
- StorageManager.hadoopFs()
- .getFileStatus(new Path(indexFilePath))
- .getLen();
+ if (isDfs) {
Review Comment:
Do not write code like
https://github.com/apache/celeborn/blob/272412287d33e3701e9d93a1237901de8323a6db/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java#L605-L611
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -166,24 +170,32 @@ public PartitionDataWriter(
}
public void initFileChannelsForDiskFile() throws IOException {
- if (!this.diskFileInfo.isHdfs()) {
+ if (!this.diskFileInfo.isDFS()) {
this.flusherBufferSize = localFlusherBufferSize;
channel =
FileChannelUtils.createWritableFileChannel(this.diskFileInfo.getFilePath());
} else {
- this.flusherBufferSize = hdfsFlusherBufferSize;
- // We open the stream and close immediately because HDFS output stream
will
+ FileSystem hadoopFs = null;
Review Comment:
Keep this hadoopFs as a class variable. There is no need for so many
conditions to get the corresponding hadoopFs.
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ReducePartitionDataWriter.java:
##########
@@ -88,17 +90,20 @@ public synchronized long close() throws IOException {
},
() -> {
if (diskFileInfo != null) {
- if (diskFileInfo.isHdfs()) {
- if (StorageManager.hadoopFs()
- .exists(diskFileInfo.getHdfsPeerWriterSuccessPath())) {
-
StorageManager.hadoopFs().delete(diskFileInfo.getHdfsPath(), false);
+ if (diskFileInfo.isDFS()) {
Review Comment:
ditto.
##########
build/make-distribution.sh:
##########
@@ -27,6 +27,8 @@ RELEASE="false"
MVN="$PROJECT_DIR/build/mvn"
SBT="$PROJECT_DIR/build/sbt"
SBT_ENABLED="false"
+HADOOP_AWS_ENABLED="false"
Review Comment:
This change is not suitable.
We make releases using `./build/make-distribution.sh
-Pspark-3.3,hadoop-aws`.
##########
client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java:
##########
@@ -154,10 +168,16 @@ private List<Long> getChunkOffsetsFromSortedIndex(
throws IOException {
String indexPath =
Utils.getIndexFilePath(location.getStorageInfo().getFilePath());
List<Long> offsets;
- try (FSDataInputStream indexInputStream =
- ShuffleClient.getHdfsFs(conf).open(new Path(indexPath))) {
+ FileSystem hadoopFs = null;
Review Comment:
ditto
##########
build/make-distribution.sh:
##########
@@ -62,6 +64,11 @@ while (( "$#" )); do
echo "Error: $1 is not supported"
exit_with_usage
;;
+ -P*)
Review Comment:
These changes can be deleted.
##########
build/make-distribution.sh:
##########
@@ -256,8 +263,11 @@ function sbt_build_service {
echo "Celeborn $VERSION$GITREVSTRING" > "$DIST_DIR/RELEASE"
echo "Build flags: $@" >> "$DIST_DIR/RELEASE"
+ if [[ "$HADOOP_AWS_ENABLED" == "true" ]]; then
+ PROFILE="-Phadoop-aws"
+ fi
- BUILD_COMMAND=("$SBT" clean package)
+ BUILD_COMMAND=("$SBT" clean package "$PROFILE")
Review Comment:
These changes can be deleted.
##########
client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java:
##########
@@ -137,9 +145,15 @@ public DfsPartitionReader(
private List<Long> getChunkOffsetsFromUnsortedIndex(CelebornConf conf,
PartitionLocation location)
throws IOException {
List<Long> offsets;
+ FileSystem hadoopFs = null;
Review Comment:
ditto
##########
client/src/main/java/org/apache/celeborn/client/read/DfsPartitionReader.java:
##########
@@ -196,24 +216,29 @@ public ByteBuf next() throws IOException,
InterruptedException {
logger.debug("read {} offset {} length {}", currentChunkIndex,
offset, length);
byte[] buffer = new byte[(int) length];
try {
- hdfsInputStream.readFully(offset, buffer);
+ dfsInputStream.readFully(offset, buffer);
} catch (IOException e) {
logger.warn(
- "read HDFS {} failed will retry, error detail {}",
+ "read DFS {} failed will retry, error detail {}",
location.getStorageInfo().getFilePath(),
e);
try {
- hdfsInputStream.close();
- hdfsInputStream =
- ShuffleClient.getHdfsFs(conf)
- .open(
- new Path(
- Utils.getSortedFilePath(
-
location.getStorageInfo().getFilePath())));
- hdfsInputStream.readFully(offset, buffer);
+ dfsInputStream.close();
Review Comment:
ditto
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2126,6 +2150,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")
+ val S3_EXPIRE_DIRS_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.master.s3.expireDirs.timeout")
+ .categories("master")
+ .version("0.5.0")
Review Comment:
These version numbers should be changed to 0.6.0.
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2782,6 +2814,38 @@ object CelebornConf extends Logging {
.stringConf
.createOptional
+ val S3_DIR: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.dir")
+ .categories("worker", "master", "client")
+ .version("0.5.0")
+ .doc("S3 base directory for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val S3_SECRET_KEY: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.secret.key")
+ .categories("worker", "master", "client")
+ .version("0.5.0")
Review Comment:
ditto
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2782,6 +2814,38 @@ object CelebornConf extends Logging {
.stringConf
.createOptional
+ val S3_DIR: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.dir")
+ .categories("worker", "master", "client")
+ .version("0.5.0")
+ .doc("S3 base directory for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val S3_SECRET_KEY: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.secret.key")
+ .categories("worker", "master", "client")
+ .version("0.5.0")
+ .doc("S3 secret key for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val S3_ACCESS_KEY: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.access.key")
+ .categories("worker", "master", "client")
+ .version("0.5.0")
Review Comment:
ditto
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2782,6 +2814,38 @@ object CelebornConf extends Logging {
.stringConf
.createOptional
+ val S3_DIR: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.dir")
+ .categories("worker", "master", "client")
+ .version("0.5.0")
Review Comment:
The version number needs to be updated.
##########
common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala:
##########
@@ -77,127 +83,67 @@ class DiskInfo(
conf.workerDiskTimeSlidingWindowMinFetchCount)
}
- var flushTimeMetrics: TimeWindow = _
- var fetchTimeMetrics: TimeWindow = _
- var status: DiskStatus = DiskStatus.HEALTHY
- var threadCount = 1
- var configuredUsableSpace = 0L
- var totalSpace = 0L
- var storageType: StorageInfo.Type = StorageInfo.Type.SSD
- var maxSlots: Long = 0
- lazy val shuffleAllocations = new util.HashMap[String, Integer]()
- lazy val applicationAllocations = new util.HashMap[String, Integer]()
-
- def setStorageType(storageType: StorageInfo.Type) = {
- this.storageType = storageType
- }
-
- def setStatus(status: DiskStatus): this.type = this.synchronized {
- this.status = status
- this
- }
-
- def setUsableSpace(usableSpace: Long): this.type = this.synchronized {
- this.actualUsableSpace = usableSpace
- this
- }
-
- def setTotalSpace(totalSpace: Long): this.type = this.synchronized {
- this.totalSpace = totalSpace
- this
- }
-
- def updateFlushTime(): Unit = {
- avgFlushTime = flushTimeMetrics.getAverage()
- }
-
- def updateFetchTime(): Unit = {
- avgFetchTime = fetchTimeMetrics.getAverage()
- }
-
- /**
- * Returns the available slots of the disk calculated by maxSlots minus
activeSlots.
- * Returns zero for the negative slots calculated.
- *
- * <b>Note:</b>`maxSlots` is calculated by actualUsableSpace divided
estimatedPartitionSize.
- * Meanwhile, `activeSlots` include slots reserved.
- *
- * @return the available slots of the disk.
- */
- def availableSlots(): Long = this.synchronized {
- math.max(maxSlots - activeSlots, 0L)
- }
-
- def allocateSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
- val applicationId = Utils.splitShuffleKey(shuffleKey)._1
- val shuffleAllocated = shuffleAllocations.getOrDefault(shuffleKey, 0)
- val applicationAllocated =
applicationAllocations.getOrDefault(applicationId, 0)
- shuffleAllocations.put(shuffleKey, shuffleAllocated + slots)
- applicationAllocations.put(applicationId, applicationAllocated + slots)
- activeSlots = activeSlots + slots
- }
-
- def releaseSlots(shuffleKey: String, slots: Int): Unit = this.synchronized {
- val applicationId = Utils.splitShuffleKey(shuffleKey)._1
- val shuffleAllocated = shuffleAllocations.getOrDefault(shuffleKey, 0)
- val applicationAllocated =
applicationAllocations.getOrDefault(applicationId, 0)
- if (shuffleAllocated < slots) {
- logError(s"allocated $shuffleAllocated is less than to release $slots !")
- } else {
- shuffleAllocations.put(shuffleKey, shuffleAllocated - slots)
- applicationAllocations.put(applicationId, applicationAllocated - slots)
- }
- activeSlots -= Math.min(shuffleAllocated, slots)
- }
+}
- def releaseSlots(shuffleKey: String): Unit = this.synchronized {
- val allocated = shuffleAllocations.remove(shuffleKey)
- if (allocated != null) {
- val applicationId = Utils.splitShuffleKey(shuffleKey)._1
- var applicationAllocated =
applicationAllocations.getOrDefault(applicationId, 0)
- applicationAllocated = applicationAllocated - allocated
- if (applicationAllocated <= 0) {
- applicationAllocations.remove(applicationId)
- } else {
- applicationAllocations.put(applicationId, applicationAllocated)
- }
- activeSlots = activeSlots - allocated
- }
+class S3DiskInfo(
Review Comment:
I think it's unnecessary.
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -3189,6 +3253,14 @@ object CelebornConf extends Logging {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("4m")
+ val WORKER_S3_FLUSHER_BUFFER_SIZE: ConfigEntry[Long] =
+ buildConf("celeborn.worker.flusher.s3.buffer.size")
+ .categories("worker")
+ .version("0.5.0")
Review Comment:
ditto
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2782,6 +2814,38 @@ object CelebornConf extends Logging {
.stringConf
.createOptional
+ val S3_DIR: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.dir")
+ .categories("worker", "master", "client")
+ .version("0.5.0")
+ .doc("S3 base directory for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val S3_SECRET_KEY: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.secret.key")
+ .categories("worker", "master", "client")
+ .version("0.5.0")
+ .doc("S3 secret key for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val S3_ACCESS_KEY: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.access.key")
+ .categories("worker", "master", "client")
+ .version("0.5.0")
+ .doc("S3 access key for Celeborn to store shuffle data.")
+ .stringConf
+ .createOptional
+
+ val S3_ENDPOINT: OptionalConfigEntry[String] =
+ buildConf("celeborn.storage.s3.endpoint")
+ .categories("worker", "master", "client")
+ .version("0.5.0")
Review Comment:
ditto
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java:
##########
@@ -120,29 +128,40 @@ public synchronized long close() throws IOException {
return super.close(
this::flushIndex,
() -> {
- if (diskFileInfo.isHdfs()) {
- if
(StorageManager.hadoopFs().exists(diskFileInfo.getHdfsPeerWriterSuccessPath()))
{
- StorageManager.hadoopFs().delete(diskFileInfo.getHdfsPath(),
false);
+ if (diskFileInfo.isDFS()) {
Review Comment:
ditto
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -3229,6 +3301,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(8)
+ val WORKER_FLUSHER_S3_THREADS: ConfigEntry[Int] =
+ buildConf("celeborn.worker.flusher.s3.threads")
+ .categories("worker")
+ .doc("Flusher's thread count used for write data to S3.")
+ .version("0.5.0")
Review Comment:
ditto
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java:
##########
@@ -55,34 +57,47 @@ public static ResourceProtos.WorkerAddress
infoToAddr(WorkerInfo info) {
.build();
}
- public static Map<String, DiskInfo> fromPbDiskInfos(
- Map<String, ResourceProtos.DiskInfo> diskInfos) {
- Map<String, DiskInfo> map = new HashMap<>();
+ public static Map<String, DiskInfoBase> fromPbDiskInfos(
+ Map<String, ResourceProtos.DiskInfoBase> diskInfos) {
+ Map<String, DiskInfoBase> map = new HashMap<>();
diskInfos.forEach(
(k, v) -> {
- DiskInfo diskInfo =
- new DiskInfo(
- v.getMountPoint(),
- v.getUsableSpace(),
- v.getAvgFlushTime(),
- v.getAvgFetchTime(),
- v.getUsedSlots(),
- StorageInfo.typesMap.get(v.getStorageType()));
- diskInfo.setStatus(Utils.toDiskStatus(v.getStatus()));
- map.put(k, diskInfo);
+ if (v.getStorageType() == StorageInfo.Type.S3.getValue()) {
Review Comment:
These two blocks are identical.
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java:
##########
@@ -120,29 +128,40 @@ public synchronized long close() throws IOException {
return super.close(
this::flushIndex,
() -> {
- if (diskFileInfo.isHdfs()) {
- if
(StorageManager.hadoopFs().exists(diskFileInfo.getHdfsPeerWriterSuccessPath()))
{
- StorageManager.hadoopFs().delete(diskFileInfo.getHdfsPath(),
false);
+ if (diskFileInfo.isDFS()) {
+ FileSystem hadoopFs = null;
+ if (diskFileInfo.isS3()) {
Review Comment:
ditto
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java:
##########
@@ -255,11 +274,16 @@ private void flushIndex() throws IOException {
while (indexBuffer.hasRemaining()) {
indexChannel.write(indexBuffer);
}
- } else if (diskFileInfo.isHdfs()) {
- FSDataOutputStream hdfsStream =
-
StorageManager.hadoopFs().append(diskFileInfo.getHdfsIndexPath());
- hdfsStream.write(indexBuffer.array());
- hdfsStream.close();
+ } else if (diskFileInfo.isDFS()) {
Review Comment:
ditto
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/MapPartitionDataWriter.java:
##########
@@ -120,29 +128,40 @@ public synchronized long close() throws IOException {
return super.close(
this::flushIndex,
() -> {
- if (diskFileInfo.isHdfs()) {
- if
(StorageManager.hadoopFs().exists(diskFileInfo.getHdfsPeerWriterSuccessPath()))
{
- StorageManager.hadoopFs().delete(diskFileInfo.getHdfsPath(),
false);
+ if (diskFileInfo.isDFS()) {
+ FileSystem hadoopFs = null;
+ if (diskFileInfo.isS3()) {
+ hadoopFs = StorageManager.hadoopFs().get(StorageInfo.Type.S3);
+ } else {
+ hadoopFs = StorageManager.hadoopFs().get(StorageInfo.Type.HDFS);
+ }
+ if (hadoopFs.exists(diskFileInfo.getDfsPeerWriterSuccessPath())) {
+ hadoopFs.delete(diskFileInfo.getDfsPath(), false);
deleted = true;
} else {
-
StorageManager.hadoopFs().create(diskFileInfo.getHdfsWriterSuccessPath()).close();
+ hadoopFs.create(diskFileInfo.getDfsWriterSuccessPath()).close();
}
}
},
() -> {
if (indexChannel != null) {
indexChannel.close();
}
- if (diskFileInfo.isHdfs()) {
- if (StorageManager.hadoopFs()
- .exists(
- new Path(
- Utils.getWriteSuccessFilePath(
- Utils.getPeerPath(diskFileInfo.getIndexPath())))))
{
-
StorageManager.hadoopFs().delete(diskFileInfo.getHdfsIndexPath(), false);
+ if (diskFileInfo.isDFS()) {
Review Comment:
ditto
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java:
##########
@@ -166,24 +170,32 @@ public PartitionDataWriter(
}
public void initFileChannelsForDiskFile() throws IOException {
- if (!this.diskFileInfo.isHdfs()) {
+ if (!this.diskFileInfo.isDFS()) {
this.flusherBufferSize = localFlusherBufferSize;
channel =
FileChannelUtils.createWritableFileChannel(this.diskFileInfo.getFilePath());
} else {
- this.flusherBufferSize = hdfsFlusherBufferSize;
- // We open the stream and close immediately because HDFS output stream
will
+ FileSystem hadoopFs = null;
+ if (diskFileInfo.isS3()) {
+ hadoopFs = StorageManager.hadoopFs().get(StorageInfo.Type.S3);
+ this.flusherBufferSize = s3FlusherBufferSize;
+ } else {
+ hadoopFs = StorageManager.hadoopFs().get(StorageInfo.Type.HDFS);
+
+ this.flusherBufferSize = hdfsFlusherBufferSize;
+ }
+ // We open the stream and close immediately because DFS output stream
will
// create a DataStreamer that is a thread.
- // If we reuse HDFS output stream, we will exhaust the memory soon.
+ // If we reuse DFS output stream, we will exhaust the memory soon.
Review Comment:
I wonder whether the AWS Hadoop module has any optimization for this.
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -588,25 +597,28 @@ public DiskFileInfo resolve(
fileId,
() -> {
FileChannel indexChannel = null;
- FSDataInputStream hdfsIndexStream = null;
- boolean isHdfs = Utils.isHdfsPath(indexFilePath);
+ FSDataInputStream dfsIndexStream = null;
+ boolean isDfs = Utils.isHdfsPath(indexFilePath) ||
Utils.isS3Path(indexFilePath);
+ boolean isS3 = Utils.isS3Path(indexFilePath);
int indexSize;
try {
- if (isHdfs) {
- hdfsIndexStream = StorageManager.hadoopFs().open(new
Path(indexFilePath));
- indexSize =
- (int)
- StorageManager.hadoopFs()
- .getFileStatus(new Path(indexFilePath))
- .getLen();
+ if (isDfs) {
Review Comment:
A file sorter handles only one partition location. Saving the
corresponding hadoopFs in the object will be fine.
##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -673,11 +690,17 @@ class FileSorter {
indexFile.delete();
}
} else {
- if (StorageManager.hadoopFs().exists(fileInfo.getHdfsSortedPath())) {
- StorageManager.hadoopFs().delete(fileInfo.getHdfsSortedPath(),
false);
+ FileSystem hadoopFs = null;
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]