This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new d03efcbdb [CELEBORN-1999] OpenStreamTime should use requestId to record cost time
d03efcbdb is described below
commit d03efcbdb32cb8acac19108beb6e73f7f9794aea
Author: Xianming Lei <[email protected]>
AuthorDate: Thu May 15 01:52:14 2025 -0700
[CELEBORN-1999] OpenStreamTime should use requestId to record cost time
### What changes were proposed in this pull request?
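OpenStreamTime should record its cost time under a per-request ID (built from
the shuffleKey, the client channel ID, and the RPC request ID) instead of under
the shuffleKey alone.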
### Why are the changes needed?
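OpenStreamTime is currently inaccurate because multiple OpenStream requests for
the same shuffleKey can be in flight during the same time period; keying the
timer by shuffleKey makes those concurrent requests share one timer key, so
their measurements interfere with each other.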
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
Closes #3258 from leixm/CELEBORN-1999.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
common/src/main/scala/org/apache/celeborn/common/util/Utils.scala | 6 ++++++
.../org/apache/celeborn/service/deploy/worker/FetchHandler.scala | 8 ++++++--
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
index 54811357e..e825abce3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala
@@ -1261,4 +1261,10 @@ object Utils extends Logging {
connectException || rpcTimeout || fetchChunkTimeout
}
+ def makeOpenStreamRequestId(
+ shuffleKey: String,
+ clientChannelId: String,
+ rpcRequestId: Long): String = {
+ s"$shuffleKey-$clientChannelId-$rpcRequestId"
+ }
}
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 568e0d193..a943de58a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -355,7 +355,11 @@ class FetchHandler(
callback: RpcResponseCallback): Unit = {
checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1)
workerSource.recordAppActiveConnection(client, shuffleKey)
- workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
+ val requestId = Utils.makeOpenStreamRequestId(
+ shuffleKey,
+ client.getChannel.id().toString,
+ rpcRequestId)
+ workerSource.startTimer(WorkerSource.OPEN_STREAM_TIME, requestId)
try {
val fileInfo = getRawFileInfo(shuffleKey, fileName)
fileInfo.getFileMeta match {
@@ -400,7 +404,7 @@ class FetchHandler(
workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT)
handleRpcIOException(client, rpcRequestId, shuffleKey, fileName, e, callback)
} finally {
- workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, shuffleKey)
+ workerSource.stopTimer(WorkerSource.OPEN_STREAM_TIME, requestId)
}
}