This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new eb9e16480 [CELEBORN-1820] Failing to write and flush StreamChunk data
should be counted as FETCH_CHUNK_FAIL
eb9e16480 is described below
commit eb9e16480003c89b65dc6c821177e8c91db7dbaf
Author: wuziyi <[email protected]>
AuthorDate: Tue Jan 7 13:54:59 2025 +0800
[CELEBORN-1820] Failing to write and flush StreamChunk data should be
counted as FETCH_CHUNK_FAIL
<!--
Thanks for sending a pull request! Here are some tips for you:
- Make sure the PR title start w/ a JIRA ticket, e.g. '[CELEBORN-XXXX]
Your PR title ...'.
- Be sure to keep the PR description updated to reflect all changes.
- Please write your PR title to summarize what this PR proposes.
- If possible, provide a concise example to reproduce the issue for a
faster review.
-->
### What changes were proposed in this pull request?
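- In the current implementation, we use `writeAndFlush` to send chunk data to
the client asynchronously, and FETCH_CHUNK_SUCCESS is incremented regardless of
whether that write succeeds. A failure to write and flush StreamChunk data to
the remote client should instead be counted as FETCH_CHUNK_FAIL.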
- Add the remote client address to the log messages to aid debugging.
### Why are the changes needed?
It is important to monitor the FETCH_CHUNK_FAIL count correctly.
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #3051 from Z1Wu/fix/fetch_handler_metrics.
Authored-by: wuziyi <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../apache/celeborn/service/deploy/worker/FetchHandler.scala | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index f040f4334..ff32b9401 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -511,7 +511,8 @@ class FetchHandler(
client: TransportClient,
streamChunkSlice: StreamChunkSlice,
req: RequestMessage): Unit = {
- logDebug(s"Received req from
${NettyUtils.getRemoteAddress(client.getChannel)}" +
+ lazy val remoteAddr = NettyUtils.getRemoteAddress(client.getChannel)
+ logDebug(s"Received req from ${remoteAddr}" +
s" to fetch block $streamChunkSlice")
workerSource.recordAppActiveConnection(
@@ -548,14 +549,15 @@ class FetchHandler(
if (future.isSuccess) {
if (log.isDebugEnabled) {
logDebug(
- s"Sending ChunkFetchSuccess operation succeeded, chunk
$streamChunkSlice")
+ s"Sending ChunkFetchSuccess to $remoteAddr succeeded, chunk
$streamChunkSlice")
}
+ workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT)
} else {
logWarning(
- s"Sending ChunkFetchSuccess operation failed, chunk
$streamChunkSlice",
+ s"Sending ChunkFetchSuccess to $remoteAddr failed, chunk
$streamChunkSlice",
future.cause())
+ workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
}
- workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT)
chunkStreamManager.chunkSent(streamChunkSlice.streamId)
if (fetchTimeMetric != null) {
fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)