This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new ef008d530 [CELEBORN-2243][FOLLOWUP][SPARK-2] During the close phase of
hashWriter, pushData and mergeData are sent in parallel
ef008d530 is described below
commit ef008d5309503549a4193265b9c454aba89d70da
Author: jiucheng <[email protected]>
AuthorDate: Tue Jan 13 11:41:00 2026 +0800
[CELEBORN-2243][FOLLOWUP][SPARK-2] During the close phase of hashWriter,
pushData and mergeData are sent in parallel
### What changes were proposed in this pull request?
Same as https://github.com/apache/celeborn/pull/3574, applied to the spark-2 client as well.
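
The reordering this diff makes in `close()` can be modeled with a small, self-contained sketch. It is not Celeborn code: `CloseOrderSketch`, the event strings, and the single-thread executor standing in for the dataPusher thread are all assumptions made for illustration. It shows only the ordering idea: the task thread sends merged data while a pushData batch is still in flight, and waits for the pusher just before `mapperEnd`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical toy model of the close() ordering; no Celeborn classes are used.
public class CloseOrderSketch {

    static List<String> close() throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();
        // Stand-in for the dataPusher thread (assumption).
        ExecutorService pusher = Executors.newSingleThreadExecutor();
        CountDownLatch mergedSent = new CountDownLatch(1);

        // An in-flight pushData batch. It is held back until the merged data
        // has been sent, to make visible that the task thread no longer waits
        // for the pusher before sending merged data.
        Future<Void> inFlight = pusher.submit(() -> {
            mergedSent.await();
            events.add("pushData done");
            return null;
        });

        // Task thread: send merged residual data without waiting first.
        events.add("mergeData sent");
        mergedSent.countDown();

        // Stand-in for dataPusher.waitOnTermination(), now placed right
        // before mapperEnd.
        inFlight.get();
        events.add("mapperEnd");

        pusher.shutdown();
        return events;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(close());
    }
}
```

In this model the wait on the pusher is the last step before `mapperEnd`, so both kinds of sends are finished by the time the mapper is reported as ended.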
### Why are the changes needed?
### Does this PR resolve a correctness bug?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #3584 from mcdull-zhang/CELEBORN-2243.
Authored-by: jiucheng <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 7e4bea0e7e09ec57edb030745c13a41f4f96868a)
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | 7 ++-----
1 file changed, 2 insertions(+), 5 deletions(-)
diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 7121ccdf9..de7d00b4e 100644
--- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -332,11 +332,6 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
}
private void close() throws IOException, InterruptedException {
- // here we wait for all the in-flight batches to return which sent by dataPusher thread
- dataPusher.waitOnTermination();
- sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
- shuffleClient.prepareForMergeData(shuffleId, mapId, encodedAttemptId);
-
// merge and push residual data to reduce network traffic
// NB: since dataPusher thread have no in-flight data at this point,
// we now push merged data by task thread will not introduce any contention
@@ -369,6 +364,8 @@ public class HashBasedShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
sendOffsets = null;
long waitStartTime = System.nanoTime();
+ dataPusher.waitOnTermination();
+ sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
shuffleClient.mapperEnd(shuffleId, mapId, encodedAttemptId, numMappers,
numPartitions);
writeMetrics.incWriteTime(System.nanoTime() - waitStartTime);