This is an automated email from the ASF dual-hosted git repository.

angerszhuuuu pushed a commit to branch CELEBORN-1334
in repository https://gitbox.apache.org/repos/asf/celeborn.git

commit 7b429f7d29705389730eed8d81dcdd8f009feaba
Author: Angerszhuuuu <[email protected]>
AuthorDate: Tue Mar 25 16:28:56 2025 +0800

    CELEBORN-1334 [CIP-12] HARD_SPLIT support in PushMergedData should be 
compatible with old workers' success RPC responses that carry no content
---
 .../apache/celeborn/client/ShuffleClientImpl.java  | 226 +++++++++++----------
 1 file changed, 117 insertions(+), 109 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index be6b5aa3f..955ed9b80 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1490,66 +1490,121 @@ public class ShuffleClientImpl extends ShuffleClient {
         new RpcResponseCallback() {
           @Override
           public void onSuccess(ByteBuffer response) {
-            byte reason = response.get();
-            if (reason == StatusCode.HARD_SPLIT.getValue()) {
-              ArrayList<DataBatches.DataBatch> batchesNeedResubmit;
-              if (response.remaining() > 0) {
-                batchesNeedResubmit = new ArrayList<>();
-                PbPushMergedDataSplitPartitionInfo partitionInfo;
-                try {
-                  partitionInfo = 
TransportMessage.fromByteBuffer(response).getParsedPayload();
-                } catch (CelebornIOException | InvalidProtocolBufferException 
e) {
-                  callback.onFailure(
-                      new CelebornIOException("parse pushMergedData response 
failed", e));
-                  return;
+            if (response.remaining() > 0) {
+              byte reason = response.get();
+              if (reason == StatusCode.HARD_SPLIT.getValue()) {
+                ArrayList<DataBatches.DataBatch> batchesNeedResubmit;
+                if (response.remaining() > 0) {
+                  batchesNeedResubmit = new ArrayList<>();
+                  PbPushMergedDataSplitPartitionInfo partitionInfo;
+                  try {
+                    partitionInfo = 
TransportMessage.fromByteBuffer(response).getParsedPayload();
+                  } catch (CelebornIOException | 
InvalidProtocolBufferException e) {
+                    callback.onFailure(
+                        new CelebornIOException("parse pushMergedData response 
failed", e));
+                    return;
+                  }
+                  List<Integer> splitPartitionIndexes = 
partitionInfo.getSplitPartitionIndexesList();
+                  List<Integer> statusCodeList = 
partitionInfo.getStatusCodesList();
+                  StringBuilder dataBatchReviveInfos = new StringBuilder();
+                  for (int i = 0; i < splitPartitionIndexes.size(); i++) {
+                    int partitionIndex = splitPartitionIndexes.get(i);
+                    int batchId = batches.get(partitionIndex).batchId;
+                    dataBatchReviveInfos.append(
+                        String.format(
+                            "(batchId=%d, partitionId=%d, cause=%s)",
+                            batchId,
+                            partitionIds[partitionIndex],
+                            
StatusCode.fromValue(statusCodeList.get(i).byteValue())));
+                    if (statusCodeList.get(i) == 
StatusCode.SOFT_SPLIT.getValue()) {
+                      PartitionLocation loc = batches.get(partitionIndex).loc;
+                      if (!newerPartitionLocationExists(
+                          reducePartitionMap.get(shuffleId), loc.getId(), 
loc.getEpoch(), false)) {
+                        ReviveRequest reviveRequest =
+                            new ReviveRequest(
+                                shuffleId,
+                                mapId,
+                                attemptId,
+                                loc.getId(),
+                                loc.getEpoch(),
+                                loc,
+                                StatusCode.SOFT_SPLIT);
+                        reviveManager.addRequest(reviveRequest);
+                      }
+                    } else {
+                      batchesNeedResubmit.add(batches.get(partitionIndex));
+                    }
+                  }
+                  logger.info(
+                      "Push merged data to {} partial success required for 
shuffle {} map {} attempt {} groupedBatch {}. split batches {}.",
+                      addressPair,
+                      shuffleId,
+                      mapId,
+                      attemptId,
+                      groupedBatchId,
+                      dataBatchReviveInfos);
+                } else {
+                  // Workers that do not incorporate changes from 
[CELEBORN-1721]
+                  // will respond with a status of HARD_SPLIT,
+                  // but will not include a PbPushMergedDataSplitPartitionInfo.
+                  // For backward compatibility, all batches must be 
resubmitted.
+                  batchesNeedResubmit = batches;
+                  logger.info(
+                      "Push merged data to {} hard split required for shuffle 
{} map {} attempt {} partition {} groupedBatch {} batch {}.",
+                      addressPair,
+                      shuffleId,
+                      mapId,
+                      attemptId,
+                      Arrays.toString(partitionIds),
+                      groupedBatchId,
+                      Arrays.toString(batchIds));
                 }
-                List<Integer> splitPartitionIndexes = 
partitionInfo.getSplitPartitionIndexesList();
-                List<Integer> statusCodeList = 
partitionInfo.getStatusCodesList();
-                StringBuilder dataBatchReviveInfos = new StringBuilder();
-                for (int i = 0; i < splitPartitionIndexes.size(); i++) {
-                  int partitionIndex = splitPartitionIndexes.get(i);
-                  int batchId = batches.get(partitionIndex).batchId;
-                  dataBatchReviveInfos.append(
-                      String.format(
-                          "(batchId=%d, partitionId=%d, cause=%s)",
-                          batchId,
-                          partitionIds[partitionIndex],
-                          
StatusCode.fromValue(statusCodeList.get(i).byteValue())));
-                  if (statusCodeList.get(i) == 
StatusCode.SOFT_SPLIT.getValue()) {
-                    PartitionLocation loc = batches.get(partitionIndex).loc;
-                    if (!newerPartitionLocationExists(
-                        reducePartitionMap.get(shuffleId), loc.getId(), 
loc.getEpoch(), false)) {
-                      ReviveRequest reviveRequest =
-                          new ReviveRequest(
+                if (batchesNeedResubmit.isEmpty()) {
+                  pushState.onSuccess(hostPort);
+                  callback.onSuccess(ByteBuffer.wrap(new byte[] 
{StatusCode.SOFT_SPLIT.getValue()}));
+                } else {
+                  if (dataPushFailureTrackingEnabled) {
+                    for (DataBatches.DataBatch resubmitBatch : 
batchesNeedResubmit) {
+                      pushState.addFailedBatch(
+                          resubmitBatch.loc.getUniqueId(),
+                          new PushFailedBatch(mapId, attemptId, 
resubmitBatch.batchId));
+                    }
+                  }
+                  ReviveRequest[] requests =
+                      addAndGetReviveRequests(
+                          shuffleId, mapId, attemptId, batchesNeedResubmit, 
StatusCode.HARD_SPLIT);
+                  pushDataRetryPool.submit(
+                      () ->
+                          submitRetryPushMergedData(
+                              pushState,
                               shuffleId,
                               mapId,
                               attemptId,
-                              loc.getId(),
-                              loc.getEpoch(),
-                              loc,
-                              StatusCode.SOFT_SPLIT);
-                      reviveManager.addRequest(reviveRequest);
-                    }
-                  } else {
-                    batchesNeedResubmit.add(batches.get(partitionIndex));
-                  }
+                              batchesNeedResubmit,
+                              StatusCode.HARD_SPLIT,
+                              groupedBatchId,
+                              requests,
+                              remainReviveTimes,
+                              System.currentTimeMillis()
+                                  + 
conf.clientRpcRequestPartitionLocationAskTimeout()
+                                  .duration()
+                                  .toMillis()));
                 }
-                logger.info(
-                    "Push merged data to {} partial success required for 
shuffle {} map {} attempt {} groupedBatch {}. split batches {}.",
+              } else if (reason == 
StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) {
+                logger.debug(
+                    "Push merged data to {} primary congestion required for 
shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
                     addressPair,
                     shuffleId,
                     mapId,
                     attemptId,
+                    Arrays.toString(partitionIds),
                     groupedBatchId,
-                    dataBatchReviveInfos);
-              } else {
-                // Workers that do not incorporate changes from [CELEBORN-1721]
-                // will respond with a status of HARD_SPLIT,
-                // but will not include a PbPushMergedDataSplitPartitionInfo.
-                // For backward compatibility, all batches must be resubmitted.
-                batchesNeedResubmit = batches;
-                logger.info(
-                    "Push merged data to {} hard split required for shuffle {} 
map {} attempt {} partition {} groupedBatch {} batch {}.",
+                    Arrays.toString(batchIds));
+                pushState.onCongestControl(hostPort);
+                callback.onSuccess(response);
+              } else if (reason == 
StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue()) {
+                logger.debug(
+                    "Push merged data to {} replica congestion required for 
shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
                     addressPair,
                     shuffleId,
                     mapId,
@@ -1557,68 +1612,21 @@ public class ShuffleClientImpl extends ShuffleClient {
                     Arrays.toString(partitionIds),
                     groupedBatchId,
                     Arrays.toString(batchIds));
-              }
-              if (batchesNeedResubmit.isEmpty()) {
+                pushState.onCongestControl(hostPort);
+                callback.onSuccess(response);
+              } else if (reason == StatusCode.MAP_ENDED.getValue()) {
                 pushState.onSuccess(hostPort);
-                callback.onSuccess(ByteBuffer.wrap(new byte[] 
{StatusCode.SOFT_SPLIT.getValue()}));
-              } else {
-                if (dataPushFailureTrackingEnabled) {
-                  for (DataBatches.DataBatch resubmitBatch : 
batchesNeedResubmit) {
-                    pushState.addFailedBatch(
-                        resubmitBatch.loc.getUniqueId(),
-                        new PushFailedBatch(mapId, attemptId, 
resubmitBatch.batchId));
-                  }
-                }
-                ReviveRequest[] requests =
-                    addAndGetReviveRequests(
-                        shuffleId, mapId, attemptId, batchesNeedResubmit, 
StatusCode.HARD_SPLIT);
-                pushDataRetryPool.submit(
-                    () ->
-                        submitRetryPushMergedData(
-                            pushState,
-                            shuffleId,
-                            mapId,
-                            attemptId,
-                            batchesNeedResubmit,
-                            StatusCode.HARD_SPLIT,
-                            groupedBatchId,
-                            requests,
-                            remainReviveTimes,
-                            System.currentTimeMillis()
-                                + 
conf.clientRpcRequestPartitionLocationAskTimeout()
-                                    .duration()
-                                    .toMillis()));
+                callback.onSuccess(ByteBuffer.wrap(new byte[] 
{StatusCode.MAP_ENDED.getValue()}));
+              } else { // success
+                pushState.onSuccess(hostPort);
+                callback.onSuccess(ByteBuffer.wrap(new byte[] 
{StatusCode.SUCCESS.getValue()}));
               }
-            } else if (reason == 
StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) {
-              logger.debug(
-                  "Push merged data to {} primary congestion required for 
shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
-                  addressPair,
-                  shuffleId,
-                  mapId,
-                  attemptId,
-                  Arrays.toString(partitionIds),
-                  groupedBatchId,
-                  Arrays.toString(batchIds));
-              pushState.onCongestControl(hostPort);
-              callback.onSuccess(response);
-            } else if (reason == 
StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue()) {
-              logger.debug(
-                  "Push merged data to {} replica congestion required for 
shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
-                  addressPair,
-                  shuffleId,
-                  mapId,
-                  attemptId,
-                  Arrays.toString(partitionIds),
-                  groupedBatchId,
-                  Arrays.toString(batchIds));
-              pushState.onCongestControl(hostPort);
-              callback.onSuccess(response);
-            } else if (reason == StatusCode.MAP_ENDED.getValue()) {
-              pushState.onSuccess(hostPort);
-              callback.onSuccess(ByteBuffer.wrap(new byte[] 
{StatusCode.MAP_ENDED.getValue()}));
-            } else { // success
+            } else {
+              // Workers that do not incorporate changes from [CELEBORN-1721]
+              // will respond with an empty response body.
+              // For backward compatibility, we must keep this check on
+              // whether the response has remaining bytes.
               pushState.onSuccess(hostPort);
-              callback.onSuccess(ByteBuffer.wrap(new byte[] 
{StatusCode.SUCCESS.getValue()}));
+              callback.onSuccess(response);
             }
           }
 

Reply via email to