This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 458da32a7 [CELEBORN-1721][FOLLOWUP] Return softsplit if there is no 
hardsplit for pushMergeData
458da32a7 is described below

commit 458da32a73e2bbbe10bfe4b73542c77b2914a8a9
Author: Shuang <[email protected]>
AuthorDate: Mon Jun 23 17:32:10 2025 -0700

    [CELEBORN-1721][FOLLOWUP] Return softsplit if there is no hardsplit for 
pushMergeData
    
    ### What changes were proposed in this pull request?
    Return the SOFT_SPLIT status (instead of HARD_SPLIT) in the pushMergedData 
response when none of the split partitions requires a hard split.
    
    ### Why are the changes needed?
    HardSplit support was introduced in 
[CELEBORN-1721](https://issues.apache.org/jira/browse/CELEBORN-1721), and it 
works well when the client and server are of the same version. However, using a 
0.5.x client with a 0.6.x server can significantly impact shuffle write 
performance. This is because the 0.6.x server returns hardsplit status whenever 
there is any soft or hard split, leading the 0.5.x client to perform hardsplit 
on every partition. To maintain compatibility during upgrades, it [...]
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passed GA and a 1T TPC-DS run with a 0.5.4 client. According to the test 
results, after applying this PR the number of revives decreased significantly 
and performance improved.
    
    #### Test with 0.6.0-rc2 server and 0.5.4 client
    
![image](https://github.com/user-attachments/assets/f9d640d7-1dc4-438b-8320-428a7d23dc93)
    
    #### Test with 0.7.0 server + this PR and 0.5.4 client
    
![image](https://github.com/user-attachments/assets/d198055a-698c-48ec-9246-9170d2ac64cc)
    
    Closes #3342 from RexXiong/CELEBORN-1721-FOLLOWUP.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 582726fff8b338ab55cb8dd54da57d97dc881527)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../apache/celeborn/client/ShuffleClientImpl.java  |  3 ++-
 .../service/deploy/worker/PushDataHandler.scala    | 22 ++++++++++++++++++----
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index d11455f2e..d9a1e67d6 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1489,7 +1489,8 @@ public class ShuffleClientImpl extends ShuffleClient {
           @Override
           public void onSuccess(ByteBuffer response) {
             byte reason = response.get();
-            if (reason == StatusCode.HARD_SPLIT.getValue()) {
+            if (reason == StatusCode.HARD_SPLIT.getValue()
+                || reason == StatusCode.SOFT_SPLIT.getValue()) {
               ArrayList<DataBatches.DataBatch> batchesNeedResubmit;
               if (response.remaining() > 0) {
                 batchesNeedResubmit = new ArrayList<>();
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 9edf91741..85eb45dbd 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -469,7 +469,7 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
           key,
           callback)
       }
-    val pushMergedDataCallback = new PushMergedDataCallback(callbackWithTimer)
+    val pushMergedDataCallback = new PushMergedDataCallback(callbackWithTimer, 
shuffleKey)
 
     // For test
     if (isPrimary && testPushPrimaryDataTimeout &&
@@ -651,7 +651,7 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
                     } else {
                       StatusCode.SUCCESS
                     }
-                  if (replicaReason == StatusCode.HARD_SPLIT.getValue) {
+                  if (replicaReason == StatusCode.HARD_SPLIT.getValue || 
replicaReason == StatusCode.SOFT_SPLIT.getValue) {
                     if (response.remaining() > 0) {
                       try {
                         val pushMergedDataResponse: 
PbPushMergedDataSplitPartitionInfo =
@@ -889,7 +889,7 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     }
   }
 
-  class PushMergedDataCallback(callback: RpcResponseCallback) {
+  class PushMergedDataCallback(callback: RpcResponseCallback, val shuffleKey: 
String) {
     private val splitPartitionStatuses = new mutable.HashMap[Int, Byte]()
 
     def addSplitPartition(index: Int, statusCode: StatusCode): Unit = {
@@ -930,10 +930,16 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
     def onSuccess(status: StatusCode): Unit = {
       val splitPartitionIndexes = new util.ArrayList[Integer]()
       val statusCodes = new util.ArrayList[Integer]()
+      var hasHardSplit = false
+
       splitPartitionStatuses.foreach {
         case (partitionIndex, statusCode) =>
           splitPartitionIndexes.add(partitionIndex)
           statusCodes.add(statusCode)
+          // check if there is any hard split partition
+          if (statusCode == StatusCode.HARD_SPLIT.getValue) {
+            hasHardSplit = true
+          }
       }
       if (splitPartitionStatuses.isEmpty || status == StatusCode.MAP_ENDED) {
         callback.onSuccess(
@@ -947,7 +953,15 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
           .asInstanceOf[TransportMessage]
           .toByteBuffer
         val response = ByteBuffer.allocate(1 + 
pushMergedDataInfoByteBuffer.remaining())
-        response.put(StatusCode.HARD_SPLIT.getValue)
+
+        if (!hasHardSplit) {
+          logDebug(s"PushMergedData has no hard split for shuffle $shuffleKey, 
" +
+            s"splitPartitionIndex $splitPartitionIndexes")
+          response.put(StatusCode.SOFT_SPLIT.getValue)
+        } else {
+          response.put(StatusCode.HARD_SPLIT.getValue)
+        }
+
         response.put(pushMergedDataInfoByteBuffer)
         response.flip()
         callback.onSuccess(response)

Reply via email to