This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a0d0d5fd [CELEBORN-2075] Fix `OpenStreamTime` metrics for `PbOpenStreamList` request
8a0d0d5fd is described below

commit 8a0d0d5fd4dd92eeb72b8b48c471e1148420eb6d
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Jul 22 17:52:38 2025 +0800

    [CELEBORN-2075] Fix `OpenStreamTime` metrics for `PbOpenStreamList` request
    
    ### What changes were proposed in this pull request?
    Fix the `OpenStreamTime` metric for the `PbOpenStreamList` request.
    
    ### Why are the changes needed?
    
    For the `PbOpenStreamList` request, the `OpenStreamTime` metric is not calculated.
    
    
https://github.com/apache/celeborn/blob/cf3c05d6683eb1f96895478ba3e9c2668f3aaca2/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala#L140-L160
    
    In addition, the `workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)` call inside `handleReduceOpenStreamInternal` is meaningless:
https://github.com/apache/celeborn/blob/cf3c05d6683eb1f96895478ba3e9c2668f3aaca2/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala#L335-L341
    
    `handleReduceOpenStreamInternal` is called in two places:
    1. Inside `handleOpenStreamInternal`, where the `OpenStreamTime` metric is already handled correctly.
    2. For `PbOpenStreamList`, where the `OpenStreamTime` metric is not handled at all.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually tested.
    
    Closes #3376 from turboFei/buf_size.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../service/deploy/worker/FetchHandler.scala       | 33 +++++++++++++---------
 1 file changed, 20 insertions(+), 13 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 6ef8bdb2c..edf71039f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -145,18 +145,27 @@ class FetchHandler(
         val readLocalFlags = openStreamList.getReadLocalShuffleList
         val pbOpenStreamListResponse = PbOpenStreamListResponse.newBuilder()
         checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1)
-        0 until files.size() foreach { idx =>
-          val pbStreamHandlerOpt = handleReduceOpenStreamInternal(
-            client,
-            shuffleKey,
-            files.get(idx),
-            startIndices.get(idx),
-            endIndices.get(idx),
-            readLocalFlags.get(idx))
-          if (pbStreamHandlerOpt.getStatus != StatusCode.SUCCESS.getValue) {
-            workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT)
+        val openStreamRequestId = Utils.makeOpenStreamRequestId(
+          shuffleKey,
+          client.getChannel.id().toString,
+          rpcRequest.requestId)
+        workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, 
openStreamRequestId)
+        try {
+          0 until files.size() foreach { idx =>
+            val pbStreamHandlerOpt = handleReduceOpenStreamInternal(
+              client,
+              shuffleKey,
+              files.get(idx),
+              startIndices.get(idx),
+              endIndices.get(idx),
+              readLocalFlags.get(idx))
+            if (pbStreamHandlerOpt.getStatus != StatusCode.SUCCESS.getValue) {
+              workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT)
+            }
+            pbOpenStreamListResponse.addStreamHandlerOpt(pbStreamHandlerOpt)
           }
-          pbOpenStreamListResponse.addStreamHandlerOpt(pbStreamHandlerOpt)
+        } finally {
+          workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, 
openStreamRequestId)
         }
 
         client.getChannel.writeAndFlush(new RpcResponse(
@@ -337,8 +346,6 @@ class FetchHandler(
             client.getChannel)}, Exception: ${e.getMessage}"
         
PbStreamHandlerOpt.newBuilder().setStatus(StatusCode.OPEN_STREAM_FAILED.getValue)
           .setErrorMsg(msg).build()
-    } finally {
-      workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
     }
   }
 

Reply via email to