This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 364683651 [CELEBORN-2243] During the close phase of hashWriter, 
pushData and mergeData are sent in parallel
364683651 is described below

commit 3646836516961d810af07577d6d8cc948f25015e
Author: jiucheng <[email protected]>
AuthorDate: Mon Jan 12 15:51:16 2026 +0800

    [CELEBORN-2243] During the close phase of hashWriter, pushData and 
mergeData are sent in parallel
    
    ### What changes were proposed in this pull request?
    Optimize the close process of HashBasedShuffleWriter:
    
    Before this PR: close() first waits synchronously for the pushData in the dataPusher queue to 
complete, then sends mergeData and waits synchronously again.
    After this PR: while dataPusher is still sending pushData, mergeData is 
also constructed and sent at the same time.
    
    Optimization results: The table below shows the shuffle write time for TPCH 
1TB.
    | | SortBasedShuffleWriter | HashBasedShuffleWriter |
    |--------|--------|--------|
    | Before | 615.44W ms | 502.31W ms |
    | After |  442.47W ms | 434.77W ms |
    
    Note: These numbers are the combined result of multiple optimizations;
    the other optimizations will be submitted as separate pull requests in the future.
    
    ### Why are the changes needed?
    Improve performance by increasing the parallelism of pushing/merging data.
    
    ### Does this PR resolve a correctness bug?
    
    NO
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    Closes #3574 from mcdull-zhang/hash_close_opt.
    
    Authored-by: jiucheng <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 9fca94bac6a7d35f7d83735e6a754a9f3f84d05a)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java    | 7 +++----
 .../main/java/org/apache/celeborn/client/DummyShuffleClient.java | 3 ---
 .../src/main/java/org/apache/celeborn/client/ShuffleClient.java  | 3 ---
 .../main/java/org/apache/celeborn/client/ShuffleClientImpl.java  | 9 ---------
 4 files changed, 3 insertions(+), 19 deletions(-)

diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index d5a0fdf22..df0c6d1eb 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -367,17 +367,16 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   }
 
   private void close() throws IOException, InterruptedException {
-    // here we wait for all the in-flight batches to return which sent by 
dataPusher thread
+    // Send the remaining data in sendBuffer
     long pushMergedDataTime = System.nanoTime();
-    dataPusher.waitOnTermination();
-    sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
-    shuffleClient.prepareForMergeData(shuffleId, mapId, encodedAttemptId);
     closeWrite();
     shuffleClient.pushMergedData(shuffleId, mapId, encodedAttemptId);
     writeMetrics.incWriteTime(System.nanoTime() - pushMergedDataTime);
     updateRecordsWrittenMetrics();
 
     long waitStartTime = System.nanoTime();
+    dataPusher.waitOnTermination();
+    sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
     shuffleClient.mapperEnd(shuffleId, mapId, encodedAttemptId, numMappers, 
numPartitions);
     writeMetrics.incWriteTime(System.nanoTime() - waitStartTime);
 
diff --git 
a/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java 
b/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
index 69cc3cd6f..6ca3406be 100644
--- a/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
@@ -92,9 +92,6 @@ public class DummyShuffleClient extends ShuffleClient {
     return length;
   }
 
-  @Override
-  public void prepareForMergeData(int shuffleId, int mapId, int attemptId) 
throws IOException {}
-
   @Override
   public int mergeData(
       int shuffleId,
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index d37ec6442..e4c473a9b 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -186,9 +186,6 @@ public abstract class ShuffleClient {
       int numPartitions)
       throws IOException;
 
-  public abstract void prepareForMergeData(int shuffleId, int mapId, int 
attemptId)
-      throws IOException;
-
   public abstract int mergeData(
       int shuffleId,
       int mapId,
diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index cfe40a296..b95fdb2ff 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1375,15 +1375,6 @@ public class ShuffleClientImpl extends ShuffleClient {
         false);
   }
 
-  @Override
-  public void prepareForMergeData(int shuffleId, int mapId, int attemptId) 
throws IOException {
-    final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
-    PushState pushState = pushStates.get(mapKey);
-    if (pushState != null) {
-      limitZeroInFlight(mapKey, pushState);
-    }
-  }
-
   @Override
   public int mergeData(
       int shuffleId,

Reply via email to