This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f0b954ae [#2601][FOLLOWUP] fix(spark): Release segmentPermits before 
buffer getting to avoid deadlock in decompression worker (#2737)
2f0b954ae is described below

commit 2f0b954ae4d5683b74c107514791d88b2cf1f181
Author: Zhen Wang <[email protected]>
AuthorDate: Thu Mar 5 19:32:32 2026 +0800

    [#2601][FOLLOWUP] fix(spark): Release segmentPermits before buffer getting 
to avoid deadlock in decompression worker (#2737)
    
    ### What changes were proposed in this pull request?
    
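    Release `segmentPermits` before reading the block's uncompressed length,
    and make `DecompressedShuffleBlock#getUncompressLength()` return the
    length passed in at construction instead of waiting on the decompression
    future, to avoid a deadlock.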
    
    ### Why are the changes needed?
    
    #2601 caused ShuffleReadClientImplTest to hang, see 
https://github.com/apache/uniffle/actions/runs/22700481546/job/65816264462?pr=2736
    
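    Thread dumps (`main` is parked waiting on the decompression future, while
    `decompressionWorker-0` is parked waiting to acquire a semaphore permit):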
    
    ```
    "main" #1 prio=5 os_prio=31 tid=0x0000000152014000 nid=0x1a03 waiting on 
condition [0x000000016b5a9000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x000000076c844f78> (a 
java.util.concurrent.CompletableFuture$Signaller)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at 
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
            at 
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3334)
            at 
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
            at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
            at 
org.apache.uniffle.client.response.DecompressedShuffleBlock.getByteBuffer(DecompressedShuffleBlock.java:62)
            at 
org.apache.uniffle.client.response.DecompressedShuffleBlock.getUncompressLength(DecompressedShuffleBlock.java:53)
            at 
org.apache.uniffle.client.impl.DecompressionWorker.get(DecompressionWorker.java:165)
            at 
org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:329)
            at 
org.apache.uniffle.client.TestUtils.validateResult(TestUtils.java:55)
            at 
org.apache.uniffle.client.impl.ShuffleReadClientImplTest.readTest7(ShuffleReadClientImplTest.java:357)
    ```
    
    ```
    "decompressionWorker-0" #340 daemon prio=5 os_prio=31 
tid=0x000000011b921000 nid=0x1481f waiting on condition [0x0000000329772000]
       java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x000000076abc0630> (a 
java.util.concurrent.Semaphore$NonfairSync)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
            at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
            at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
            at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
            at 
org.apache.uniffle.client.impl.DecompressionWorker.lambda$add$0(DecompressionWorker.java:102)
            at 
org.apache.uniffle.client.impl.DecompressionWorker$$Lambda$490/0x00000008006a0028.get(Unknown
 Source)
            at 
java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1604)
            at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
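    No new tests are needed; the existing `ShuffleReadClientImplTest`, which
    hung before this change, exercises this code path.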
---
 .../java/org/apache/uniffle/client/impl/DecompressionWorker.java  | 5 +++--
 .../apache/uniffle/client/response/DecompressedShuffleBlock.java  | 8 +++++---
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index 66ff8d9cd..fbb81528d 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -148,7 +148,8 @@ public class DecompressionWorker {
               waitMillis -> this.waitMillis.addAndGet(waitMillis),
               bufferSegment.getTaskAttemptId(),
               fetchSecondsThreshold,
-              bufferSegment.getLength()));
+              bufferSegment.getLength(),
+              bufferSegment.getUncompressLength()));
     }
   }
 
@@ -162,8 +163,8 @@ public class DecompressionWorker {
     // fetched, this is effective due to the upstream will use single-thread 
to get and release the
     // block
     if (block != null) {
-      nowMemoryUsed.addAndGet(-block.getUncompressLength());
       segmentPermits.ifPresent(x -> x.release());
+      nowMemoryUsed.addAndGet(-block.getUncompressLength());
     }
     return block;
   }
diff --git 
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
 
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
index 16a6215a2..2d6bf8e22 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
@@ -29,18 +29,21 @@ public class DecompressedShuffleBlock extends ShuffleBlock {
   private Consumer<Long> waitMillisCallback;
   private final int fetchSecondsThreshold;
   private final int compressedLength;
+  private final int uncompressedLength;
 
   public DecompressedShuffleBlock(
       CompletableFuture<ByteBuffer> f,
       Consumer<Long> consumer,
       long taskAttemptId,
       int fetchSecondsThreshold,
-      int compressedLength) {
+      int compressedLength,
+      int uncompressedLength) {
     super(taskAttemptId);
     this.f = f;
     this.waitMillisCallback = consumer;
     this.fetchSecondsThreshold = fetchSecondsThreshold;
     this.compressedLength = compressedLength;
+    this.uncompressedLength = uncompressedLength;
   }
 
   @Override
@@ -50,8 +53,7 @@ public class DecompressedShuffleBlock extends ShuffleBlock {
 
   @Override
   public int getUncompressLength() {
-    ByteBuffer buffer = getByteBuffer();
-    return buffer.limit() - buffer.position();
+    return uncompressedLength;
   }
 
   @Override

Reply via email to