This is an automated email from the ASF dual-hosted git repository. angerszhuuuu pushed a commit to branch CELEBORN-1334 in repository https://gitbox.apache.org/repos/asf/celeborn.git
commit 7b429f7d29705389730eed8d81dcdd8f009feaba Author: Angerszhuuuu <[email protected]> AuthorDate: Tue Mar 25 16:28:56 2025 +0800 CELEBORN-1334 [CIP-12] Support HARD_SPLIT in PushMergedData should support compatibility of old worker success rpc with no content --- .../apache/celeborn/client/ShuffleClientImpl.java | 226 +++++++++++---------- 1 file changed, 117 insertions(+), 109 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index be6b5aa3f..955ed9b80 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -1490,66 +1490,121 @@ public class ShuffleClientImpl extends ShuffleClient { new RpcResponseCallback() { @Override public void onSuccess(ByteBuffer response) { - byte reason = response.get(); - if (reason == StatusCode.HARD_SPLIT.getValue()) { - ArrayList<DataBatches.DataBatch> batchesNeedResubmit; - if (response.remaining() > 0) { - batchesNeedResubmit = new ArrayList<>(); - PbPushMergedDataSplitPartitionInfo partitionInfo; - try { - partitionInfo = TransportMessage.fromByteBuffer(response).getParsedPayload(); - } catch (CelebornIOException | InvalidProtocolBufferException e) { - callback.onFailure( - new CelebornIOException("parse pushMergedData response failed", e)); - return; + if (response.remaining() > 0) { + byte reason = response.get(); + if (reason == StatusCode.HARD_SPLIT.getValue()) { + ArrayList<DataBatches.DataBatch> batchesNeedResubmit; + if (response.remaining() > 0) { + batchesNeedResubmit = new ArrayList<>(); + PbPushMergedDataSplitPartitionInfo partitionInfo; + try { + partitionInfo = TransportMessage.fromByteBuffer(response).getParsedPayload(); + } catch (CelebornIOException | InvalidProtocolBufferException e) { + callback.onFailure( + new CelebornIOException("parse pushMergedData response failed", e)); + return; + 
} + List<Integer> splitPartitionIndexes = partitionInfo.getSplitPartitionIndexesList(); + List<Integer> statusCodeList = partitionInfo.getStatusCodesList(); + StringBuilder dataBatchReviveInfos = new StringBuilder(); + for (int i = 0; i < splitPartitionIndexes.size(); i++) { + int partitionIndex = splitPartitionIndexes.get(i); + int batchId = batches.get(partitionIndex).batchId; + dataBatchReviveInfos.append( + String.format( + "(batchId=%d, partitionId=%d, cause=%s)", + batchId, + partitionIds[partitionIndex], + StatusCode.fromValue(statusCodeList.get(i).byteValue()))); + if (statusCodeList.get(i) == StatusCode.SOFT_SPLIT.getValue()) { + PartitionLocation loc = batches.get(partitionIndex).loc; + if (!newerPartitionLocationExists( + reducePartitionMap.get(shuffleId), loc.getId(), loc.getEpoch(), false)) { + ReviveRequest reviveRequest = + new ReviveRequest( + shuffleId, + mapId, + attemptId, + loc.getId(), + loc.getEpoch(), + loc, + StatusCode.SOFT_SPLIT); + reviveManager.addRequest(reviveRequest); + } + } else { + batchesNeedResubmit.add(batches.get(partitionIndex)); + } + } + logger.info( + "Push merged data to {} partial success required for shuffle {} map {} attempt {} groupedBatch {}. split batches {}.", + addressPair, + shuffleId, + mapId, + attemptId, + groupedBatchId, + dataBatchReviveInfos); + } else { + // Workers that do not incorporate changes from [CELEBORN-1721] + // will respond with a status of HARD_SPLIT, + // but will not include a PbPushMergedDataSplitPartitionInfo. + // For backward compatibility, all batches must be resubmitted. 
+ batchesNeedResubmit = batches; + logger.info( + "Push merged data to {} hard split required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", + addressPair, + shuffleId, + mapId, + attemptId, + Arrays.toString(partitionIds), + groupedBatchId, + Arrays.toString(batchIds)); } - List<Integer> splitPartitionIndexes = partitionInfo.getSplitPartitionIndexesList(); - List<Integer> statusCodeList = partitionInfo.getStatusCodesList(); - StringBuilder dataBatchReviveInfos = new StringBuilder(); - for (int i = 0; i < splitPartitionIndexes.size(); i++) { - int partitionIndex = splitPartitionIndexes.get(i); - int batchId = batches.get(partitionIndex).batchId; - dataBatchReviveInfos.append( - String.format( - "(batchId=%d, partitionId=%d, cause=%s)", - batchId, - partitionIds[partitionIndex], - StatusCode.fromValue(statusCodeList.get(i).byteValue()))); - if (statusCodeList.get(i) == StatusCode.SOFT_SPLIT.getValue()) { - PartitionLocation loc = batches.get(partitionIndex).loc; - if (!newerPartitionLocationExists( - reducePartitionMap.get(shuffleId), loc.getId(), loc.getEpoch(), false)) { - ReviveRequest reviveRequest = - new ReviveRequest( + if (batchesNeedResubmit.isEmpty()) { + pushState.onSuccess(hostPort); + callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.SOFT_SPLIT.getValue()})); + } else { + if (dataPushFailureTrackingEnabled) { + for (DataBatches.DataBatch resubmitBatch : batchesNeedResubmit) { + pushState.addFailedBatch( + resubmitBatch.loc.getUniqueId(), + new PushFailedBatch(mapId, attemptId, resubmitBatch.batchId)); + } + } + ReviveRequest[] requests = + addAndGetReviveRequests( + shuffleId, mapId, attemptId, batchesNeedResubmit, StatusCode.HARD_SPLIT); + pushDataRetryPool.submit( + () -> + submitRetryPushMergedData( + pushState, shuffleId, mapId, attemptId, - loc.getId(), - loc.getEpoch(), - loc, - StatusCode.SOFT_SPLIT); - reviveManager.addRequest(reviveRequest); - } - } else { - 
batchesNeedResubmit.add(batches.get(partitionIndex)); - } + batchesNeedResubmit, + StatusCode.HARD_SPLIT, + groupedBatchId, + requests, + remainReviveTimes, + System.currentTimeMillis() + + conf.clientRpcRequestPartitionLocationAskTimeout() + .duration() + .toMillis())); } - logger.info( - "Push merged data to {} partial success required for shuffle {} map {} attempt {} groupedBatch {}. split batches {}.", + } else if (reason == StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) { + logger.debug( + "Push merged data to {} primary congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", addressPair, shuffleId, mapId, attemptId, + Arrays.toString(partitionIds), groupedBatchId, - dataBatchReviveInfos); - } else { - // Workers that do not incorporate changes from [CELEBORN-1721] - // will respond with a status of HARD_SPLIT, - // but will not include a PbPushMergedDataSplitPartitionInfo. - // For backward compatibility, all batches must be resubmitted. 
- batchesNeedResubmit = batches; - logger.info( - "Push merged data to {} hard split required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", + Arrays.toString(batchIds)); + pushState.onCongestControl(hostPort); + callback.onSuccess(response); + } else if (reason == StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue()) { + logger.debug( + "Push merged data to {} replica congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", addressPair, shuffleId, mapId, @@ -1557,68 +1612,21 @@ public class ShuffleClientImpl extends ShuffleClient { Arrays.toString(partitionIds), groupedBatchId, Arrays.toString(batchIds)); - } - if (batchesNeedResubmit.isEmpty()) { + pushState.onCongestControl(hostPort); + callback.onSuccess(response); + } else if (reason == StatusCode.MAP_ENDED.getValue()) { pushState.onSuccess(hostPort); - callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.SOFT_SPLIT.getValue()})); - } else { - if (dataPushFailureTrackingEnabled) { - for (DataBatches.DataBatch resubmitBatch : batchesNeedResubmit) { - pushState.addFailedBatch( - resubmitBatch.loc.getUniqueId(), - new PushFailedBatch(mapId, attemptId, resubmitBatch.batchId)); - } - } - ReviveRequest[] requests = - addAndGetReviveRequests( - shuffleId, mapId, attemptId, batchesNeedResubmit, StatusCode.HARD_SPLIT); - pushDataRetryPool.submit( - () -> - submitRetryPushMergedData( - pushState, - shuffleId, - mapId, - attemptId, - batchesNeedResubmit, - StatusCode.HARD_SPLIT, - groupedBatchId, - requests, - remainReviveTimes, - System.currentTimeMillis() - + conf.clientRpcRequestPartitionLocationAskTimeout() - .duration() - .toMillis())); + callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.MAP_ENDED.getValue()})); + } else { // success + pushState.onSuccess(hostPort); + callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.SUCCESS.getValue()})); } - } else if (reason == StatusCode.PUSH_DATA_SUCCESS_PRIMARY_CONGESTED.getValue()) { - 
logger.debug( - "Push merged data to {} primary congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", - addressPair, - shuffleId, - mapId, - attemptId, - Arrays.toString(partitionIds), - groupedBatchId, - Arrays.toString(batchIds)); - pushState.onCongestControl(hostPort); - callback.onSuccess(response); - } else if (reason == StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue()) { - logger.debug( - "Push merged data to {} replica congestion required for shuffle {} map {} attempt {} partition {} groupedBatch {} batch {}.", - addressPair, - shuffleId, - mapId, - attemptId, - Arrays.toString(partitionIds), - groupedBatchId, - Arrays.toString(batchIds)); - pushState.onCongestControl(hostPort); - callback.onSuccess(response); - } else if (reason == StatusCode.MAP_ENDED.getValue()) { - pushState.onSuccess(hostPort); - callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.MAP_ENDED.getValue()})); - } else { // success + } else { + // Workers that do not incorporate changes from [CELEBORN-1721] + // will respond with an empty response (no status byte). + // For backward compatibility, keep checking response.remaining() before reading the status. pushState.onSuccess(hostPort); - callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.SUCCESS.getValue()})); + callback.onSuccess(response); } }
