This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 82ed9f842 [#2716] feat(client): More overlapping decompression stats 
to log (#2717)
82ed9f842 is described below

commit 82ed9f8426ac12f7d3fdd3dacd55d9be00071c04
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jan 28 15:14:52 2026 +0800

    [#2716] feat(client): More overlapping decompression stats to log (#2717)
    
    ### What changes were proposed in this pull request?
    
    Add more decompression stats to the log:
    1. peak memory used (logged as `peekMemoryUsed`), to help investigate when an OOM happens
    2. decompression throughput
    
    ### Why are the changes needed?
    
    to fix #2716
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests are enough
    
    ---------
    
    Co-authored-by: Junfan Zhang <[email protected]>
---
 .../uniffle/client/impl/DecompressionWorker.java   | 34 ++++++++++++++++++----
 1 file changed, 29 insertions(+), 5 deletions(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index b256ae30e..120844718 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -44,6 +44,7 @@ public class DecompressionWorker {
   private final Codec codec;
 
   private final AtomicLong decompressionMillis = new AtomicLong(0);
+  private final AtomicLong decompressionBytes = new AtomicLong(0);
   private final AtomicLong decompressionBufferAllocationMillis = new 
AtomicLong(0);
 
   // the millis for the block get operation to measure profit from overlapping 
decompression
@@ -51,6 +52,9 @@ public class DecompressionWorker {
 
   private final int fetchSecondsThreshold;
 
+  private AtomicLong peekMemoryUsed = new AtomicLong(0);
+  private AtomicLong nowMemoryUsed = new AtomicLong(0);
+
   public DecompressionWorker(Codec codec, int threads, int 
fetchSecondsThreshold) {
     if (codec == null) {
       throw new IllegalArgumentException("Codec cannot be null");
@@ -96,6 +100,10 @@ public class DecompressionWorker {
                 long startDecompression = System.currentTimeMillis();
                 codec.decompress(buffer, uncompressedLen, dst, 0);
                 decompressionMillis.addAndGet(System.currentTimeMillis() - 
startDecompression);
+                decompressionBytes.addAndGet(length);
+
+                nowMemoryUsed.addAndGet(uncompressedLen);
+                resetPeekMemoryUsed();
 
                 return dst;
               },
@@ -119,20 +127,36 @@ public class DecompressionWorker {
       return null;
     }
     DecompressedShuffleBlock block = blocks.remove(segmentIndex);
+    // simplify the memory statistic logic here, just decrease the memory used 
when the block is
+    // fetched, this is effective due to the upstream will use single-thread 
to get and release the
+    // block
+    if (block != null) {
+      nowMemoryUsed.addAndGet(-block.getUncompressLength());
+    }
     return block;
   }
 
+  private void resetPeekMemoryUsed() {
+    long currentMemoryUsed = nowMemoryUsed.get();
+    long peekMemory = peekMemoryUsed.get();
+    if (currentMemoryUsed > peekMemory) {
+      peekMemoryUsed.set(currentMemoryUsed);
+    }
+  }
+
   public void close() {
     long bufferAllocation = decompressionBufferAllocationMillis.get();
-    long decompression = decompressionMillis.get();
+    long decompressionMillis = this.decompressionMillis.get();
     long wait = waitMillis.get();
+    long decompressionBytes = this.decompressionBytes.get() / 1024 / 1024;
     LOG.info(
-        "The statistic of overlapping compression is that bufferAllocation: 
{}(ms), "
-            + "decompression: {}(ms), wait: {}(ms), 
overlappingRatio((bufferAllocation+decompression)/wait)={}",
+        "Overlapping decompression stats: bufferAllocation={}ms, 
decompression={}ms, getWait={}ms, peekMemoryUsed={}MB, decompressionBytes={}MB, 
decompressionThroughput={}MB/s",
         bufferAllocation,
-        decompression,
+        decompressionMillis,
         wait,
-        wait == 0 ? 0 : (bufferAllocation + decompression) / wait);
+        peekMemoryUsed.get() / 1024 / 1024,
+        decompressionBytes,
+        decompressionMillis == 0 ? 0 : (decompressionBytes * 1000L) / 
decompressionMillis);
     executorService.shutdown();
   }
 

Reply via email to