This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 82ed9f842 [#2716] feat(client): More overlapping decompression stats
to log (#2717)
82ed9f842 is described below
commit 82ed9f8426ac12f7d3fdd3dacd55d9be00071c04
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jan 28 15:14:52 2026 +0800
[#2716] feat(client): More overlapping decompression stats to log (#2717)
### What changes were proposed in this pull request?
Add more overlapping-decompression stats to the log emitted on close:
1. Peak memory used (logged as `peekMemoryUsed`), to help diagnose OOMs
2. Decompression throughput
### Why are the changes needed?
Fixes #2716.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Covered by existing tests; no new tests were added.
---------
Co-authored-by: Junfan Zhang <[email protected]>
---
.../uniffle/client/impl/DecompressionWorker.java | 34 ++++++++++++++++++----
1 file changed, 29 insertions(+), 5 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index b256ae30e..120844718 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -44,6 +44,7 @@ public class DecompressionWorker {
private final Codec codec;
private final AtomicLong decompressionMillis = new AtomicLong(0);
+ private final AtomicLong decompressionBytes = new AtomicLong(0);
private final AtomicLong decompressionBufferAllocationMillis = new
AtomicLong(0);
// the millis for the block get operation to measure profit from overlapping
decompression
@@ -51,6 +52,9 @@ public class DecompressionWorker {
private final int fetchSecondsThreshold;
+ private AtomicLong peekMemoryUsed = new AtomicLong(0);
+ private AtomicLong nowMemoryUsed = new AtomicLong(0);
+
public DecompressionWorker(Codec codec, int threads, int
fetchSecondsThreshold) {
if (codec == null) {
throw new IllegalArgumentException("Codec cannot be null");
@@ -96,6 +100,10 @@ public class DecompressionWorker {
long startDecompression = System.currentTimeMillis();
codec.decompress(buffer, uncompressedLen, dst, 0);
decompressionMillis.addAndGet(System.currentTimeMillis() -
startDecompression);
+ decompressionBytes.addAndGet(length);
+
+ nowMemoryUsed.addAndGet(uncompressedLen);
+ resetPeekMemoryUsed();
return dst;
},
@@ -119,20 +127,36 @@ public class DecompressionWorker {
return null;
}
DecompressedShuffleBlock block = blocks.remove(segmentIndex);
+ // simplify the memory statistic logic here, just decrease the memory used
when the block is
+ // fetched, this is effective due to the upstream will use single-thread
to get and release the
+ // block
+ if (block != null) {
+ nowMemoryUsed.addAndGet(-block.getUncompressLength());
+ }
return block;
}
+ private void resetPeekMemoryUsed() {
+ long currentMemoryUsed = nowMemoryUsed.get();
+ long peekMemory = peekMemoryUsed.get();
+ if (currentMemoryUsed > peekMemory) {
+ peekMemoryUsed.set(currentMemoryUsed);
+ }
+ }
+
public void close() {
long bufferAllocation = decompressionBufferAllocationMillis.get();
- long decompression = decompressionMillis.get();
+ long decompressionMillis = this.decompressionMillis.get();
long wait = waitMillis.get();
+ long decompressionBytes = this.decompressionBytes.get() / 1024 / 1024;
LOG.info(
- "The statistic of overlapping compression is that bufferAllocation:
{}(ms), "
- + "decompression: {}(ms), wait: {}(ms),
overlappingRatio((bufferAllocation+decompression)/wait)={}",
+ "Overlapping decompression stats: bufferAllocation={}ms,
decompression={}ms, getWait={}ms, peekMemoryUsed={}MB, decompressionBytes={}MB,
decompressionThroughput={}MB/s",
bufferAllocation,
- decompression,
+ decompressionMillis,
wait,
- wait == 0 ? 0 : (bufferAllocation + decompression) / wait);
+ peekMemoryUsed.get() / 1024 / 1024,
+ decompressionBytes,
+ decompressionMillis == 0 ? 0 : (decompressionBytes * 1000L) /
decompressionMillis);
executorService.shutdown();
}