This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
     new 6c102441c [CELEBORN-2138] Avoiding multiple accesses to HDFS when writing index file
6c102441c is described below

commit 6c102441c35c7bf17346f16aa1d5fa25785bb6fb
Author:     taowenjun <1483633...@qq.com>
AuthorDate: Wed Sep 3 10:43:13 2025 +0800

    [CELEBORN-2138] Avoiding multiple accesses to HDFS when writing index file

    ### What changes were proposed in this pull request?

    Avoid multiple accesses to HDFS when writing the index file: serialize the
    chunk-offset index into an in-memory buffer and write it to the index file
    in a single call instead of issuing one write per value.

    ### Why are the changes needed?

    Previously the chunk count and every chunk offset were written to the HDFS
    index stream individually, causing multiple accesses to HDFS for a single
    index file.

    ### Does this PR introduce _any_ user-facing change?

    ### How was this patch tested?

    Closes #3460 from taowenjun/CELEBORN-2138.

    Authored-by: taowenjun <1483633...@qq.com>
    Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
---
 .../service/deploy/worker/storage/TierWriter.scala | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 23ee7f9d1..e083108e8 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -17,7 +17,7 @@
 
 package org.apache.celeborn.service.deploy.worker.storage
 
-import java.io.IOException
+import java.io.{ByteArrayOutputStream, DataOutputStream, IOException}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.util.concurrent.TimeUnit
@@ -648,9 +648,16 @@ class DfsTierWriter(
       hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close()
       if (dfsFileInfo.isReduceFileMeta) {
         val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath)
-        indexOutputStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
-        for (offset <- dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
-          indexOutputStream.writeLong(offset)
+        val byteStream: ByteArrayOutputStream = new ByteArrayOutputStream()
+        val dataStream = new DataOutputStream(byteStream)
+        try {
+          dataStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
+          for (offset <- dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
+            dataStream.writeLong(offset)
+          }
+          indexOutputStream.write(byteStream.toByteArray)
+        } finally if (dataStream != null) {
+          dataStream.close()
         }
         indexOutputStream.close()
       }
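The pattern this patch applies can be shown in isolation. Below is a minimal, self-contained Scala sketch, not Celeborn code: `CountingOutputStream` and `IndexWriteSketch` are hypothetical names invented for illustration, with `CountingOutputStream` standing in for a remote (e.g. HDFS) output stream so we can count how many write operations reach the underlying storage. It serializes a chunk-offset index into an in-memory buffer via `DataOutputStream`, then hands the whole buffer to the target stream in a single `write` call.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream, OutputStream}

// Hypothetical stand-in for a remote (e.g. HDFS) output stream that counts
// how many write operations reach the underlying storage.
class CountingOutputStream(underlying: OutputStream) extends OutputStream {
  var writeCalls = 0
  override def write(b: Int): Unit = {
    writeCalls += 1
    underlying.write(b)
  }
  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    writeCalls += 1
    underlying.write(b, off, len)
  }
}

object IndexWriteSketch {
  def main(args: Array[String]): Unit = {
    val chunkOffsets = Seq(0L, 1024L, 4096L, 16384L) // example offsets

    // Unbuffered variant (roughly what the old code did): each
    // writeInt/writeLong goes straight to the remote stream, producing
    // several small write operations.
    val direct = new CountingOutputStream(new ByteArrayOutputStream())
    val directData = new DataOutputStream(direct)
    directData.writeInt(chunkOffsets.size)
    chunkOffsets.foreach(directData.writeLong)
    println(s"direct write ops: ${direct.writeCalls}")

    // Buffered variant (what the patch does): serialize into memory first,
    // then push the whole index to the remote stream in one call.
    val buffered = new CountingOutputStream(new ByteArrayOutputStream())
    val byteStream = new ByteArrayOutputStream()
    val dataStream = new DataOutputStream(byteStream)
    try {
      dataStream.writeInt(chunkOffsets.size)
      chunkOffsets.foreach(dataStream.writeLong)
    } finally {
      dataStream.close()
    }
    buffered.write(byteStream.toByteArray)
    println(s"buffered write ops: ${buffered.writeCalls}") // exactly one
  }
}
```

Buffering is cheap here: a reduce-file index is 4 bytes for the count plus 8 bytes per chunk offset, so the in-memory copy costs little while collapsing many small remote writes into a single one.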