This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 4749382f9 [CELEBORN-2211] Avoid allocating additional buffers when HdfsFlushTask writes data
4749382f9 is described below
commit 4749382f929a45c3491c3d4beda0aca253132fcc
Author: xxx <[email protected]>
AuthorDate: Thu Jan 15 17:03:29 2026 +0800
[CELEBORN-2211] Avoid allocating additional buffers when HdfsFlushTask writes data
### What changes were proposed in this pull request?
Initialize the `FSDataOutputStream` in `DfsTierWriter`, reuse it for writing data whenever `HdfsFlushTask` flushes, and close it when `closeStreams` is executed. The reuse is gated by a new config, `celeborn.worker.reuse.hdfs.outputStream.enabled` (default `false`).
### Why are the changes needed?
Avoid allocating additional buffers when `HdfsFlushTask` writes data. Previously every flush called `hadoopFs.append(path, 256 * 1024)`, creating a fresh output stream and its 256 KB buffer per flush; reusing a single `FSDataOutputStream` per partition file removes that per-flush allocation.
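The reuse path is off by default. To opt in on a worker, set the new flag, for example in `conf/celeborn-defaults.conf` (assumed here to be the standard Celeborn config location):

```
# Reuse a single FSDataOutputStream per partition file instead of re-opening
# an append stream (with its own buffer) on each HDFS flush. Default: false.
celeborn.worker.reuse.hdfs.outputStream.enabled = true
```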
### Does this PR resolve a correctness bug?
NO
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes #3548 from xy2953396112/CELEBORN-2211.
Authored-by: xxx <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 9 +++++++
docs/configuration/worker.md | 1 +
.../service/deploy/worker/storage/FlushTask.scala | 31 +++++++++++++++++-----
.../service/deploy/worker/storage/TierWriter.scala | 25 ++++++++++++++---
4 files changed, 55 insertions(+), 11 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ec397af51..6b21abed5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -853,6 +853,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerUnavailableInfoExpireTimeout: Long = get(WORKER_UNAVAILABLE_INFO_EXPIRE_TIMEOUT)
def allowWorkerHostPattern: Option[Regex] = get(ALLOW_WORKER_HOST_PATTERN)
def denyWorkerHostPattern: Option[Regex] = get(DENY_WORKER_HOST_PATTERN)
+ def workerReuseHdfsOutputStreamEnabled: Boolean = get(WORKER_REUSE_HDFS_OUTPUT_STREAM_ENABLED)
def workerReplicateThreads: Int = get(WORKER_REPLICATE_THREADS)
def workerCommitThreads: Int =
@@ -2488,6 +2489,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1h")
+ val WORKER_REUSE_HDFS_OUTPUT_STREAM_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.worker.reuse.hdfs.outputStream.enabled")
+ .categories("worker")
+ .version("0.7.0")
+ .doc("Whether to enable reuse output stream on hdfs.")
+ .booleanConf
+ .createWithDefault(false)
+
val WORKER_HEARTBEAT_TIMEOUT: ConfigEntry[Long] =
buildConf("celeborn.master.heartbeat.worker.timeout")
.withAlternative("celeborn.worker.heartbeat.timeout")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index ff5b097fb..b34c5bd66 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -177,6 +177,7 @@ license: |
| celeborn.worker.replicate.port | 0 | false | Server port for Worker to receive replicate data request from other Workers. | 0.2.0 | |
| celeborn.worker.replicate.randomConnection.enabled | true | false | Whether worker will create random connection to peer when replicate data. When false, worker tend to reuse the same cached TransportClient to a specific replicate worker; when true, worker tend to use different cached TransportClient. Netty will use the same thread to serve the same connection, so with more connections replicate server can leverage more netty threads | 0.2.1 | |
| celeborn.worker.replicate.threads | 64 | false | Thread number of worker to replicate shuffle data. | 0.2.0 | |
+| celeborn.worker.reuse.hdfs.outputStream.enabled | false | false | Whether to enable reuse of the output stream on HDFS. | 0.7.0 | |
| celeborn.worker.rpc.port | 0 | false | Server port for Worker to receive RPC request. | 0.2.0 | |
| celeborn.worker.shuffle.partitionSplit.enabled | true | false | enable the partition split on worker side | 0.3.0 | celeborn.worker.partition.split.enabled |
| celeborn.worker.shuffle.partitionSplit.max | 2g | false | Specify the maximum partition size for splitting, and ensure that individual partition files are always smaller than this limit. | 0.3.0 | |
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 5053456d4..4effe460d 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, Closeable, IOException}
import java.nio.channels.FileChannel
import io.netty.buffer.{ByteBufUtil, CompositeByteBuf}
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FSDataOutputStream, Path}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.source.AbstractSource
@@ -98,20 +98,37 @@ abstract private[worker] class DfsFlushTask(
private[worker] class HdfsFlushTask(
buffer: CompositeByteBuf,
+ hdfsStream: FSDataOutputStream,
val path: Path,
notifier: FlushNotifier,
keepBuffer: Boolean,
source: AbstractSource) extends DfsFlushTask(buffer, notifier, keepBuffer, source) {
override def flush(copyBytes: Array[Byte]): Unit = {
val readableBytes = buffer.readableBytes()
- val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
- val hdfsStream = hadoopFs.append(path, 256 * 1024)
- flush(hdfsStream) {
- hdfsStream.write(convertBufferToBytes(buffer, copyBytes, readableBytes))
- source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
- source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, readableBytes)
+ val bytes = convertBufferToBytes(buffer, copyBytes, readableBytes)
+
+ if (hdfsStream != null) {
+ // TODO : If the FSDataOutputStream supports concurrent writes, the lock can be removed.
+ hdfsStream.synchronized {
+ writeAndRecordMetrics(hdfsStream, bytes, readableBytes)
+ }
+ } else {
+ val hadoopFs = StorageManager.hadoopFs.get(Type.HDFS)
+ val appendStream = hadoopFs.append(path, 256 * 1024)
+ flush(appendStream) {
+ writeAndRecordMetrics(appendStream, bytes, readableBytes)
+ }
}
}
+
+ private def writeAndRecordMetrics(
+ hdfsStream: FSDataOutputStream,
+ bytes: Array[Byte],
+ size: Int): Unit = {
+ hdfsStream.write(bytes)
+ source.incCounter(WorkerSource.HDFS_FLUSH_COUNT)
+ source.incCounter(WorkerSource.HDFS_FLUSH_SIZE, size)
+ }
}
private[worker] class S3FlushTask(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 7ff64f7bd..90924d0ad 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters.asScalaBufferConverter
import io.netty.buffer.{ByteBuf, CompositeByteBuf}
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream}
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.AlreadyClosedException
@@ -532,6 +532,8 @@ class DfsTierWriter(
private val flushWorkerIndex: Int = flusher.getWorkerIndex
val hadoopFs: FileSystem = StorageManager.hadoopFs.get(storageType)
var deleted = false
+ private var hdfsStream: FSDataOutputStream = null
+ private val workerReuseHdfsOutputStreamEnabled = conf.workerReuseHdfsOutputStreamEnabled
private var s3MultipartUploadHandler: MultipartUploadHandler = _
private var ossMultipartUploadHandler: MultipartUploadHandler = _
var partNumber: Int = 1
@@ -546,7 +548,10 @@ class DfsTierWriter(
}
try {
- hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+ hdfsStream = hadoopFs.create(dfsFileInfo.getDfsPath, true)
+ if (!workerReuseHdfsOutputStreamEnabled) {
+ closeHdfsStream()
+ }
hadoopFs.setReplication(dfsFileInfo.getDfsPath, conf.workerDfsReplicationFactor.toShort);
if (dfsFileInfo.isS3) {
val uri = hadoopFs.getUri
@@ -590,7 +595,11 @@ class DfsTierWriter(
case ex: InterruptedException =>
throw new RuntimeException(ex)
}
- hadoopFs.create(dfsFileInfo.getDfsPath, true).close()
+ closeHdfsStream()
+ hdfsStream = hadoopFs.create(dfsFileInfo.getDfsPath, true)
+ if (!workerReuseHdfsOutputStreamEnabled) {
+ hdfsStream.close()
+ }
}
storageManager.registerDiskFilePartitionWriter(
@@ -598,6 +607,13 @@ class DfsTierWriter(
partitionDataWriterContext.getWorkingDir,
fileInfo.asInstanceOf[DiskFileInfo])
+ def closeHdfsStream(): Unit = {
+ if (hdfsStream != null) {
+ hdfsStream.close()
+ hdfsStream = null
+ }
+ }
+
override def needEvict(): Boolean = {
false
}
@@ -605,7 +621,7 @@ class DfsTierWriter(
override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): FlushTask = {
notifier.numPendingFlushes.incrementAndGet()
if (dfsFileInfo.isHdfs) {
- new HdfsFlushTask(flushBuffer, dfsFileInfo.getDfsPath(), notifier, true, source)
+ new HdfsFlushTask(flushBuffer, hdfsStream, dfsFileInfo.getDfsPath(), notifier, true, source)
} else if (dfsFileInfo.isOSS) {
val flushTask = new OssFlushTask(
flushBuffer,
@@ -659,6 +675,7 @@ class DfsTierWriter(
}
override def closeStreams(): Unit = {
+ closeHdfsStream()
if (hadoopFs.exists(dfsFileInfo.getDfsPeerWriterSuccessPath)) {
hadoopFs.delete(dfsFileInfo.getDfsPath, false)
deleted = true