This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ee7a83b86f Fix race condition in SharedTsBlockQueue async listener causing NPE in MemoryPool.free() (#17196)
7ee7a83b86f is described below

commit 7ee7a83b86fc4d266c9c7aa45cccb0044b56daa6
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Feb 11 14:13:27 2026 +0800

    Fix race condition in SharedTsBlockQueue async listener causing NPE in MemoryPool.free() (#17196)
    
    When SharedTsBlockQueue.add() encounters memory pressure, it registers an
    async listener on a MemoryReservationFuture to add the TsBlock later. If
    the upstream FragmentInstance finishes and calls abort()/close() before the
    listener executes, the following race occurs:
    1. abort() sets closed=true, clears the queue, and frees bufferRetainedSizeInBytes
    2. deRegisterFragmentInstanceFromMemoryPool removes the upstream FI's
       memory mapping
    3. The async listener fires and adds the TsBlock to the closed queue
    4. The downstream consumer calls remove() -> MemoryPool.free() with the
       upstream FI's IDs, but the mapping no longer exists -> NPE
    Fix: Check the `closed` flag inside the async listener before adding the
    TsBlock. When closed, skip the add (memory was already freed by
    abort/close) and complete channelBlocked to prevent hangs.
    Also add a unit test that reproduces this race condition by using a
    manually-controlled SettableFuture to simulate the blocked-on-memory path.
---
 .../execution/exchange/SharedTsBlockQueue.java     | 34 ++++++---
 .../execution/exchange/SharedTsBlockQueueTest.java | 82 +++++++++++++++++++++-
 2 files changed, 107 insertions(+), 9 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index 4aea7cf22dd..e49efabb986 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -205,7 +205,8 @@ public class SharedTsBlockQueue {
             localPlanNodeId,
             tsBlock.getSizeInBytes());
     bufferRetainedSizeInBytes -= tsBlock.getSizeInBytes();
-    // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event to
+    // Every time LocalSourceHandle consumes a TsBlock, it needs to send the event
+    // to
     // corresponding LocalSinkChannel.
     if (sinkChannel != null) {
       sinkChannel.checkAndInvokeOnFinished();
@@ -257,6 +258,16 @@ public class SharedTsBlockQueue {
       blockedOnMemory.addListener(
           () -> {
             synchronized (this) {
+              // If the queue has been closed or aborted before this listener executes,
+              // we must not add the TsBlock. The memory reserved for this TsBlock has
+              // already been freed by abort()/close() via bufferRetainedSizeInBytes.
+              // Adding it would cause a downstream NPE in MemoryPool.free() when
+              // the consumer calls remove(), because the upstream FI's memory mapping
+              // has already been deregistered.
+              if (closed) {
+                channelBlocked.set(null);
+                return;
+              }
               queue.add(tsBlock);
               if (!blocked.isDone()) {
                 blocked.set(null);
@@ -266,8 +277,10 @@ public class SharedTsBlockQueue {
           },
           // Use directExecutor() here could lead to deadlock. Thread A holds 
lock of
           // SharedTsBlockQueueA and tries to invoke the listener of
-          // SharedTsBlockQueueB(when freeing memory to complete 
MemoryReservationFuture) while
-          // Thread B holds lock of SharedTsBlockQueueB and tries to invoke 
the listener of
+          // SharedTsBlockQueueB(when freeing memory to complete 
MemoryReservationFuture)
+          // while
+          // Thread B holds lock of SharedTsBlockQueueB and tries to invoke 
the listener
+          // of
           // SharedTsBlockQueueA
           executorService);
       return channelBlocked;
@@ -307,13 +320,18 @@ public class SharedTsBlockQueue {
       bufferRetainedSizeInBytes = 0;
     }
     if (sinkChannel != null) {
-      // attention: LocalSinkChannel of this SharedTsBlockQueue could be null 
when we close
-      // LocalSourceHandle(with limit clause it's possible) before 
constructing the corresponding
+      // attention: LocalSinkChannel of this SharedTsBlockQueue could be null 
when we
+      // close
+      // LocalSourceHandle(with limit clause it's possible) before 
constructing the
+      // corresponding
       // LocalSinkChannel.
-      // If this close method is invoked by LocalSourceHandle, listener of 
LocalSourceHandle will
-      // remove the LocalSourceHandle from the map of MppDataExchangeManager 
and later when
+      // If this close method is invoked by LocalSourceHandle, listener of
+      // LocalSourceHandle will
+      // remove the LocalSourceHandle from the map of MppDataExchangeManager 
and later
+      // when
       // LocalSinkChannel is initialized, it will construct a new 
SharedTsBlockQueue.
-      // It is still safe that we let the LocalSourceHandle close successfully 
in this case. Because
+      // It is still safe that we let the LocalSourceHandle close successfully 
in this
+      // case. Because
       // the QueryTerminator will do the final cleaning logic.
       sinkChannel.close();
     }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
index b96f849faf6..00c653499b0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueueTest.java
@@ -25,7 +25,10 @@ import 
org.apache.iotdb.db.queryengine.execution.memory.MemoryPool;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.apache.tsfile.external.commons.lang3.Validate;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.Pair;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -37,12 +40,89 @@ import java.util.concurrent.atomic.AtomicReference;
 import static 
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
 
 public class SharedTsBlockQueueTest {
+
+  /**
+   * Test that when add() goes into the async listener path (memory blocked) 
and the queue is
+   * aborted before the listener fires, the listener does NOT add the TsBlock 
to the closed queue.
+   * This reproduces the race condition that caused NPE in MemoryPool.free().
+   */
+  @Test
+  public void testAsyncListenerAfterAbortDoesNotAddTsBlock() {
+    final String queryId = "q0";
+    final long mockTsBlockSize = 1024L;
+    final TFragmentInstanceId fragmentInstanceId = new 
TFragmentInstanceId(queryId, 0, "0");
+    final String planNodeId = "test";
+
+    // Use a SettableFuture to manually control when the blocked-on-memory 
future
+    // completes.
+    SettableFuture<Void> manualFuture = SettableFuture.create();
+
+    // Create a mock MemoryPool that returns the manually-controlled future
+    // (simulating blocked).
+    LocalMemoryManager mockLocalMemoryManager = 
Mockito.mock(LocalMemoryManager.class);
+    MemoryPool mockMemoryPool = Mockito.mock(MemoryPool.class);
+    
Mockito.when(mockLocalMemoryManager.getQueryPool()).thenReturn(mockMemoryPool);
+
+    // reserve() returns (manualFuture, false) — simulating memory blocked
+    Mockito.when(
+            mockMemoryPool.reserve(
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.anyString(),
+                Mockito.anyLong(),
+                Mockito.anyLong()))
+        .thenReturn(new Pair<>(manualFuture, Boolean.FALSE));
+    // tryCancel returns 0 — simulating future already completed (can't cancel)
+    Mockito.when(mockMemoryPool.tryCancel(Mockito.any())).thenReturn(0L);
+
+    // Use a direct executor so that when we complete manualFuture, the 
listener
+    // runs immediately.
+    SharedTsBlockQueue queue =
+        new SharedTsBlockQueue(
+            fragmentInstanceId, planNodeId, mockLocalMemoryManager, 
newDirectExecutorService());
+    queue.getCanAddTsBlock().set(null);
+    queue.setMaxBytesCanReserve(Long.MAX_VALUE);
+
+    TsBlock mockTsBlock = Utils.createMockTsBlock(mockTsBlockSize);
+
+    // Step 1: add() goes into async path — listener is registered on 
manualFuture.
+    // reserve() returns (manualFuture, false), so the TsBlock is NOT yet 
added to
+    // the queue.
+    ListenableFuture<Void> addFuture;
+    synchronized (queue) {
+      addFuture = queue.add(mockTsBlock);
+    }
+    // The addFuture (channelBlocked) should not be done yet
+    Assert.assertFalse(addFuture.isDone());
+    // Queue should be empty — TsBlock is waiting for memory
+    Assert.assertTrue(queue.isEmpty());
+
+    // Step 2: Abort the queue (simulates upstream FI state change listener 
calling
+    // abort)
+    synchronized (queue) {
+      queue.abort();
+    }
+    Assert.assertTrue(queue.isClosed());
+
+    // Step 3: Now complete the manualFuture — this triggers the async 
listener.
+    // Before the fix, this would add the TsBlock to the closed queue.
+    // After the fix, the listener detects closed==true and returns without 
adding.
+    manualFuture.set(null);
+
+    // Verify: queue should still be empty (TsBlock was NOT added to the closed
+    // queue)
+    Assert.assertTrue(queue.isEmpty());
+    // The channelBlocked future should be completed (no hang)
+    Assert.assertTrue(addFuture.isDone());
+  }
+
   @Test(timeout = 15000L)
   public void concurrencyTest() {
     final String queryId = "q0";
     final long mockTsBlockSize = 1024L * 1024L;
 
-    // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize 
per query.
+    // Construct a mock LocalMemoryManager with capacity 5 * mockTsBlockSize 
per
+    // query.
     LocalMemoryManager mockLocalMemoryManager = 
Mockito.mock(LocalMemoryManager.class);
     MemoryManager memoryManager = Mockito.spy(new MemoryManager(10 * 
mockTsBlockSize));
     MemoryPool spyMemoryPool =

Reply via email to