This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b7a2dac5 [CELEBORN-1428] WrappedRpcResponseCallback should stop timer of PrimaryPushDataTime and ReplicaPushDataTime for failure
8b7a2dac5 is described below

commit 8b7a2dac573e47f090a96a44e61403977101cded
Author: SteNicholas <[email protected]>
AuthorDate: Thu May 16 15:30:24 2024 +0800

    [CELEBORN-1428] WrappedRpcResponseCallback should stop timer of PrimaryPushDataTime and ReplicaPushDataTime for failure
    
    ### What changes were proposed in this pull request?
    
    `WrappedRpcResponseCallback` now stops the timers of `PrimaryPushDataTime` and `ReplicaPushDataTime` on failure.
    
    ### Why are the changes needed?
    
    `WrappedRpcResponseCallback` does not stop the timers of `PrimaryPushDataTime` and `ReplicaPushDataTime` on failure, so the values of both metrics are incorrect when a data push fails.
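    
    The effect can be illustrated with a minimal sketch. It assumes a keyed timer that records a sample only when it is stopped; `SketchSource` and `SketchCallback` are hypothetical stand-ins invented for this illustration and are not Celeborn's `WorkerSource` or `WrappedRpcResponseCallback`. Only the shape of the `stopTimer(metric, key)` call mirrors the patch.
    
    ```scala
    import scala.collection.mutable
    
    // Hypothetical metrics source (an assumption, not Celeborn's WorkerSource):
    // a timer is started under (metric, key) and yields a sample only when stopped.
    class SketchSource(clock: () => Long) {
      private val started = mutable.Map.empty[(String, String), Long]
      val recorded = mutable.Map.empty[String, List[Long]]
    
      def startTimer(metric: String, key: String): Unit =
        started((metric, key)) = clock()
    
      def stopTimer(metric: String, key: String): Unit =
        started.remove((metric, key)).foreach { t0 =>
          // Prepend the elapsed time to the samples recorded for this metric.
          recorded(metric) = (clock() - t0) :: recorded.getOrElse(metric, Nil)
        }
    
      // Number of timers that were started but never stopped.
      def pending: Int = started.size
    }
    
    // Hypothetical callback: both outcomes stop the timer for the request.
    class SketchCallback(source: SketchSource, metric: String, requestId: Long) {
      def onSuccess(): Unit =
        source.stopTimer(metric, s"$requestId")
    
      def onFailure(e: Throwable): Unit = {
        // Without this call the timer started for the request would stay pending.
        source.stopTimer(metric, s"$requestId")
      }
    }
    ```
    
    In this sketch, a failed push whose callback skipped `stopTimer` would leave its entry in `started` and contribute no sample, which is one way the reported metric could drift from reality.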
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    GA.
    
    Closes #2514 from SteNicholas/CELEBORN-1428.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala      | 1 +
 1 file changed, 1 insertion(+)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index be5431fa2..d21a1fdea 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1093,6 +1093,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
     }
 
     override def onFailure(e: Throwable): Unit = {
+      workerSource.stopTimer(workerSourceTime, s"$requestId")
       if (location != null) {
         logError(s"[handle$messageType.onFailure] partitionLocation: $location")
       }
