This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4af5114e1 [CELEBORN-788][FOLLOWUP] Update callback's location should also update the PushState to keep consistent
4af5114e1 is described below

commit 4af5114e178d3a442f88195c30b792c62ec99b0a
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Jul 21 12:14:57 2023 +0800

    [CELEBORN-788][FOLLOWUP] Update callback's location should also update the PushState to keep consistent
    
    ### What changes were proposed in this pull request?
    As title
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1741 from AngersZhuuuu/CELEBORN-788-FOLLOWUP.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../main/java/org/apache/celeborn/client/ShuffleClientImpl.java   | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 7361c7906..19e03c63f 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -874,7 +874,6 @@ public class ShuffleClientImpl extends ShuffleClient {
           new RpcResponseCallback() {
             @Override
             public void onSuccess(ByteBuffer response) {
-              pushState.removeBatch(nextBatchId, loc.hostAndPushPort());
               if (response.remaining() > 0 && response.get() == 
StatusCode.STAGE_ENDED.getValue()) {
                 stageEndShuffleSet.add(shuffleId);
               }
@@ -905,6 +904,8 @@ public class ShuffleClientImpl extends ShuffleClient {
 
             @Override
             public void updateLatestPartition(PartitionLocation latest) {
+              pushState.addBatch(nextBatchId, latest.hostAndPushPort());
+              pushState.removeBatch(nextBatchId, 
this.latest.hostAndPushPort());
               this.latest = latest;
             }
 
@@ -923,6 +924,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                       nextBatchId);
                   splitPartition(shuffleId, partitionId, latest);
                   pushState.onSuccess(latest.hostAndPushPort());
+                  pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
                   callback.onSuccess(response);
                 } else if (reason == StatusCode.HARD_SPLIT.getValue()) {
                   logger.debug(
@@ -969,6 +971,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                       partitionId,
                       nextBatchId);
                   pushState.onCongestControl(latest.hostAndPushPort());
+                  pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
                   callback.onSuccess(response);
                 } else if (reason == 
StatusCode.PUSH_DATA_SUCCESS_REPLICA_CONGESTED.getValue()) {
                   logger.debug(
@@ -980,15 +983,18 @@ public class ShuffleClientImpl extends ShuffleClient {
                       partitionId,
                       nextBatchId);
                   pushState.onCongestControl(latest.hostAndPushPort());
+                  pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
                   callback.onSuccess(response);
                 } else {
                   // StageEnd.
                   response.rewind();
                   pushState.onSuccess(latest.hostAndPushPort());
+                  pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
                   callback.onSuccess(response);
                 }
               } else {
                 pushState.onSuccess(latest.hostAndPushPort());
+                pushState.removeBatch(nextBatchId, latest.hostAndPushPort());
                 callback.onSuccess(response);
               }
             }

Reply via email to