This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8b7a2dac5 [CELEBORN-1428] WrappedRpcResponseCallback should stop timer
of PrimaryPushDataTime and ReplicaPushDataTime for failure
8b7a2dac5 is described below
commit 8b7a2dac573e47f090a96a44e61403977101cded
Author: SteNicholas <[email protected]>
AuthorDate: Thu May 16 15:30:24 2024 +0800
[CELEBORN-1428] WrappedRpcResponseCallback should stop timer of
PrimaryPushDataTime and ReplicaPushDataTime for failure
### What changes were proposed in this pull request?
`WrappedRpcResponseCallback` now stops the `PrimaryPushDataTime` and
`ReplicaPushDataTime` timers on failure.
### Why are the changes needed?
`WrappedRpcResponseCallback` does not stop the `PrimaryPushDataTime` and
`ReplicaPushDataTime` timers on failure, so the values of the
`PrimaryPushDataTime` and `ReplicaPushDataTime` metrics are incorrect when a
data push fails.
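A minimal sketch of the failure mode, assuming a timer is tracked per request key from start until it is explicitly stopped. `SketchTimerSource` and `SketchCallback` are hypothetical names for illustration only and are not Celeborn's `WorkerSource` or callback API:

```scala
import scala.collection.mutable

// Hypothetical stand-in for a metrics source; NOT Celeborn's WorkerSource.
class SketchTimerSource {
  // Timers that were started but not yet stopped, keyed by (metric, key).
  private val inFlight = mutable.Map.empty[(String, String), Long]
  // Elapsed times (nanoseconds) recorded per metric.
  private val recorded = mutable.Map.empty[String, Vector[Long]]

  def startTimer(metric: String, key: String): Unit =
    inFlight((metric, key)) = System.nanoTime()

  // Records the elapsed time and drops the in-flight entry, if one exists.
  def stopTimer(metric: String, key: String): Unit =
    inFlight.remove((metric, key)).foreach { start =>
      val elapsed = System.nanoTime() - start
      recorded(metric) = recorded.getOrElse(metric, Vector.empty[Long]) :+ elapsed
    }

  def pending: Int = inFlight.size
  def samples(metric: String): Int = recorded.getOrElse(metric, Vector.empty[Long]).size
}

// Hypothetical callback with the same onSuccess/onFailure shape.
class SketchCallback(source: SketchTimerSource, metric: String, requestId: Long) {
  def onSuccess(): Unit = source.stopTimer(metric, s"$requestId")

  def onFailure(e: Throwable): Unit = {
    // Stop the timer on the failure path too, so the sample is not lost.
    source.stopTimer(metric, s"$requestId")
  }
}

object TimerSketch {
  def main(args: Array[String]): Unit = {
    val source = new SketchTimerSource
    source.startTimer("PrimaryPushDataTime", "1")
    new SketchCallback(source, "PrimaryPushDataTime", 1L)
      .onFailure(new RuntimeException("push failed"))
    println(s"pending=${source.pending}, samples=${source.samples("PrimaryPushDataTime")}")
  }
}
```

In this sketch, removing the `stopTimer` call from `onFailure` would leave the timer pending and record no sample for the failed push.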
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
GA (GitHub Actions).
Closes #2514 from SteNicholas/CELEBORN-1428.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala | 1 +
1 file changed, 1 insertion(+)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index be5431fa2..d21a1fdea 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1093,6 +1093,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
       }
       override def onFailure(e: Throwable): Unit = {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
         if (location != null) {
           logError(s"[handle$messageType.onFailure] partitionLocation: $location")
         }