This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 4af5114e1 [CELEBORN-788][FOLLOWUP] Update callback's location should
also update the PushState to keep consistent
4af5114e1 is described below
commit 4af5114e178d3a442f88195c30b792c62ec99b0a
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jul 21 12:14:57 2023 +0800
[CELEBORN-788][FOLLOWUP] Update callback's location should also update the
PushState to keep consistent
### What changes were proposed in this pull request?
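As the title says: when a callback's partition location is updated, the PushState should also be updated so that the two stay consistent.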
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1741 from AngersZhuuuu/CELEBORN-788-FOLLOWUP.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 7361c7906..19e03c63f 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -874,7 +874,6 @@ public class ShuffleClientImpl extends ShuffleClient {
new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
- pushState.removeBatch(nextBatchId, loc.hostAndPushPort());
if (response.remaining() > 0 && response.get() ==
StatusCode.STAGE_ENDED.getValue()) {
stageEndShuffleSet.add(shuffleId);
}
@@ -905,6 +904,8 @@ public class ShuffleClientImpl extends ShuffleClient {
@Override
public void updateLatestPartition(PartitionLocation latest) {
+ pushState.addBatch(nextBatchId, latest.hostAndPushPort());
+ pushState.removeBatch(nextBatchId,
this.latest.hostAndPushPort());
this.latest = latest;
}
@@ -923,6 +924,7 @@ public class ShuffleClientImpl extends ShuffleClient {
nextBatchId);
splitPartition(shuffleId, partitionId, latest);
pushState.onSuccess(latest.hostAndPushPort());
+ pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
callback.onSuccess(response);
} else if (reason == StatusCode.HARD_SPLIT.getValue()) {
logger.debug(
@@ -969,6 +971,7 @@ public class ShuffleClientImpl extends ShuffleClient {
partitionId,
nextBatchId);
pushState.onCongestControl(latest.hostAndPushPort());
+ pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
callback.onSuccess(response);
} else if (reason ==
StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue()) {
logger.debug(
@@ -980,15 +983,18 @@ public class ShuffleClientImpl extends ShuffleClient {
partitionId,
nextBatchId);
pushState.onCongestControl(latest.hostAndPushPort());
+ pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
callback.onSuccess(response);
} else {
// StageEnd.
response.rewind();
pushState.onSuccess(latest.hostAndPushPort());
+ pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
callback.onSuccess(response);
}
} else {
pushState.onSuccess(latest.hostAndPushPort());
+ pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
callback.onSuccess(response);
}
}