This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c102441c [CELEBORN-2138] Avoiding multiple accesses to HDFS When 
writting index file
6c102441c is described below

commit 6c102441c35c7bf17346f16aa1d5fa25785bb6fb
Author: taowenjun <1483633...@qq.com>
AuthorDate: Wed Sep 3 10:43:13 2025 +0800

    [CELEBORN-2138] Avoiding multiple accesses to HDFS When writting index file
    
    ### What changes were proposed in this pull request?
    
    Avoiding multiple accesses to HDFS When writting index file.
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #3460 from taowenjun/CELEBORN-2138.
    
    Authored-by: taowenjun <1483633...@qq.com>
    Signed-off-by: mingji <fengmingxiao....@alibaba-inc.com>
---
 .../service/deploy/worker/storage/TierWriter.scala        | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 23ee7f9d1..e083108e8 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -17,7 +17,7 @@
 
 package org.apache.celeborn.service.deploy.worker.storage
 
-import java.io.IOException
+import java.io.{ByteArrayOutputStream, DataOutputStream, IOException}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.util.concurrent.TimeUnit
@@ -648,9 +648,16 @@ class DfsTierWriter(
       hadoopFs.create(dfsFileInfo.getDfsWriterSuccessPath).close()
       if (dfsFileInfo.isReduceFileMeta) {
         val indexOutputStream = hadoopFs.create(dfsFileInfo.getDfsIndexPath)
-        
indexOutputStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
-        for (offset <- dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
-          indexOutputStream.writeLong(offset)
+        val byteStream: ByteArrayOutputStream = new ByteArrayOutputStream()
+        val dataStream = new DataOutputStream(byteStream)
+        try {
+          
dataStream.writeInt(dfsFileInfo.getReduceFileMeta.getChunkOffsets.size)
+          for (offset <- 
dfsFileInfo.getReduceFileMeta.getChunkOffsets.asScala) {
+            dataStream.writeLong(offset)
+          }
+          indexOutputStream.write(byteStream.toByteArray)
+        } finally if (dataStream != null) {
+          dataStream.close()
         }
         indexOutputStream.close()
       }

Reply via email to