This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 364683651 [CELEBORN-2243] During the close phase of hashWriter,
pushData and mergeData are sent in parallel
364683651 is described below
commit 3646836516961d810af07577d6d8cc948f25015e
Author: jiucheng <[email protected]>
AuthorDate: Mon Jan 12 15:51:16 2026 +0800
[CELEBORN-2243] During the close phase of hashWriter, pushData and
mergeData are sent in parallel
### What changes were proposed in this pull request?
Optimize the close process of HashBasedShuffleWriter:
Before this PR: first, synchronously wait for the pushData in the dataPusher
queue to complete, then send mergeData and synchronously wait for it.
After this PR: while dataPusher is still sending pushData, mergeData is
also constructed and sent at the same time.
Optimization results: The table below shows the shuffle write time for TPCH
1TB.
| | SortBasedShuffleWriter | HashBasedShuffleWriter |
|--------|--------|--------|
| Before | 615.44W ms | 502.31W ms |
| After | 442.47W ms | 434.77W ms |
Note: This is the result of multiple optimizations combined;
other optimizations will be submitted as pull requests in the future.
### Why are the changes needed?
Improve performance by increasing the parallelism of pushing/merging data.
### Does this PR resolve a correctness bug?
NO
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Existing unit tests.
Closes #3574 from mcdull-zhang/hash_close_opt.
Authored-by: jiucheng <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 9fca94bac6a7d35f7d83735e6a754a9f3f84d05a)
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java | 7 +++----
.../main/java/org/apache/celeborn/client/DummyShuffleClient.java | 3 ---
.../src/main/java/org/apache/celeborn/client/ShuffleClient.java | 3 ---
.../main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 9 ---------
4 files changed, 3 insertions(+), 19 deletions(-)
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index d5a0fdf22..df0c6d1eb 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -367,17 +367,16 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void close() throws IOException, InterruptedException {
- // here we wait for all the in-flight batches to return which sent by
dataPusher thread
+ // Send the remaining data in sendBuffer
long pushMergedDataTime = System.nanoTime();
- dataPusher.waitOnTermination();
- sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
- shuffleClient.prepareForMergeData(shuffleId, mapId, encodedAttemptId);
closeWrite();
shuffleClient.pushMergedData(shuffleId, mapId, encodedAttemptId);
writeMetrics.incWriteTime(System.nanoTime() - pushMergedDataTime);
updateRecordsWrittenMetrics();
long waitStartTime = System.nanoTime();
+ dataPusher.waitOnTermination();
+ sendBufferPool.returnPushTaskQueue(dataPusher.getAndResetIdleQueue());
shuffleClient.mapperEnd(shuffleId, mapId, encodedAttemptId, numMappers,
numPartitions);
writeMetrics.incWriteTime(System.nanoTime() - waitStartTime);
diff --git
a/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
b/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
index 69cc3cd6f..6ca3406be 100644
--- a/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/DummyShuffleClient.java
@@ -92,9 +92,6 @@ public class DummyShuffleClient extends ShuffleClient {
return length;
}
- @Override
- public void prepareForMergeData(int shuffleId, int mapId, int attemptId)
throws IOException {}
-
@Override
public int mergeData(
int shuffleId,
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index d37ec6442..e4c473a9b 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -186,9 +186,6 @@ public abstract class ShuffleClient {
int numPartitions)
throws IOException;
- public abstract void prepareForMergeData(int shuffleId, int mapId, int
attemptId)
- throws IOException;
-
public abstract int mergeData(
int shuffleId,
int mapId,
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index cfe40a296..b95fdb2ff 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1375,15 +1375,6 @@ public class ShuffleClientImpl extends ShuffleClient {
false);
}
- @Override
- public void prepareForMergeData(int shuffleId, int mapId, int attemptId)
throws IOException {
- final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
- PushState pushState = pushStates.get(mapKey);
- if (pushState != null) {
- limitZeroInFlight(mapKey, pushState);
- }
- }
-
@Override
public int mergeData(
int shuffleId,