This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new abd6233a5 [CELEBORN-2081] PushDataHandler onFailure log shuffle key
abd6233a5 is described below
commit abd6233a50b3ff915b29e01a2d37d9974dfbfe20
Author: sychen <[email protected]>
AuthorDate: Fri Jul 25 20:53:46 2025 +0800
[CELEBORN-2081] PushDataHandler onFailure log shuffle key
### What changes were proposed in this pull request?
### Why are the changes needed?
`id-epoch` alone cannot identify which application a failed partition
belongs to, so we add the shuffle key to the log.
```
25/07/25 04:23:06,439 ERROR [celeborn-push-timeout-checker-0]
PushDataHandler: PushMergedData replicate failed for partitionLocation:
PartitionLocation[
id-epoch:1623-0
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #3390 from cxzl25/CELEBORN-2081.
Authored-by: sychen <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/service/deploy/worker/PushDataHandler.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 85eb45dbd..3e526a71e 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -337,7 +337,9 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
override def onFailure(e: Throwable): Unit = {
- logError(s"PushData replication failed for partitionLocation:
$location", e)
+ logError(
+ s"PushData replication failed for shuffle: $shuffleKey,
partitionLocation: $location",
+ e)
// 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
// 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
// 3. Throw IOException by channel, convert to
PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
@@ -717,7 +719,9 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
override def onFailure(e: Throwable): Unit = {
- logError(s"PushMergedData replicate failed for
partitionLocation: $location", e)
+ logError(
+ s"PushMergedData replicate failed for shuffle: $shuffleKey,
partitionLocation: $location",
+ e)
// 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
// 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
// 3. Throw IOException by channel, convert to
PUSH_DATA_CONNECTION_EXCEPTION_REPLICA