This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 2fe40b541b8 fix: reserve memory for sorting indices during query execution (#16959)
2fe40b541b8 is described below

commit 2fe40b541b846c7f83b5a1a32a4f1a1034c04fe7
Author: shizy <[email protected]>
AuthorDate: Tue Dec 30 07:20:30 2025 +0800

    fix: reserve memory for sorting indices during query execution (#16959)
---
 .../fragment/FragmentInstanceContext.java          | 11 ++-
 .../schemaregion/utils/ResourceByPathUtils.java    |  4 +-
 .../memtable/AbstractWritableMemChunk.java         |  4 +-
 .../memtable/AlignedReadOnlyMemChunk.java          | 37 ++++++++-
 .../dataregion/memtable/ReadOnlyMemChunk.java      | 29 +++++++
 .../db/utils/datastructure/AlignedTVList.java      |  2 +-
 .../iotdb/db/utils/datastructure/TVList.java       | 21 ++++-
 .../fragment/FragmentInstanceExecutionTest.java    | 97 ++++++++++++++++++++++
 8 files changed, 195 insertions(+), 10 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 62f82a1f5b4..14995b887bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -869,11 +869,18 @@ public class FragmentInstanceContext extends QueryContext 
{
    */
   private void releaseTVListOwnedByQuery() {
     for (TVList tvList : tvListSet) {
+      long tvListRamSize = tvList.calculateRamSize();
       tvList.lockQueryList();
       Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
       try {
         queryContextSet.remove(this);
         if (tvList.getOwnerQuery() == this) {
+          if (tvList.getReservedMemoryBytes() != tvListRamSize) {
+            LOGGER.warn(
+                "Release TVList owned by query: reserved size {}, actual size {}",
+                tvList.getReservedMemoryBytes(),
+                tvListRamSize);
+          }
           if (queryContextSet.isEmpty()) {
             if (LOGGER.isDebugEnabled()) {
               LOGGER.debug(
@@ -881,14 +888,14 @@ public class FragmentInstanceContext extends QueryContext 
{
                   tvList,
                   this.getId());
             }
-            
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
+            
memoryReservationManager.releaseMemoryCumulatively(tvList.getReservedMemoryBytes());
             tvList.clear();
           } else {
             // Transfer memory to next query. It must be exception-safe as 
this method is called
             // during FragmentInstanceExecution cleanup. Any exception during 
this process could
             // prevent proper resource cleanup and cause memory leaks.
             Pair<Long, Long> releasedBytes =
-                
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
+                
memoryReservationManager.releaseMemoryVirtually(tvList.getReservedMemoryBytes());
             FragmentInstanceContext queryContext =
                 (FragmentInstanceContext) queryContextSet.iterator().next();
             queryContext
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index a66f19108ea..b00c8737bd9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -148,6 +148,7 @@ public abstract class ResourceByPathUtils {
     // mutable tvlist
     TVList list = memChunk.getWorkingTVList();
     TVList cloneList = null;
+    long tvListRamSize = list.calculateRamSize();
     list.lockQueryList();
     try {
       if (copyTimeFilter != null
@@ -188,7 +189,8 @@ public abstract class ResourceByPathUtils {
           if (firstQuery instanceof FragmentInstanceContext) {
             MemoryReservationManager memoryReservationManager =
                 ((FragmentInstanceContext) 
firstQuery).getMemoryReservationContext();
-            
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
+            memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
+            list.setReservedMemoryBytes(tvListRamSize);
           }
           list.setOwnerQuery(firstQuery);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index 58aabe4a5ac..9db0196a3a1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -99,6 +99,7 @@ public abstract class AbstractWritableMemChunk implements 
IWritableMemChunk {
   }
 
   private void tryReleaseTvList(TVList tvList) {
+    long tvListRamSize = tvList.calculateRamSize();
     tvList.lockQueryList();
     try {
       if (tvList.getQueryContextSet().isEmpty()) {
@@ -110,7 +111,8 @@ public abstract class AbstractWritableMemChunk implements 
IWritableMemChunk {
         if (firstQuery instanceof FragmentInstanceContext) {
           MemoryReservationManager memoryReservationManager =
               ((FragmentInstanceContext) 
firstQuery).getMemoryReservationContext();
-          
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
+          memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
+          tvList.setReservedMemoryBytes(tvListRamSize);
         }
         // update current TVList owner to first query in the list
         tvList.setOwnerQuery(firstQuery);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
index 6d13b49110e..d00424856ba 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedChunkLoader;
@@ -113,6 +114,21 @@ public class AlignedReadOnlyMemChunk extends 
ReadOnlyMemChunk {
       int queryRowCount = entry.getValue();
       if (!alignedTvList.isSorted() && queryRowCount > 
alignedTvList.seqRowCount()) {
         alignedTvList.sort();
+        long alignedTvListRamSize = alignedTvList.calculateRamSize();
+        alignedTvList.lockQueryList();
+        try {
+          QueryContext owner = alignedTvList.getOwnerQuery(); // may be a plain QueryContext
+          if (owner instanceof FragmentInstanceContext) {
+            FragmentInstanceContext ownerQuery = (FragmentInstanceContext) owner;
+            long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
+            if (deltaBytes > 0) {
+              ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
+              alignedTvList.addReservedMemoryBytes(deltaBytes);
+            }
+          }
+        } finally {
+          alignedTvList.unlockQueryList();
+        }
       }
     }
   }
@@ -339,10 +355,25 @@ public class AlignedReadOnlyMemChunk extends 
ReadOnlyMemChunk {
   @Override
   public IPointReader getPointReader() {
     for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) {
-      AlignedTVList tvList = (AlignedTVList) entry.getKey();
+      AlignedTVList alignedTvList = (AlignedTVList) entry.getKey();
       int queryLength = entry.getValue();
-      if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
-        tvList.sort();
+      if (!alignedTvList.isSorted() && queryLength > 
alignedTvList.seqRowCount()) {
+        alignedTvList.sort();
+        long alignedTvListRamSize = alignedTvList.calculateRamSize();
+        alignedTvList.lockQueryList();
+        try {
+          QueryContext owner = alignedTvList.getOwnerQuery(); // may be a plain QueryContext
+          if (owner instanceof FragmentInstanceContext) {
+            FragmentInstanceContext ownerQuery = (FragmentInstanceContext) owner;
+            long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes();
+            if (deltaBytes > 0) {
+              ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
+              alignedTvList.addReservedMemoryBytes(deltaBytes);
+            }
+          }
+        } finally {
+          alignedTvList.unlockQueryList();
+        }
       }
     }
     TsBlock tsBlock = buildTsBlock();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
index 64a8868c8c7..3438759cfa9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader;
@@ -135,6 +136,20 @@ public class ReadOnlyMemChunk {
       int queryRowCount = entry.getValue();
       if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) {
         tvList.sort();
+        long tvListRamSize = tvList.calculateRamSize();
+        tvList.lockQueryList();
+        try {
+          QueryContext owner = tvList.getOwnerQuery(); // may be a plain QueryContext
+          long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
+          if (owner instanceof FragmentInstanceContext && deltaBytes > 0) {
+            ((FragmentInstanceContext) owner)
+                .getMemoryReservationContext()
+                .reserveMemoryCumulatively(deltaBytes);
+            tvList.addReservedMemoryBytes(deltaBytes);
+          }
+        } finally {
+          tvList.unlockQueryList();
+        }
       }
     }
   }
@@ -273,6 +288,20 @@ public class ReadOnlyMemChunk {
       int queryLength = entry.getValue();
       if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
         tvList.sort();
+        long tvListRamSize = tvList.calculateRamSize();
+        tvList.lockQueryList();
+        try {
+          QueryContext owner = tvList.getOwnerQuery(); // may be a plain QueryContext
+          long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
+          if (owner instanceof FragmentInstanceContext && deltaBytes > 0) {
+            ((FragmentInstanceContext) owner)
+                .getMemoryReservationContext()
+                .reserveMemoryCumulatively(deltaBytes);
+            tvList.addReservedMemoryBytes(deltaBytes);
+          }
+        } finally {
+          tvList.unlockQueryList();
+        }
       }
     }
     TsBlock tsBlock = buildTsBlock();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index e0c08fbdd43..e4230787a76 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -836,7 +836,7 @@ public abstract class AlignedTVList extends TVList {
   }
 
   @Override
-  public long calculateRamSize() {
+  public synchronized long calculateRamSize() {
     return timestamps.size() * alignedTvListArrayMemCost();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index fedc3830ad8..073b03f0a96 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -75,8 +75,10 @@ public abstract class TVList implements WALEntryValue {
   // Index relation: arrayIndex -> elementIndex
   protected List<BitMap> bitMap;
 
-  // lock to provide synchronization for query list
+  // Guards queryContextSet, ownerQuery, and reservedMemoryBytes.
+  // Always acquire this lock before accessing/modifying these fields.
   private final ReentrantLock queryListLock = new ReentrantLock();
+
   // set of query that this TVList is used
   protected final Set<QueryContext> queryContextSet;
 
@@ -84,6 +86,9 @@ public abstract class TVList implements WALEntryValue {
   // When it is null, the TVList is owned by insert thread and released after 
flush.
   protected QueryContext ownerQuery;
 
+  // Reserved memory by the query. Ensure to acquire queryListLock before 
update.
+  protected long reservedMemoryBytes = 0L;
+
   protected boolean sorted = true;
   protected long maxTime;
   protected long minTime;
@@ -151,7 +156,7 @@ public abstract class TVList implements WALEntryValue {
     return size;
   }
 
-  public long calculateRamSize() {
+  public synchronized long calculateRamSize() {
     return timestamps.size() * tvListArrayMemCost();
   }
 
@@ -159,6 +164,18 @@ public abstract class TVList implements WALEntryValue {
     return sorted;
   }
 
+  public void setReservedMemoryBytes(long bytes) {
+    this.reservedMemoryBytes = bytes;
+  }
+
+  public void addReservedMemoryBytes(long bytes) {
+    this.reservedMemoryBytes += bytes;
+  }
+
+  public long getReservedMemoryBytes() {
+    return reservedMemoryBytes;
+  }
+
   public abstract void sort();
 
   public void increaseReferenceCount() {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
index e0655edc55a..30b9df14b3a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
@@ -20,7 +20,12 @@
 package org.apache.iotdb.db.queryengine.execution.fragment;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -30,15 +35,26 @@ import 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager
 import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
 import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
+import 
org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -49,6 +65,7 @@ import java.util.concurrent.ExecutorService;
 import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -157,6 +174,70 @@ public class FragmentInstanceExecutionTest {
     }
   }
 
+  @Test
+  public void testTVListCloneForQuery() {
+    IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
+
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+
+    try {
+      String deviceId = "d1";
+      String measurementId = "s1";
+      IMemTable memTable = createMemTable(deviceId, measurementId);
+      assertEquals(1, memTable.getMemTableMap().size());
+      IWritableMemChunkGroup memChunkGroup = 
memTable.getMemTableMap().values().iterator().next();
+      assertEquals(1, memChunkGroup.getMemChunkMap().size());
+      IWritableMemChunk memChunk = 
memChunkGroup.getMemChunkMap().values().iterator().next();
+      TVList tvList = memChunk.getWorkingTVList();
+      assertFalse(tvList.isSorted());
+
+      // FragmentInstance Context
+      FragmentInstanceId id1 = new FragmentInstanceId(new 
PlanFragmentId(MOCK_QUERY_ID, 1), "1");
+      FragmentInstanceStateMachine stateMachine1 =
+          new FragmentInstanceStateMachine(id1, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext1 =
+          createFragmentInstanceContext(id1, stateMachine1);
+
+      FragmentInstanceId id2 = new FragmentInstanceId(new 
PlanFragmentId(MOCK_QUERY_ID, 2), "2");
+      FragmentInstanceStateMachine stateMachine2 =
+          new FragmentInstanceStateMachine(id2, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext2 =
+          createFragmentInstanceContext(id2, stateMachine2);
+
+      // query on memtable
+      MeasurementPath fullPath =
+          new MeasurementPath(
+              deviceId,
+              measurementId,
+              new MeasurementSchema(
+                  measurementId,
+                  TSDataType.INT32,
+                  TSEncoding.RLE,
+                  CompressionType.UNCOMPRESSED,
+                  Collections.emptyMap()));
+      ReadOnlyMemChunk readOnlyMemChunk1 =
+          memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, 
null, null);
+      ReadOnlyMemChunk readOnlyMemChunk2 =
+          memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, 
null, null);
+
+      IPointReader pointReader = readOnlyMemChunk1.getPointReader();
+      while (pointReader.hasNextTimeValuePair()) {
+        pointReader.nextTimeValuePair();
+      }
+      assertTrue(tvList.isSorted());
+      assertEquals(tvList.calculateRamSize(), tvList.getReservedMemoryBytes());
+    } catch (QueryProcessException
+        | IOException
+        | MetadataException
+        | MemoryNotEnoughException
+        | IllegalArgumentException e) {
+      fail(e.getMessage());
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
   private FragmentInstanceExecution createFragmentInstanceExecution(int id, 
Executor executor)
       throws CpuNotEnoughException {
     IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
@@ -201,4 +282,20 @@ public class FragmentInstanceExecutionTest {
     }
     return tvList;
   }
+
+  private IMemTable createMemTable(String deviceId, String measurementId)
+      throws IllegalPathException {
+    IMemTable memTable = new PrimitiveMemTable("root.test", "1");
+
+    int rows = 100;
+    for (int i = 0; i < rows; i++) {
+      memTable.write(
+          DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)),
+          Collections.singletonList(
+              new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN)),
+          rows - i - 1,
+          new Object[] {i + 10});
+    }
+    return memTable;
+  }
 }

Reply via email to