This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new eb9e16480 [CELEBORN-1820] Failing to write and flush StreamChunk data 
should be counted as FETCH_CHUNK_FAIL
eb9e16480 is described below

commit eb9e16480003c89b65dc6c821177e8c91db7dbaf
Author: wuziyi <[email protected]>
AuthorDate: Tue Jan 7 13:54:59 2025 +0800

    [CELEBORN-1820] Failing to write and flush StreamChunk data should be 
counted as FETCH_CHUNK_FAIL
    
    <!--
    Thanks for sending a pull request!  Here are some tips for you:
      - Make sure the PR title start w/ a JIRA ticket, e.g. '[CELEBORN-XXXX] 
Your PR title ...'.
      - Be sure to keep the PR description updated to reflect all changes.
      - Please write your PR title to summarize what this PR proposes.
      - If possible, provide a concise example to reproduce the issue for a 
faster review.
    -->
    
    ### What changes were proposed in this pull request?
    
    - In the current implementation, we use `writeAndFlush` to send chunk data to 
the client asynchronously. Failing to write and flush StreamChunk data to the remote 
client should be counted as FETCH_CHUNK_FAIL.
    - Add remote client address in log for debug.
    
    ### Why are the changes needed?
    
    It is important to monitor the FETCH_CHUNK_FAIL count correctly.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #3051 from Z1Wu/fix/fetch_handler_metrics.
    
    Authored-by: wuziyi <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../apache/celeborn/service/deploy/worker/FetchHandler.scala   | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index f040f4334..ff32b9401 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -511,7 +511,8 @@ class FetchHandler(
       client: TransportClient,
       streamChunkSlice: StreamChunkSlice,
       req: RequestMessage): Unit = {
-    logDebug(s"Received req from 
${NettyUtils.getRemoteAddress(client.getChannel)}" +
+    lazy val remoteAddr = NettyUtils.getRemoteAddress(client.getChannel)
+    logDebug(s"Received req from ${remoteAddr}" +
       s" to fetch block $streamChunkSlice")
 
     workerSource.recordAppActiveConnection(
@@ -548,14 +549,15 @@ class FetchHandler(
             if (future.isSuccess) {
               if (log.isDebugEnabled) {
                 logDebug(
-                  s"Sending ChunkFetchSuccess operation succeeded, chunk 
$streamChunkSlice")
+                  s"Sending ChunkFetchSuccess to $remoteAddr succeeded, chunk 
$streamChunkSlice")
               }
+              workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT)
             } else {
               logWarning(
-                s"Sending ChunkFetchSuccess operation failed, chunk 
$streamChunkSlice",
+                s"Sending ChunkFetchSuccess to $remoteAddr failed, chunk 
$streamChunkSlice",
                 future.cause())
+              workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
             }
-            workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT)
             chunkStreamManager.chunkSent(streamChunkSlice.streamId)
             if (fetchTimeMetric != null) {
               fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)

Reply via email to