This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new f13912348 [CELEBORN-1999] OpenStreamTime should use requestId to record cost time
f13912348 is described below

commit f139123489219d248808ef2e965e1441e848280f
Author: Xianming Lei <[email protected]>
AuthorDate: Thu May 15 01:52:14 2025 -0700

    [CELEBORN-1999] OpenStreamTime should use requestId to record cost time
    
    ### What changes were proposed in this pull request?
    OpenStreamTime should use requestId to record cost time instead of shuffleKey.
    
    ### Why are the changes needed?
    The OpenStreamTime metric is wrong because there can be multiple OpenStream requests for the same shuffleKey in the same time period.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UTs.
    
    Closes #3258 from leixm/CELEBORN-1999.
    
    Authored-by: Xianming Lei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit d03efcbdb32cb8acac19108beb6e73f7f9794aea)
    Signed-off-by: Wang, Fei <[email protected]>
---
 common/src/main/scala/org/apache/celeborn/common/util/Utils.scala | 6 ++++++
 .../org/apache/celeborn/service/deploy/worker/FetchHandler.scala  | 8 ++++++--
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 1a5208be5..78932bcab 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1297,4 +1297,10 @@ object Utils extends Logging {
     connectException || rpcTimeout || fetchChunkTimeout
   }
 
+  def makeOpenStreamRequestId(
+      shuffleKey: String,
+      clientChannelId: String,
+      rpcRequestId: Long): String = {
+    s"$shuffleKey-$clientChannelId-$rpcRequestId"
+  }
 }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index ae693e932..7e13eb59a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -339,7 +339,11 @@ class FetchHandler(
       callback: RpcResponseCallback): Unit = {
     checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1)
     workerSource.recordAppActiveConnection(client, shuffleKey)
-    workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
+    val requestId = Utils.makeOpenStreamRequestId(
+      shuffleKey,
+      client.getChannel.id().toString,
+      rpcRequestId)
+    workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, requestId)
     try {
       val fileInfo = getRawFileInfo(shuffleKey, fileName)
       fileInfo.getFileMeta match {
@@ -384,7 +388,7 @@ class FetchHandler(
         workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT)
         handleRpcIOException(client, rpcRequestId, shuffleKey, fileName, e, callback)
     } finally {
-      workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
+      workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, requestId)
     }
   }
 

Reply via email to