This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch test
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/test by this push:
     new 32e13f91e fix
32e13f91e is described below

commit 32e13f91e5a69a54827a800fb893d0f13f0d8f82
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Fri Jun 9 22:07:27 2023 +0800

    fix
---
 .../main/java/org/apache/celeborn/client/ShuffleClientImpl.java   | 8 +++++---
 .../apache/celeborn/service/deploy/worker/PushDataHandler.scala   | 6 +++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index f2f3de2b3..4a2e11c35 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -688,10 +688,11 @@ public class ShuffleClientImpl extends ShuffleClient {
                 } else if (reason == StatusCode.HARD_SPLIT.getValue() ||
                 reason == StatusCode.WORKER_SHUTDOWN.getValue()) {
                   logger.debug(
-                      "Push data split for map {} attempt {} batch {}.",
+                      "Push data split for map {} attempt {} batch {} reason 
{}.",
                       mapId,
                       attemptId,
-                      nextBatchId);
+                      nextBatchId,
+                    reason);
                   pushDataRetryPool.submit(
                       () ->
                           submitRetryPushData(
@@ -994,7 +995,8 @@ public class ShuffleClientImpl extends ShuffleClient {
                         + attemptId
                         + " batches "
                         + Arrays.toString(batchIds)
-                        + ".");
+                        + ". reason is "
+                + reason);
                 pushDataRetryPool.submit(
                     () ->
                         submitRetryPushMergedData(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 22ba4a667..a023c2a7c 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -210,7 +210,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
     // This should before return exception to make current push data can revive and retry.
     if (shutdown.get()) {
       logInfo(s"Push data return HARD_SPLIT for shuffle $shuffleKey since 
worker shutdown.")
-      
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.WORKER_SHUTDOWN.getValue)))
+      
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
       return
     }
 
@@ -395,8 +395,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
     // During worker shutdown, worker will return HARD_SPLIT for all existed partition.
     // This should before return exception to make current push data can revive and retry.
     if (shutdown.get()) {
-      logInfo("shutting down, return WORKER_SHUTDOWN")
-      
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.WORKER_SHUTDOWN.getValue)))
+      logInfo(s"Push Merged data return HARD_SPLIT for shuffle $shuffleKey 
since worker shutdown.")
+      
callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
       return
     }
 

Reply via email to