This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch test
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/test by this push:
new 32e13f91e fix
32e13f91e is described below
commit 32e13f91e5a69a54827a800fb893d0f13f0d8f82
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Fri Jun 9 22:07:27 2023 +0800
fix
---
.../main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 8 +++++---
.../apache/celeborn/service/deploy/worker/PushDataHandler.scala | 6 +++---
2 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index f2f3de2b3..4a2e11c35 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -688,10 +688,11 @@ public class ShuffleClientImpl extends ShuffleClient {
} else if (reason == StatusCode.HARD_SPLIT.getValue() ||
reason == StatusCode.WORKER_SHUTDOWN.getValue()) {
logger.debug(
- "Push data split for map {} attempt {} batch {}.",
+ "Push data split for map {} attempt {} batch {} reason {}.",
mapId,
attemptId,
- nextBatchId);
+ nextBatchId,
+ reason);
pushDataRetryPool.submit(
() ->
submitRetryPushData(
@@ -994,7 +995,8 @@ public class ShuffleClientImpl extends ShuffleClient {
+ attemptId
+ " batches "
+ Arrays.toString(batchIds)
- + ".");
+ + ". reason is "
+ + reason);
pushDataRetryPool.submit(
() ->
submitRetryPushMergedData(
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 22ba4a667..a023c2a7c 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -210,7 +210,7 @@ class PushDataHandler extends BaseMessageHandler with Logging {
// This should before return exception to make current push data can revive and retry.
if (shutdown.get()) {
logInfo(s"Push data return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
- callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.WORKER_SHUTDOWN.getValue)))
+ callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
return
}
@@ -395,8 +395,8 @@ class PushDataHandler extends BaseMessageHandler with Logging {
// During worker shutdown, worker will return HARD_SPLIT for all existed partition.
// This should before return exception to make current push data can revive and retry.
if (shutdown.get()) {
- logInfo("shutting down, return WORKER_SHUTDOWN")
- callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.WORKER_SHUTDOWN.getValue)))
+ logInfo(s"Push Merged data return HARD_SPLIT for shuffle $shuffleKey since worker shutdown.")
+ callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
return
}