This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new e020df2aff6 [To dev/1.3] fix: memory to be released is larger than the 
memory of memory block in TVList owner transfer case #16943  Open
e020df2aff6 is described below

commit e020df2aff657a41281797795e03e270416dab58
Author: shizy <[email protected]>
AuthorDate: Tue Dec 23 16:04:04 2025 +0800

    [To dev/1.3] fix: memory to be released is larger than the memory of memory 
block in TVList owner transfer case #16943  Open
---
 .../fragment/FragmentInstanceContext.java          |  32 +++--
 .../plan/planner/LocalExecutionPlanner.java        |   8 ++
 .../memory/FakedMemoryReservationManager.java      |  11 ++
 .../planner/memory/MemoryReservationManager.java   |  29 ++++
 .../NotThreadSafeMemoryReservationManager.java     |  23 ++++
 .../memory/ThreadSafeMemoryReservationManager.java |  13 ++
 .../fragment/FragmentInstanceExecutionTest.java    | 149 +++++++++++++++++----
 7 files changed, 230 insertions(+), 35 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 2ee3f5dc1e7..1a9a80300f8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -57,6 +57,7 @@ import 
org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.read.filter.basic.Filter;
+import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -873,20 +874,33 @@ public class FragmentInstanceContext extends QueryContext 
{
       try {
         queryContextSet.remove(this);
         if (tvList.getOwnerQuery() == this) {
-          if (queryContextSet.isEmpty()) {
-            LOGGER.debug(
-                "TVList {} is released by the query, FragmentInstance Id is 
{}",
-                tvList,
-                this.getId());
+          if (queryContextSet.isEmpty()) {
+            if (LOGGER.isDebugEnabled()) {
+              LOGGER.debug(
+                  "TVList {} is released by the query, FragmentInstance Id is 
{}",
+                  tvList,
+                  this.getId());
+            }
             
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
             tvList.clear();
           } else {
+            // Transfer memory to next query. It must be exception-safe as 
this method is called
+            // during FragmentInstanceExecution cleanup. Any exception during 
this process could
+            // prevent proper resource cleanup and cause memory leaks.
+            Pair<Long, Long> releasedBytes =
+                
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
             FragmentInstanceContext queryContext =
                 (FragmentInstanceContext) queryContextSet.iterator().next();
-            LOGGER.debug(
-                "TVList {} is now owned by another query, FragmentInstance Id 
is {}",
-                tvList,
-                queryContext.getId());
+            queryContext
+                .getMemoryReservationContext()
+                .reserveMemoryVirtually(releasedBytes.left, 
releasedBytes.right);
+
+            if (LOGGER.isDebugEnabled()) {
+              LOGGER.debug(
+                  "TVList {} is now owned by another query, FragmentInstance 
Id is {}",
+                  tvList,
+                  queryContext.getId());
+            }
             tvList.setOwnerQuery(queryContext);
           }
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index d91e68e7413..97445414c65 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -244,6 +244,10 @@ public class LocalExecutionPlanner {
       final long reservedBytes,
       final String queryId,
       final String contextHolder) {
+    if (memoryInBytes <= 0) {
+      throw new IllegalArgumentException(
+          "Bytes to reserve from free memory for operators should be larger 
than 0");
+    }
     if (memoryInBytes > freeMemoryForOperators) {
       throw new MemoryNotEnoughException(
           String.format(
@@ -264,6 +268,10 @@ public class LocalExecutionPlanner {
   }
 
   public synchronized void releaseToFreeMemoryForOperators(final long 
memoryInBytes) {
+    if (memoryInBytes <= 0) {
+      throw new IllegalArgumentException(
+          "Bytes to release to free memory for operators should be larger than 
0");
+    }
     freeMemoryForOperators += memoryInBytes;
 
     if (freeMemoryForOperators > ALLOCATE_MEMORY_FOR_OPERATORS) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
index 265ca47ca23..35ded4d6252 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.memory;
 
+import org.apache.tsfile.utils.Pair;
+
 public class FakedMemoryReservationManager implements MemoryReservationManager 
{
 
   @Override
@@ -32,4 +34,13 @@ public class FakedMemoryReservationManager implements 
MemoryReservationManager {
 
   @Override
   public void releaseAllReservedMemory() {}
+
+  @Override
+  public Pair<Long, Long> releaseMemoryVirtually(final long size) {
+    return new Pair<>(0L, 0L);
+  }
+
+  @Override
+  public void reserveMemoryVirtually(
+      final long bytesToBeReserved, final long bytesAlreadyReserved) {}
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
index ba55d60b096..62393673120 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.memory;
 
+import org.apache.tsfile.utils.Pair;
+
 public interface MemoryReservationManager {
   /**
    * Reserve memory for the given size. The memory reservation request will be 
accumulated and the
@@ -43,4 +45,31 @@ public interface MemoryReservationManager {
    * this manager ends, Or the memory to be released in the batch may not be 
released correctly.
    */
   void releaseAllReservedMemory();
+
+  /**
+   * Release memory virtually without actually freeing the memory. This is 
used for memory
+   * reservation transfer scenarios where memory ownership needs to be 
transferred between different
+   * FragmentInstances without actual memory deallocation.
+   *
+   * <p>NOTE: When calling this method, it should be guaranteed that 
bytesToBeReserved +
+   * reservedBytesInTotal >= size to ensure proper memory accounting and 
prevent negative
+   * reservation values.
+   *
+   * @param size the size of memory to release virtually
+   * @return a Pair where the left element is the amount of memory released 
from the pending
+   *     reservation queue (bytesToBeReserved), and the right element is the 
amount of memory that
+   *     has already been reserved
+   */
+  Pair<Long, Long> releaseMemoryVirtually(final long size);
+
+  /**
+   * Reserve memory virtually without actually allocating new memory. This is 
used to transfer
+   * memory ownership from one FragmentInstances to another by reserving the 
memory that was
+   * previously released virtually. It updates the internal reservation state 
without changing the
+   * actual memory allocation.
+   *
+   * @param bytesToBeReserved the amount of memory that needs to be reserved 
cumulatively.
+   * @param bytesAlreadyReserved the amount of memory that has already been 
reserved
+   */
+  void reserveMemoryVirtually(final long bytesToBeReserved, final long 
bytesAlreadyReserved);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
index 7d8d4b076c2..4fa97f368ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.queryengine.plan.planner.memory;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
 
+import org.apache.tsfile.utils.Pair;
+
 import javax.annotation.concurrent.NotThreadSafe;
 
 @NotThreadSafe
@@ -91,4 +93,25 @@ public class NotThreadSafeMemoryReservationManager 
implements MemoryReservationM
       bytesToBeReleased = 0;
     }
   }
+
+  @Override
+  public Pair<Long, Long> releaseMemoryVirtually(final long size) {
+    if (bytesToBeReserved >= size) {
+      bytesToBeReserved -= size;
+      return new Pair<>(size, 0L);
+    } else {
+      long releasedBytesInReserved = bytesToBeReserved;
+      long releasedBytesInTotal = size - bytesToBeReserved;
+      bytesToBeReserved = 0;
+      reservedBytesInTotal -= releasedBytesInTotal;
+      return new Pair<>(releasedBytesInReserved, releasedBytesInTotal);
+    }
+  }
+
+  @Override
+  public void reserveMemoryVirtually(
+      final long bytesToBeReserved, final long bytesAlreadyReserved) {
+    reservedBytesInTotal += bytesAlreadyReserved;
+    reserveMemoryCumulatively(bytesToBeReserved);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
index efe83d23c05..190c4fedd5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.queryengine.plan.planner.memory;
 
 import org.apache.iotdb.db.queryengine.common.QueryId;
 
+import org.apache.tsfile.utils.Pair;
+
 import javax.annotation.concurrent.ThreadSafe;
 
 @ThreadSafe
@@ -48,4 +50,15 @@ public class ThreadSafeMemoryReservationManager extends 
NotThreadSafeMemoryReser
   public synchronized void releaseAllReservedMemory() {
     super.releaseAllReservedMemory();
   }
+
+  @Override
+  public synchronized Pair<Long, Long> releaseMemoryVirtually(final long size) {
+    return super.releaseMemoryVirtually(size);
+  }
+
+  @Override
+  public synchronized void reserveMemoryVirtually(
+      final long bytesToBeReserved, final long bytesAlreadyReserved) {
+    super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
index 87e63ae0691..fb659c6914a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.execution.fragment;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -29,17 +30,26 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager
 import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
+import org.apache.iotdb.db.utils.datastructure.TVList;
 
+import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.enums.TSDataType;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 
 import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -50,32 +60,11 @@ public class FragmentInstanceExecutionTest {
     ExecutorService instanceNotificationExecutor =
         IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
     try {
-      IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
-      FragmentInstanceId instanceId =
-          new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 0), "0");
-      FragmentInstanceStateMachine stateMachine =
-          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
-      DataRegion dataRegion = Mockito.mock(DataRegion.class);
-      FragmentInstanceContext fragmentInstanceContext =
-          createFragmentInstanceContext(instanceId, stateMachine);
-      fragmentInstanceContext.initializeNumOfDrivers(1);
-      fragmentInstanceContext.setMayHaveTmpFile(true);
-      fragmentInstanceContext.setDataRegion(dataRegion);
-      List<IDriver> drivers = Collections.emptyList();
-      ISink sinkHandle = Mockito.mock(ISink.class);
-      long timeOut = -1;
-      MPPDataExchangeManager exchangeManager = 
Mockito.mock(MPPDataExchangeManager.class);
       FragmentInstanceExecution execution =
-          FragmentInstanceExecution.createFragmentInstanceExecution(
-              scheduler,
-              instanceId,
-              fragmentInstanceContext,
-              drivers,
-              sinkHandle,
-              stateMachine,
-              timeOut,
-              false,
-              exchangeManager);
+          createFragmentInstanceExecution(0, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext = 
execution.getFragmentInstanceContext();
+      FragmentInstanceStateMachine stateMachine = execution.getStateMachine();
+
       assertEquals(FragmentInstanceState.RUNNING, 
execution.getInstanceState());
       FragmentInstanceInfo instanceInfo = execution.getInstanceInfo();
       assertEquals(FragmentInstanceState.RUNNING, instanceInfo.getState());
@@ -84,7 +73,7 @@ public class FragmentInstanceExecutionTest {
       assertEquals(fragmentInstanceContext.getFailureInfoList(), 
instanceInfo.getFailureInfoList());
 
       assertEquals(fragmentInstanceContext.getStartTime(), 
execution.getStartTime());
-      assertEquals(timeOut, execution.getTimeoutInMs());
+      assertEquals(-1, execution.getTimeoutInMs());
       assertEquals(stateMachine, execution.getStateMachine());
 
       fragmentInstanceContext.decrementNumOfUnClosedDriver();
@@ -107,4 +96,112 @@ public class FragmentInstanceExecutionTest {
       instanceNotificationExecutor.shutdown();
     }
   }
+
+  @Test
+  public void testTVListOwnerTransfer() throws InterruptedException {
+    // Capture System.out to check for warning messages
+    PrintStream systemOut = System.out;
+    ByteArrayOutputStream logPrint = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(logPrint));
+
+    try {
+      IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
+
+      ExecutorService instanceNotificationExecutor =
+          IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+      try {
+        // TVList
+        TVList tvList = buildTVList();
+
+        // FragmentInstance Context & Execution
+        FragmentInstanceExecution execution1 =
+            createFragmentInstanceExecution(1, instanceNotificationExecutor);
+        FragmentInstanceContext fragmentInstanceContext1 = 
execution1.getFragmentInstanceContext();
+        fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
+        tvList.getQueryContextSet().add(fragmentInstanceContext1);
+
+        FragmentInstanceExecution execution2 =
+            createFragmentInstanceExecution(2, instanceNotificationExecutor);
+        FragmentInstanceContext fragmentInstanceContext2 = 
execution2.getFragmentInstanceContext();
+        fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
+        tvList.getQueryContextSet().add(fragmentInstanceContext2);
+
+        // mock flush's behavior
+        fragmentInstanceContext1
+            .getMemoryReservationContext()
+            .reserveMemoryCumulatively(tvList.calculateRamSize());
+        tvList.setOwnerQuery(fragmentInstanceContext1);
+
+        fragmentInstanceContext1.decrementNumOfUnClosedDriver();
+        fragmentInstanceContext2.decrementNumOfUnClosedDriver();
+
+        fragmentInstanceContext1.getStateMachine().finished();
+        Thread.sleep(100);
+        fragmentInstanceContext2.getStateMachine().finished();
+
+        assertTrue(execution1.getInstanceState().isDone());
+        assertTrue(execution2.getInstanceState().isDone());
+        Thread.sleep(100);
+      } catch (CpuNotEnoughException | MemoryNotEnoughException | 
IllegalArgumentException e) {
+        fail(e.getMessage());
+      } finally {
+        instanceNotificationExecutor.shutdown();
+      }
+    } finally {
+      // Restore original System.out
+      System.setOut(systemOut);
+
+      // should not contain warn message: "The memory cost to be released is 
larger than the memory
+      // cost of memory block"
+      String capturedOutput = logPrint.toString();
+      assertFalse(
+          "Should not contain error message",
+          capturedOutput.contains("is more than allocated memory"));
+    }
+  }
+
+  private FragmentInstanceExecution createFragmentInstanceExecution(int id, 
Executor executor)
+      throws CpuNotEnoughException {
+    IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, id), 
String.valueOf(id));
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, executor);
+    DataRegion dataRegion = Mockito.mock(DataRegion.class);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    fragmentInstanceContext.initializeNumOfDrivers(1);
+    fragmentInstanceContext.setMayHaveTmpFile(true);
+    fragmentInstanceContext.setDataRegion(dataRegion);
+    List<IDriver> drivers = Collections.emptyList();
+    ISink sinkHandle = Mockito.mock(ISink.class);
+    long timeOut = -1;
+    MPPDataExchangeManager exchangeManager = 
Mockito.mock(MPPDataExchangeManager.class);
+    return FragmentInstanceExecution.createFragmentInstanceExecution(
+        scheduler,
+        instanceId,
+        fragmentInstanceContext,
+        drivers,
+        sinkHandle,
+        stateMachine,
+        timeOut,
+        false,
+        exchangeManager);
+  }
+
+  private TVList buildTVList() {
+    int columns = 200;
+    int rows = 1000;
+    List<TSDataType> dataTypes = new ArrayList<>();
+    Object[] values = new Object[columns];
+    for (int i = 0; i < columns; i++) {
+      dataTypes.add(TSDataType.INT64);
+      values[i] = 1L;
+    }
+    AlignedTVList tvList = AlignedTVList.newAlignedList(dataTypes);
+    for (long t = 1; t < rows; t++) {
+      tvList.putAlignedValue(t, values);
+    }
+    return tvList;
+  }
 }

Reply via email to