This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new ef008d530 [CELEBORN-2243][FOLLOWUP][SPARK-2] During the close phase of hashWriter, pushData and mergeData are sent in parallel
ef008d530 is described below

commit ef008d5309503549a4193265b9c454aba89d70da
Author: jiucheng <[email protected]>
AuthorDate: Tue Jan 13 11:41:00 2026 +0800

    [CELEBORN-2243][FOLLOWUP][SPARK-2] During the close phase of hashWriter, pushData and mergeData are sent in parallel
    
    ### What changes were proposed in this pull request?
    
    Same change as https://github.com/apache/celeborn/pull/3574,
    applied to the spark-2 client as well.
    
    ### Why are the changes needed?
    
    ### Does this PR resolve a correctness bug?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #3584 from mcdull-zhang/CELEBORN-2243.
    
    Authored-by: jiucheng <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 7e4bea0e7e09ec57edb030745c13a41f4f96868a)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java  | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 7121ccdf9..de7d00b4e 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -332,11 +332,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
   }
 
   private void close() throws IOException, InterruptedException {
-    // here we wait for all the in-flight batches to return which sent by dataPusher thread
-    dataPusher.waitOnTermination();
-    sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
-    shuffleClient.prepareForMergeData(shuffleId, mapId, encodedAttemptId);
-
     // merge and push residual data to reduce network traffic
     // NB: since dataPusher thread have no in-flight data at this point,
     //     we now push merged data by task thread will not introduce any contention
@@ -369,6 +364,8 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
     sendOffsets = null;
 
     long waitStartTime = System.nanoTime();
+    dataPusher.waitOnTermination();
+    sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
     shuffleClient.mapperEnd(shuffleId, mapId, encodedAttemptId, numMappers, numPartitions);
     writeMetrics.incWriteTime(System.nanoTime() - waitStartTime);
 
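The reordering in this diff can be illustrated with a minimal, self-contained sketch. It is not Celeborn code: the single-thread `ExecutorService` is only a stand-in for the dataPusher thread, and the class name, the `close` signature, and the event strings are all hypothetical. The sketch assumes the intent is that the task thread merges residual data while queued pushes are still draining, and that the wait happens just before the mapper-end step rather than at the start of close.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; none of these names exist in Celeborn.
final class OverlappedCloseSketch {

  // Returns the recorded events. Push and merge events may interleave in any
  // order, but "mapperEnd" is always recorded last.
  static List<String> close(List<String> queuedBatches, List<String> residualBuffers) {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    // Stand-in for the dataPusher thread (an assumption of this sketch).
    ExecutorService pusher = Executors.newSingleThreadExecutor();
    try {
      List<Future<?>> inFlight = new ArrayList<>();
      for (String batch : queuedBatches) {
        inFlight.add(pusher.submit(() -> { events.add("push:" + batch); }));
      }
      // The task thread merges residual data without waiting for the pusher,
      // so both kinds of work can proceed at the same time.
      for (String buffer : residualBuffers) {
        events.add("merge:" + buffer);
      }
      // Wait for every in-flight push only now, right before the final step.
      for (Future<?> f : inFlight) {
        f.get();
      }
      events.add("mapperEnd");
      return events;
    } catch (Exception e) {
      throw new IllegalStateException("push did not complete", e);
    } finally {
      pusher.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(close(Arrays.asList("b1", "b2"), Arrays.asList("r1")));
  }
}
```

In this sketch the only ordering guarantee is that the final step runs after all pushes and merges have finished; whether the real writer needs further synchronization between the two paths is not something the diff alone establishes.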
