This is an automated email from the ASF dual-hosted git repository. zhouky pushed a commit to branch branch-0.3 in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit ab816d1219290084f4a456310228dab80b5f3917
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jul 21 12:14:57 2023 +0800

    [CELEBORN-788][FOLLOWUP] Update callback's location should also update the PushState to keep consistent

    ### What changes were proposed in this pull request?
    As title

    ### Why are the changes needed?

    ### Does this PR introduce _any_ user-facing change?

    ### How was this patch tested?

    Closes #1741 from AngersZhuuuu/CELEBORN-788-FOLLOWUP.

    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index fce284e7c..614c7d81d 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -874,7 +874,6 @@ public class ShuffleClientImpl extends ShuffleClient {
         new RpcResponseCallback() {
           @Override
           public void onSuccess(ByteBuffer response) {
-            pushState.removeBatch(nextBatchId, loc.hostAndPushPort());
             if (response.remaining() > 0 && response.get() == StatusCode.STAGE_ENDED.getValue()) {
               stageEndShuffleSet.add(shuffleId);
             }
@@ -905,6 +904,8 @@ public class ShuffleClientImpl extends ShuffleClient {
 
           @Override
           public void updateLatestPartition(PartitionLocation latest) {
+            pushState.addBatch(nextBatchId, latest.hostAndPushPort());
+            pushState.removeBatch(nextBatchId, this.latest.hostAndPushPort());
             this.latest = latest;
           }
 
@@ -923,6 +924,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                     nextBatchId);
                 splitPartition(shuffleId, partitionId, latest);
                 pushState.onSuccess(latest.hostAndPushPort());
+                pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
                 callback.onSuccess(response);
               } else if (reason == StatusCode.HARD_SPLIT.getValue()) {
                 logger.debug(
@@ -969,6 +971,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                     partitionId,
                     nextBatchId);
                 pushState.onCongestControl(latest.hostAndPushPort());
+                pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
                 callback.onSuccess(response);
               } else if (reason == StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue()) {
                 logger.debug(
@@ -980,15 +983,18 @@ public class ShuffleClientImpl extends ShuffleClient {
                     partitionId,
                     nextBatchId);
                 pushState.onCongestControl(latest.hostAndPushPort());
+                pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
                 callback.onSuccess(response);
               } else {
                 // StageEnd.
                 response.rewind();
                 pushState.onSuccess(latest.hostAndPushPort());
+                pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
                 callback.onSuccess(response);
               }
             } else {
               pushState.onSuccess(latest.hostAndPushPort());
+              pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
               callback.onSuccess(response);
             }
           }
