This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 1b662f549 [CELEBORN-2033] updateProduceBytes should be called even if 
updateProduceBytes throws exception
1b662f549 is described below

commit 1b662f5492b46a1e43f528ce7996aade7a220b4e
Author: Xianming Lei <[email protected]>
AuthorDate: Wed Jun 11 10:54:24 2025 -0700

    [CELEBORN-2033] updateProduceBytes should be called even if 
updateProduceBytes throws exception
    
    ### What changes were proposed in this pull request?
    updateProduceBytes should be called at the start of writeInternal, so that 
it still runs even if the rest of writeInternal throws an exception
    
    ### Why are the changes needed?
    To make the UserProduceSpeed metrics more accurate.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UTs.
    
    Closes #3322 from leixm/CELEBORN-2033.
    
    Authored-by: Xianming Lei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit bbd3bb4814a7505d6b7499ab1029997958564705)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../apache/celeborn/service/deploy/worker/storage/TierWriter.scala  | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 278b6f945..00dd90bd5 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -412,6 +412,8 @@ class LocalTierWriter(
 
   override def writeInternal(buf: ByteBuf): Unit = {
     val numBytes = buf.readableBytes()
+    if (userCongestionControlContext != null)
+      userCongestionControlContext.updateProduceBytes(numBytes)
     val flushBufferReadableBytes = flushBuffer.readableBytes
     if (flushBufferReadableBytes != 0 && flushBufferReadableBytes + numBytes 
>= flusherBufferSize) {
       flush(false)
@@ -422,13 +424,9 @@ class LocalTierWriter(
     } catch {
       case oom: OutOfMemoryError =>
         MemoryManager.instance.incrementDiskBuffer(numBytes)
-        if (userCongestionControlContext != null)
-          userCongestionControlContext.updateProduceBytes(numBytes)
         throw oom
     }
     MemoryManager.instance.incrementDiskBuffer(numBytes)
-    if (userCongestionControlContext != null)
-      userCongestionControlContext.updateProduceBytes(numBytes)
   }
 
   override def evict(file: TierWriterBase): Unit = ???

Reply via email to