This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7ee7a83b86f Fix race condition in SharedTsBlockQueue async listener
causing NPE in MemoryPool.free() (#17196)
7ee7a83b86f is described below
commit 7ee7a83b86fc4d266c9c7aa45cccb0044b56daa6
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Feb 11 14:13:27 2026 +0800
Fix race condition in SharedTsBlockQueue async listener causing NPE in
MemoryPool.free() (#17196)
When SharedTsBlockQueue.add() encounters memory pressure, it registers an
async listener on a MemoryReservationFuture to add the TsBlock later. If
the upstream FragmentInstance finishes and calls abort()/close() before the
listener executes, the following race occurs:
1. abort() sets closed=true, clears the queue, and frees the memory
accounted in bufferRetainedSizeInBytes
2. deRegisterFragmentInstanceFromMemoryPool removes the upstream FI's
memory mapping
3. The async listener fires and adds the TsBlock to the closed queue
4. The downstream consumer calls remove() -> MemoryPool.free() with the
upstream FI's IDs, but the mapping no longer exists -> NPE
Fix: Check the `closed` flag inside the async listener before adding the
TsBlock. When the queue is closed, skip the add (its memory was already freed
by abort()/close()) and still complete channelBlocked, so that the caller
waiting on the future returned by add() does not hang.
Also add a unit test that reproduces this race condition by using a
manually-controlled SettableFuture to simulate the blocked-on-memory path.
---
.../execution/exchange/SharedTsBlockQueue.java | 34 ++++++---
.../execution/exchange/SharedTsBlockQueueTest.java | 82 +++++++++++++++++++++-
2 files changed, 107 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index 4aea7cf22dd..e49efabb986 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -205,7 +205,8 @@ public class SharedTsBlockQueue {
localPlanNodeId,
tsBlock.getSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
- // Every time LocalSourceHandle consumes a TsBlock, it needs to send the
event to
+ // Every time LocalSourceHandle consumes a TsBlock, it needs to send the
event
+ // to
// corresponding LocalSinkChannel.
if (sinkChannel != null) {
sinkChannel.checkAndInvokeOnFinished();
@@ -257,6 +258,16 @@ public class SharedTsBlockQueue {
blockedOnMemory.addListener(
() -> {
synchronized (this) {
+ // If the queue has been closed or aborted before this listener
executes,
+ // we must not add the TsBlock. The memory reserved for this
TsBlock has
+ // already been freed by abort()/close() via
bufferRetainedSizeInBytes.
+ // Adding it would cause a downstream NPE in MemoryPool.free()
when
+ // the consumer calls remove(), because the upstream FI's memory
mapping
+ // has already been deregistered.
+ if (closed) {
+ channelBlocked.set(null);
+ return;
+ }
queue.add(tsBlock);
if (!blocked.isDone()) {
blocked.set(null);
@@ -266,8 +277,10 @@ public class SharedTsBlockQueue {
},
// Use directExecutor() here could lead to deadlock. Thread A holds
lock of
// SharedTsBlockQueueA and tries to invoke the listener of
- // SharedTsBlockQueueB(when freeing memory to complete
MemoryReservationFuture) while
- // Thread B holds lock of SharedTsBlockQueueB and tries to invoke
the listener of
+ // SharedTsBlockQueueB(when freeing memory to complete
MemoryReservationFuture)
+ // while
+ // Thread B holds lock of SharedTsBlockQueueB and tries to invoke
the listener
+ // of
// SharedTsBlockQueueA
executorService);
return channelBlocked;
@@ -307,13 +320,18 @@ public class SharedTsBlockQueue {
bufferRetainedSizeInBytes = 0;
}
if (sinkChannel != null) {
- // attention: LocalSinkChannel of this SharedTsBlockQueue could be null
when we close
- // LocalSourceHandle(with limit clause it's possible) before
constructing the corresponding
+ // attention: LocalSinkChannel of this SharedTsBlockQueue could be null
when we
+ // close
+ // LocalSourceHandle(with limit clause it's possible) before
constructing the
+ // corresponding
// LocalSinkChannel.
- // If this close method is invoked by LocalSourceHandle, listener of
LocalSourceHandle will
- // remove the LocalSourceHandle from the map of MppDataExchangeManager
and later when
+ // If this close method is invoked by LocalSourceHandle, listener of
+ // LocalSourceHandle will
+ // remove the LocalSourceHandle from the map of MppDataExchangeManager
and later
+ // when
// LocalSinkChannel is initialized, it will construct a new
SharedTsBlockQueue.
- // It is still safe that we let the LocalSourceHandle close successfully
in this case. Because
+ // It is still safe that we let the LocalSourceHandle close successfully
in this
+ // case. Because
// the QueryTerminator will do the final cleaning logic.
sinkChannel.close();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
index b96f849faf6..00c653499b0 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
@@ -25,7 +25,10 @@ import
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.tsfile.external.commons.lang3.Validate;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.Pair;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -37,12 +40,89 @@ import java.util.concurrent.atomic.AtomicReference;
import static
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
public class SharedTsBlockQueueTest {
+
+ /**
+ * Test that when add() goes into the async listener path (memory blocked)
and the queue is
+ * aborted before the listener fires, the listener does NOT add the TsBlock
to the closed queue.
+ * This reproduces the race condition that caused NPE in MemoryPool.free().
+ */
+ @Test
+ public void testAsyncListenerAfterAbortDoesNotAddTsBlock() {
+ final String queryId = "q0";
+ final long mockTsBlockSize = 1024L;
+ final TFragmentInstanceId fragmentInstanceId = new
TFragmentInstanceId(queryId, 0, "0");
+ final String planNodeId = "test";
+
+ // Use a SettableFuture to manually control when the blocked-on-memory
future
+ // completes.
+ SettableFuture<Void> manualFuture = SettableFuture.create();
+
+ // Create a mock MemoryPool that returns the manually-controlled future
+ // (simulating blocked).
+ LocalMemoryManager mockLocalMemoryManager =
Mockito.mock(LocalMemoryManager.class);
+ MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
+
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+
+ // reserve() returns (manualFuture, false) — simulating memory blocked
+ Mockito.when(
+ mockMemoryPool.reserve(
+ Mockito.anyString(),
+ Mockito.anyString(),
+ Mockito.anyString(),
+ Mockito.anyLong(),
+ Mockito.anyLong()))
+ .thenReturn(new Pair<>(manualFuture, Boolean.FALSE));
+ // tryCancel returns 0 — simulating future already completed (can't cancel)
+ Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L);
+
+ // Use a direct executor so that when we complete manualFuture, the
listener
+ // runs immediately.
+ SharedTsBlockQueue queue =
+ new SharedTsBlockQueue(
+ fragmentInstanceId, planNodeId, mockLocalMemoryManager,
newDirectExecutorService());
+ queue.getCanAddTsBlock().set(null);
+ queue.setMaxBytesCanReserve(Long.MAX_VALUE);
+
+ TsBlock mockTsBlock = Utils.createMockTsBlock(mockTsBlockSize);
+
+ // Step 1: add() goes into async path — listener is registered on
manualFuture.
+ // reserve() returns (manualFuture, false), so the TsBlock is NOT yet
added to
+ // the queue.
+ ListenableFuture<Void> addFuture;
+ synchronized (queue) {
+ addFuture = queue.add(mockTsBlock);
+ }
+ // The addFuture (channelBlocked) should not be done yet
+ Assert.assertFalse(addFuture.isDone());
+ // Queue should be empty — TsBlock is waiting for memory
+ Assert.assertTrue(queue.isEmpty());
+
+ // Step 2: Abort the queue (simulates upstream FI state change listener
calling
+ // abort)
+ synchronized (queue) {
+ queue.abort();
+ }
+ Assert.assertTrue(queue.isClosed());
+
+ // Step 3: Now complete the manualFuture — this triggers the async
listener.
+ // Before the fix, this would add the TsBlock to the closed queue.
+ // After the fix, the listener detects closed==true and returns without
adding.
+ manualFuture.set(null);
+
+ // Verify: queue should still be empty (TsBlock was NOT added to the closed
+ // queue)
+ Assert.assertTrue(queue.isEmpty());
+ // The channelBlocked future should be completed (no hang)
+ Assert.assertTrue(addFuture.isDone());
+ }
+
@Test(timeout = 15000L)
public void concurrencyTest() {
final String queryId = "q0";
final long mockTsBlockSize = 1024L * 1024L;
- // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize
per query.
+ // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize
per
+ // query.
LocalMemoryManager mockLocalMemoryManager =
Mockito.mock(LocalMemoryManager.class);
MemoryManager memoryManager = Mockito.spy(new MemoryManager(10 *
mockTsBlockSize));
MemoryPool spyMemoryPool =