This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 1b662f549 [CELEBORN-2033] updateProduceBytes should be called even if
updateProduceBytes throws exception
1b662f549 is described below
commit 1b662f5492b46a1e43f528ce7996aade7a220b4e
Author: Xianming Lei <[email protected]>
AuthorDate: Wed Jun 11 10:54:24 2025 -0700
[CELEBORN-2033] updateProduceBytes should be called even if
updateProduceBytes throws exception
### What changes were proposed in this pull request?
Move the updateProduceBytes call to the start of LocalTierWriter.writeInternal
so that it is called even if a later step of writeInternal throws an exception.
### Why are the changes needed?
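To make the UserProduceSpeed metrics more accurate: previously the produced bytes were only counted after a successful write or when an OutOfMemoryError was caught.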
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes #3322 from leixm/CELEBORN-2033.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit bbd3bb4814a7505d6b7499ab1029997958564705)
Signed-off-by: Wang, Fei <[email protected]>
---
.../apache/celeborn/service/deploy/worker/storage/TierWriter.scala | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 278b6f945..00dd90bd5 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -412,6 +412,8 @@ class LocalTierWriter(
override def writeInternal(buf: ByteBuf): Unit = {
val numBytes = buf.readableBytes()
+ if (userCongestionControlContext != null)
+ userCongestionControlContext.updateProduceBytes(numBytes)
val flushBufferReadableBytes = flushBuffer.readableBytes
if (flushBufferReadableBytes != 0 && flushBufferReadableBytes + numBytes
>= flusherBufferSize) {
flush(false)
@@ -422,13 +424,9 @@ class LocalTierWriter(
} catch {
case oom: OutOfMemoryError =>
MemoryManager.instance.incrementDiskBuffer(numBytes)
- if (userCongestionControlContext != null)
- userCongestionControlContext.updateProduceBytes(numBytes)
throw oom
}
MemoryManager.instance.incrementDiskBuffer(numBytes)
- if (userCongestionControlContext != null)
- userCongestionControlContext.updateProduceBytes(numBytes)
}
override def evict(file: TierWriterBase): Unit = ???