AngersZhuuuu commented on code in PR #3171:
URL: https://github.com/apache/celeborn/pull/3171#discussion_r2011566235


##########
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java:
##########
@@ -1490,135 +1490,143 @@ public void onFailure(Throwable e) {
         new RpcResponseCallback() {
           @Override
           public void onSuccess(ByteBuffer response) {
-            byte reason = response.get();
-            if (reason == StatusCode.HARD_SPLIT.getValue()) {
-              ArrayList<DataBatches.DataBatch> batchesNeedResubmit;
-              if (response.remaining() > 0) {
-                batchesNeedResubmit = new ArrayList<>();
-                PbPushMergedDataSplitPartitionInfo partitionInfo;
-                try {
-                  partitionInfo = TransportMessage.fromByteBuffer(response).getParsedPayload();
-                } catch (CelebornIOException | InvalidProtocolBufferException e) {
-                  callback.onFailure(
-                      new CelebornIOException("parse pushMergedData response failed", e));
-                  return;
+            if (response.remaining() > 0) {
+              byte reason = response.get();
+              if (reason == StatusCode.HARD_SPLIT.getValue()) {
+                ArrayList<DataBatches.DataBatch> batchesNeedResubmit;
+                if (response.remaining() > 0) {
+                  batchesNeedResubmit = new ArrayList<>();
+                  PbPushMergedDataSplitPartitionInfo partitionInfo;
+                  try {
+                    partitionInfo = TransportMessage.fromByteBuffer(response).getParsedPayload();
+                  } catch (CelebornIOException | InvalidProtocolBufferException e) {
+                    callback.onFailure(
+                        new CelebornIOException("parse pushMergedData response failed", e));
+                    return;
+                  }
+                  List<Integer> splitPartitionIndexes = partitionInfo.getSplitPartitionIndexesList();
+                  List<Integer> statusCodeList = partitionInfo.getStatusCodesList();
+                  StringBuilder dataBatchReviveInfos = new StringBuilder();
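+                  // For each split partition reported by the worker: SOFT_SPLIT only schedules an
+                  // asynchronous revive request, while batches with any other status are collected
+                  // for resubmission.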
+                  for (int i = 0; i < splitPartitionIndexes.size(); i++) {
+                    int partitionIndex = splitPartitionIndexes.get(i);
+                    int batchId = batches.get(partitionIndex).batchId;
+                    dataBatchReviveInfos.append(
+                        String.format(
+                            "(batchId=%d, partitionId=%d, cause=%s)",
+                            batchId,
+                            partitionIds[partitionIndex],
+                            StatusCode.fromValue(statusCodeList.get(i).byteValue())));
+                    if (statusCodeList.get(i) == StatusCode.SOFT_SPLIT.getValue()) {
+                      PartitionLocation loc = batches.get(partitionIndex).loc;
+                      if (!newerPartitionLocationExists(
+                          reducePartitionMap.get(shuffleId), loc.getId(), loc.getEpoch(), false)) {
+                        ReviveRequest reviveRequest =
+                            new ReviveRequest(
+                                shuffleId,
+                                mapId,
+                                attemptId,
+                                loc.getId(),
+                                loc.getEpoch(),
+                                loc,
+                                StatusCode.SOFT_SPLIT);
+                        reviveManager.addRequest(reviveRequest);
+                      }
+                    } else {
+                      batchesNeedResubmit.add(batches.get(partitionIndex));
+                    }
+                  }
+                  logger.info(
+                      "Push merged data to {} partial success required for shuffle {} map {} attempt {} groupedBatch {}. split batches {}.",
+                      addressPair,
+                      shuffleId,
+                      mapId,
+                      attemptId,
+                      groupedBatchId,
+                      dataBatchReviveInfos);
+                } else {
+                  // Workers that do not incorporate changes from [CELEBORN-1721]
+                  // will respond with a status of HARD_SPLIT,
+                  // but will not include a PbPushMergedDataSplitPartitionInfo.
+                  // For backward compatibility, all batches must be resubmitted.
+                  batchesNeedResubmit = batches;
+                  logger.info(
+                      "Push merged data to {} hard split required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
+                      addressPair,
+                      shuffleId,
+                      mapId,
+                      attemptId,
+                      Arrays.toString(partitionIds),
+                      groupedBatchId,
+                      Arrays.toString(batchIds));
                 }
-                List<Integer> splitPartitionIndexes = partitionInfo.getSplitPartitionIndexesList();
-                List<Integer> statusCodeList = partitionInfo.getStatusCodesList();
-                StringBuilder dataBatchReviveInfos = new StringBuilder();
-                for (int i = 0; i < splitPartitionIndexes.size(); i++) {
-                  int partitionIndex = splitPartitionIndexes.get(i);
-                  int batchId = batches.get(partitionIndex).batchId;
-                  dataBatchReviveInfos.append(
-                      String.format(
-                          "(batchId=%d, partitionId=%d, cause=%s)",
-                          batchId,
-                          partitionIds[partitionIndex],
-                          StatusCode.fromValue(statusCodeList.get(i).byteValue())));
-                  if (statusCodeList.get(i) == StatusCode.SOFT_SPLIT.getValue()) {
-                    PartitionLocation loc = batches.get(partitionIndex).loc;
-                    if (!newerPartitionLocationExists(
-                        reducePartitionMap.get(shuffleId), loc.getId(), loc.getEpoch(), false)) {
-                      ReviveRequest reviveRequest =
-                          new ReviveRequest(
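+                // If every split was SOFT_SPLIT the push has already succeeded; otherwise revive
+                // and resubmit the remaining batches through the retry pool.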
+                if (batchesNeedResubmit.isEmpty()) {
+                  pushState.onSuccess(hostPort);
+                  callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.SOFT_SPLIT.getValue()}));
+                } else {
+                  if (dataPushFailureTrackingEnabled) {
+                    for (DataBatches.DataBatch resubmitBatch : batchesNeedResubmit) {
+                      pushState.addFailedBatch(
+                          resubmitBatch.loc.getUniqueId(),
+                          new PushFailedBatch(mapId, attemptId, resubmitBatch.batchId));
+                    }
+                  }
+                  ReviveRequest[] requests =
+                      addAndGetReviveRequests(
+                          shuffleId, mapId, attemptId, batchesNeedResubmit, StatusCode.HARD_SPLIT);
+                  pushDataRetryPool.submit(
+                      () ->
+                          submitRetryPushMergedData(
+                              pushState,
                               shuffleId,
                               mapId,
                               attemptId,
-                              loc.getId(),
-                              loc.getEpoch(),
-                              loc,
-                              StatusCode.SOFT_SPLIT);
-                      reviveManager.addRequest(reviveRequest);
-                    }
-                  } else {
-                    batchesNeedResubmit.add(batches.get(partitionIndex));
-                  }
+                              batchesNeedResubmit,
+                              StatusCode.HARD_SPLIT,
+                              groupedBatchId,
+                              requests,
+                              remainReviveTimes,
+                              System.currentTimeMillis()
+                                  + conf.clientRpcRequestPartitionLocationAskTimeout()
+                                  .duration()
+                                  .toMillis()));
                 }
-                logger.info(
-                    "Push merged data to {} partial success required for shuffle {} map {} attempt {} groupedBatch {}. split batches {}.",
+              } else if (reason == StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) {
+                logger.debug(
+                    "Push merged data to {} primary congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
                     addressPair,
                     shuffleId,
                     mapId,
                     attemptId,
+                    Arrays.toString(partitionIds),
                     groupedBatchId,
-                    dataBatchReviveInfos);
-              } else {
-                // Workers that do not incorporate changes from [CELEBORN-1721]
-                // will respond with a status of HARD_SPLIT,
-                // but will not include a PbPushMergedDataSplitPartitionInfo.
-                // For backward compatibility, all batches must be resubmitted.
-                batchesNeedResubmit = batches;
-                logger.info(
-                    "Push merged data to {} hard split required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
+                    Arrays.toString(batchIds));
+                pushState.onCongestControl(hostPort);
+                callback.onSuccess(response);
+              } else if (reason == StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue()) {
+                logger.debug(
+                    "Push merged data to {} replica congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
                     addressPair,
                     shuffleId,
                     mapId,
                     attemptId,
                     Arrays.toString(partitionIds),
                     groupedBatchId,
                     Arrays.toString(batchIds));
-              }
-              if (batchesNeedResubmit.isEmpty()) {
+                pushState.onCongestControl(hostPort);
+                callback.onSuccess(response);
+              } else if (reason == StatusCode.MAP_ENDED.getValue()) {
                 pushState.onSuccess(hostPort);
-                callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.SOFT_SPLIT.getValue()}));
-              } else {
-                if (dataPushFailureTrackingEnabled) {
-                  for (DataBatches.DataBatch resubmitBatch : batchesNeedResubmit) {
-                    pushState.addFailedBatch(
-                        resubmitBatch.loc.getUniqueId(),
-                        new PushFailedBatch(mapId, attemptId, resubmitBatch.batchId));
-                  }
-                }
-                ReviveRequest[] requests =
-                    addAndGetReviveRequests(
-                        shuffleId, mapId, attemptId, batchesNeedResubmit, StatusCode.HARD_SPLIT);
-                pushDataRetryPool.submit(
-                    () ->
-                        submitRetryPushMergedData(
-                            pushState,
-                            shuffleId,
-                            mapId,
-                            attemptId,
-                            batchesNeedResubmit,
-                            StatusCode.HARD_SPLIT,
-                            groupedBatchId,
-                            requests,
-                            remainReviveTimes,
-                            System.currentTimeMillis()
-                                + conf.clientRpcRequestPartitionLocationAskTimeout()
-                                    .duration()
-                                    .toMillis()));
+                callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.MAP_ENDED.getValue()}));
+              } else { // success
+                pushState.onSuccess(hostPort);
+                callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.SUCCESS.getValue()}));
               }
-            } else if (reason == StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) {
-              logger.debug(
-                  "Push merged data to {} primary congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
-                  addressPair,
-                  shuffleId,
-                  mapId,
-                  attemptId,
-                  Arrays.toString(partitionIds),
-                  groupedBatchId,
-                  Arrays.toString(batchIds));
-              pushState.onCongestControl(hostPort);
-              callback.onSuccess(response);
-            } else if (reason == StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue()) {
-              logger.debug(
-                  "Push merged data to {} replica congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.",
-                  addressPair,
-                  shuffleId,
-                  mapId,
-                  attemptId,
-                  Arrays.toString(partitionIds),
-                  groupedBatchId,
-                  Arrays.toString(batchIds));
-              pushState.onCongestControl(hostPort);
-              callback.onSuccess(response);
-            } else if (reason == StatusCode.MAP_ENDED.getValue()) {
-              pushState.onSuccess(hostPort);
-              callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.MAP_ENDED.getValue()}));
-            } else { // success
+            } else {
+              // Workers that do not incorporate changes from [CELEBORN-1721]
+              // will respond with an empty response body instead of a status byte.
+              // For backward compatibility, keep the check on the response's remaining bytes.
               pushState.onSuccess(hostPort);
-              callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.SUCCESS.getValue()}));
+              callback.onSuccess(response);

Review Comment:
   <img width="1052" alt="Screenshot 2025-03-25 16 30 16" src="https://github.com/user-attachments/assets/75c55154-30c8-4bce-a3a3-18b9679cfd32" />
   
   cc @RexXiong 
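   For reference, a rough sketch of how the response handling branches between old and new workers. The class, method, and byte values below are illustrative placeholders only, not the actual Celeborn `StatusCode` values, wire format, or client API:
   
   ```java
   import java.nio.ByteBuffer;
   
   // Illustrative only: names and byte values are stand-ins, not the real Celeborn protocol.
   public class PushMergedDataResponseSketch {
   
     enum Decision { SUCCESS, RESUBMIT_ALL_BATCHES, RESUBMIT_SPLIT_BATCHES }
   
     static final byte HARD_SPLIT = 1; // placeholder value
   
     static Decision decide(ByteBuffer response) {
       if (!response.hasRemaining()) {
         // Pre-CELEBORN-1721 workers may reply with an empty body; treat it as plain success.
         return Decision.SUCCESS;
       }
       byte reason = response.get();
       if (reason == HARD_SPLIT) {
         if (!response.hasRemaining()) {
           // Old workers send HARD_SPLIT without per-partition info: resubmit every batch.
           return Decision.RESUBMIT_ALL_BATCHES;
         }
         // New workers append split-partition info, so only the affected batches are resubmitted.
         return Decision.RESUBMIT_SPLIT_BATCHES;
       }
       // Congestion, MAP_ENDED and plain success are handled by the remaining branches in the PR.
       return Decision.SUCCESS;
     }
   
     public static void main(String[] args) {
       System.out.println(decide(ByteBuffer.allocate(0)));                      // SUCCESS
       System.out.println(decide(ByteBuffer.wrap(new byte[] {HARD_SPLIT})));    // RESUBMIT_ALL_BATCHES
       System.out.println(decide(ByteBuffer.wrap(new byte[] {HARD_SPLIT, 0}))); // RESUBMIT_SPLIT_BATCHES
     }
   }
   ```
   
   The point is that both an empty body and a bare HARD_SPLIT byte have to stay on the legacy resubmit-everything path, while only a payload-carrying HARD_SPLIT can resubmit selectively.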
   
   


