This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8a0d0d5fd [CELEBORN-2075] Fix `OpenStreamTime` metrics for
`PbOpenStreamList` request
8a0d0d5fd is described below
commit 8a0d0d5fd4dd92eeb72b8b48c471e1148420eb6d
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Jul 22 17:52:38 2025 +0800
[CELEBORN-2075] Fix `OpenStreamTime` metrics for `PbOpenStreamList` request
### What changes were proposed in this pull request?
Fix OpenStreamTime metrics for PbOpenStreamList request
### Why are the changes needed?
For a `PbOpenStreamList` request, the `OpenStreamTime` metric is not
calculated.
https://github.com/apache/celeborn/blob/cf3c05d6683eb1f96895478ba3e9c2668f3aaca2/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala#L140-L160
In addition, the `workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME,
shuffleKey)` call inside `handleReduceOpenStreamInternal` is meaningless.
https://github.com/apache/celeborn/blob/cf3c05d6683eb1f96895478ba3e9c2668f3aaca2/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala#L335-L341
`handleReduceOpenStreamInternal` is called in two places:
1. inside `handleOpenStreamInternal`, where the `OpenStreamTime` metric is
handled correctly;
2. for `PbOpenStreamList`, where the `OpenStreamTime` metric is not handled.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual testing.
Closes #3376 from turboFei/buf_size.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../service/deploy/worker/FetchHandler.scala | 33 +++++++++++++---------
1 file changed, 20 insertions(+), 13 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 6ef8bdb2c..edf71039f 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -145,18 +145,27 @@ class FetchHandler(
val readLocalFlags = openStreamList.getReadLocalShuffleList
val pbOpenStreamListResponse = PbOpenStreamListResponse.newBuilder()
checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1)
- 0 until files.size() foreach { idx =>
- val pbStreamHandlerOpt = handleReduceOpenStreamInternal(
- client,
- shuffleKey,
- files.get(idx),
- startIndices.get(idx),
- endIndices.get(idx),
- readLocalFlags.get(idx))
- if (pbStreamHandlerOpt.getStatus != StatusCode.SUCCESS.getValue) {
- workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT)
+ val openStreamRequestId = Utils.makeOpenStreamRequestId(
+ shuffleKey,
+ client.getChannel.id().toString,
+ rpcRequest.requestId)
+ workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME,
openStreamRequestId)
+ try {
+ 0 until files.size() foreach { idx =>
+ val pbStreamHandlerOpt = handleReduceOpenStreamInternal(
+ client,
+ shuffleKey,
+ files.get(idx),
+ startIndices.get(idx),
+ endIndices.get(idx),
+ readLocalFlags.get(idx))
+ if (pbStreamHandlerOpt.getStatus != StatusCode.SUCCESS.getValue) {
+ workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT)
+ }
+ pbOpenStreamListResponse.addStreamHandlerOpt(pbStreamHandlerOpt)
}
- pbOpenStreamListResponse.addStreamHandlerOpt(pbStreamHandlerOpt)
+ } finally {
+ workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME,
openStreamRequestId)
}
client.getChannel.writeAndFlush(new RpcResponse(
@@ -337,8 +346,6 @@ class FetchHandler(
client.getChannel)}, Exception: ${e.getMessage}"
PbStreamHandlerOpt.newBuilder().setStatus(StatusCode.OPEN_STREAM_FAILED.getValue)
.setErrorMsg(msg).build()
- } finally {
- workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
}
}