This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2f0b954ae [#2601][FOLLOWUP] fix(spark): Release segmentPermits before buffer getting to avoid deadlock in decompression worker (#2737)
2f0b954ae is described below
commit 2f0b954ae4d5683b74c107514791d88b2cf1f181
Author: Zhen Wang <[email protected]>
AuthorDate: Thu Mar 5 19:32:32 2026 +0800
[#2601][FOLLOWUP] fix(spark): Release segmentPermits before buffer getting to avoid deadlock in decompression worker (#2737)
### What changes were proposed in this pull request?
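Release `segmentPermits` before the block's buffer is accessed, and pass the uncompressed length from the `BufferSegment` into `DecompressedShuffleBlock` so that `getUncompressLength()` returns a stored value instead of waiting for decompression to finish. Together these changes avoid a deadlock between the reader thread and the decompression worker.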
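To illustrate the `DecompressedShuffleBlock` half of the change, here is a minimal sketch of a block that records its uncompressed length at construction time, so reading the length never waits on the decompression future. `LazyBlock` is a hypothetical, simplified stand-in (it uses `join()` and omits the real class's wait-time callback and threshold logic); only the idea of passing the length into the constructor comes from the diff. Before the patch, the length was computed as `limit() - position()` of the decompressed buffer, which required the future to complete.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for DecompressedShuffleBlock; not the Uniffle class. */
public class LazyBlock {
  private final CompletableFuture<ByteBuffer> future;
  // Known from segment metadata before decompression runs.
  private final int uncompressedLength;

  public LazyBlock(CompletableFuture<ByteBuffer> future, int uncompressedLength) {
    this.future = future;
    this.uncompressedLength = uncompressedLength;
  }

  /** Blocks until the decompression task has produced the buffer. */
  public ByteBuffer getByteBuffer() {
    return future.join();
  }

  /** Returns the recorded length without touching the future, so it never blocks. */
  public int getUncompressLength() {
    return uncompressedLength;
  }

  public static void main(String[] args) {
    CompletableFuture<ByteBuffer> pending = new CompletableFuture<>();
    LazyBlock block = new LazyBlock(pending, 1024);
    // The length is available even though "decompression" has not finished yet.
    System.out.println(block.getUncompressLength() + " " + pending.isDone()); // prints "1024 false"
    pending.complete(ByteBuffer.allocate(1024));
    System.out.println(block.getByteBuffer().remaining()); // prints "1024"
  }
}
```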
### Why are the changes needed?
#2601 caused ShuffleReadClientImplTest to hang, see
https://github.com/apache/uniffle/actions/runs/22700481546/job/65816264462?pr=2736
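In the dumps below, the `main` thread is blocked in `DecompressionWorker.get()`, where `getUncompressLength()` waits on the decompression future, while `decompressionWorker-0` is blocked in `Semaphore.acquire()`. Before this patch, `get()` released a permit only after `getUncompressLength()` returned, so neither thread could make progress.
Thread dumps: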
```
"main" #1 prio=5 os_prio=31 tid=0x0000000152014000 nid=0x1a03 waiting on condition [0x000000016b5a9000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000000076c844f78> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3334)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.uniffle.client.response.DecompressedShuffleBlock.getByteBuffer(DecompressedShuffleBlock.java:62)
    at org.apache.uniffle.client.response.DecompressedShuffleBlock.getUncompressLength(DecompressedShuffleBlock.java:53)
    at org.apache.uniffle.client.impl.DecompressionWorker.get(DecompressionWorker.java:165)
    at org.apache.uniffle.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:329)
    at org.apache.uniffle.client.TestUtils.validateResult(TestUtils.java:55)
    at org.apache.uniffle.client.impl.ShuffleReadClientImplTest.readTest7(ShuffleReadClientImplTest.java:357)
```
```
"decompressionWorker-0" #340 daemon prio=5 os_prio=31 tid=0x000000011b921000 nid=0x1481f waiting on condition [0x0000000329772000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000000076abc0630> (a java.util.concurrent.Semaphore$NonfairSync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.Semaphore.acquire(Semaphore.java:312)
    at org.apache.uniffle.client.impl.DecompressionWorker.lambda$add$0(DecompressionWorker.java:102)
    at org.apache.uniffle.client.impl.DecompressionWorker$$Lambda$490/0x00000008006a0028.get(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1604)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```
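The interleaving in these dumps can be modeled with a single-permit `Semaphore` and a one-thread executor. This is a simplified sketch, not Uniffle code: `PermitDeadlockDemo`, `submit`, and `consume` are hypothetical, and the model assumes that a later block's task can take the last permit before the task for the block the reader is waiting on. With the old order (wait for the buffer, then release) the reader times out; with the patched order (release, then wait) both blocks are consumed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of the hang shown in the thread dumps; not the Uniffle implementation. */
public class PermitDeadlockDemo {
  // A "decompression" task that must hold a permit before producing its buffer.
  static CompletableFuture<byte[]> submit(ExecutorService pool, Semaphore permits, int length) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        permits.acquire(); // interruptible, so shutdownNow() can unblock it
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new CompletionException(e);
      }
      return new byte[length];
    }, pool);
  }

  // The reader side: one permit is released per consumed block.
  static void consume(CompletableFuture<byte[]> block, Semaphore permits, boolean releaseFirst)
      throws Exception {
    if (releaseFirst) {
      permits.release();              // patched order: free a permit, then wait
      block.get(1, TimeUnit.SECONDS);
    } else {
      block.get(1, TimeUnit.SECONDS); // old order: wait for the buffer first...
      permits.release();              // ...and only then free a permit
    }
  }

  /** Returns true if both blocks were consumed, false if the reader timed out (the hang). */
  static boolean run(boolean releaseFirst) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "decompressionWorker-0");
      t.setDaemon(true);
      return t;
    });
    Semaphore permits = new Semaphore(1);
    try {
      // B is submitted first and takes the only permit; A's task then blocks in acquire().
      CompletableFuture<byte[]> b = submit(pool, permits, 8);
      CompletableFuture<byte[]> a = submit(pool, permits, 4);
      consume(a, permits, releaseFirst); // the reader needs A before B
      consume(b, permits, releaseFirst);
      return true;
    } catch (TimeoutException e) {
      return false;
    } finally {
      pool.shutdownNow(); // interrupts a worker still stuck in acquire()
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("wait, then release: " + run(false)); // prints "wait, then release: false"
    System.out.println("release, then wait: " + run(true));  // prints "release, then wait: true"
  }
}
```

The one-second timeout stands in for the indefinite hang so the demo terminates; the worker is a daemon thread and is interrupted on shutdown, so a blocked `acquire()` cannot keep the JVM alive.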
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
---
.../java/org/apache/uniffle/client/impl/DecompressionWorker.java | 5 +++--
.../apache/uniffle/client/response/DecompressedShuffleBlock.java | 8 +++++---
2 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
index 66ff8d9cd..fbb81528d 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/DecompressionWorker.java
@@ -148,7 +148,8 @@ public class DecompressionWorker {
waitMillis -> this.waitMillis.addAndGet(waitMillis),
bufferSegment.getTaskAttemptId(),
fetchSecondsThreshold,
- bufferSegment.getLength()));
+ bufferSegment.getLength(),
+ bufferSegment.getUncompressLength()));
}
}
@@ -162,8 +163,8 @@ public class DecompressionWorker {
// fetched, this is effective due to the upstream will use single-thread to get and release the
// block
if (block != null) {
- nowMemoryUsed.addAndGet(-block.getUncompressLength());
segmentPermits.ifPresent(x -> x.release());
+ nowMemoryUsed.addAndGet(-block.getUncompressLength());
}
return block;
}
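On the reader side, the patched `get()` frees a permit first and then adjusts `nowMemoryUsed` using a length that is already known. The hypothetical `PermitAccountingSketch` below mirrors only that ordering: the field names follow the diff, but the real method's block lookup is replaced by a parameter. It shows that the call returns at once even while the block's future is still pending.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the reader-side accounting; not the Uniffle DecompressionWorker. */
public class PermitAccountingSketch {
  static final class Block {
    final CompletableFuture<byte[]> future;
    final int uncompressedLength; // recorded up front, as in the patched block class

    Block(CompletableFuture<byte[]> future, int uncompressedLength) {
      this.future = future;
      this.uncompressedLength = uncompressedLength;
    }
  }

  final Optional<Semaphore> segmentPermits;
  final AtomicLong nowMemoryUsed = new AtomicLong();

  PermitAccountingSketch(Optional<Semaphore> segmentPermits) {
    this.segmentPermits = segmentPermits;
  }

  /** Patched order: release a permit, then account memory without waiting on the future. */
  Block get(Block block) {
    if (block != null) {
      segmentPermits.ifPresent(x -> x.release());
      nowMemoryUsed.addAndGet(-block.uncompressedLength);
    }
    return block;
  }

  public static void main(String[] args) {
    Semaphore permits = new Semaphore(0);
    PermitAccountingSketch worker = new PermitAccountingSketch(Optional.of(permits));
    worker.nowMemoryUsed.set(100);
    // This future is never completed, yet get() still returns immediately.
    worker.get(new Block(new CompletableFuture<>(), 40));
    System.out.println(permits.availablePermits() + " " + worker.nowMemoryUsed.get()); // prints "1 60"
  }
}
```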
diff --git a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
index 16a6215a2..2d6bf8e22 100644
--- a/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
+++ b/client/src/main/java/org/apache/uniffle/client/response/DecompressedShuffleBlock.java
@@ -29,18 +29,21 @@ public class DecompressedShuffleBlock extends ShuffleBlock {
private Consumer<Long> waitMillisCallback;
private final int fetchSecondsThreshold;
private final int compressedLength;
+ private final int uncompressedLength;
public DecompressedShuffleBlock(
CompletableFuture<ByteBuffer> f,
Consumer<Long> consumer,
long taskAttemptId,
int fetchSecondsThreshold,
- int compressedLength) {
+ int compressedLength,
+ int uncompressedLength) {
super(taskAttemptId);
this.f = f;
this.waitMillisCallback = consumer;
this.fetchSecondsThreshold = fetchSecondsThreshold;
this.compressedLength = compressedLength;
+ this.uncompressedLength = uncompressedLength;
}
@Override
@@ -50,8 +53,7 @@ public class DecompressedShuffleBlock extends ShuffleBlock {
@Override
public int getUncompressLength() {
- ByteBuffer buffer = getByteBuffer();
- return buffer.limit() - buffer.position();
+ return uncompressedLength;
}
@Override