This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2fe40b541b8 fix: reserve memory for sorting indices during query execution (#16959)
2fe40b541b8 is described below
commit 2fe40b541b846c7f83b5a1a32a4f1a1034c04fe7
Author: shizy <[email protected]>
AuthorDate: Tue Dec 30 07:20:30 2025 +0800
fix: reserve memory for sorting indices during query execution (#16959)
---
.../fragment/FragmentInstanceContext.java | 11 ++-
.../schemaregion/utils/ResourceByPathUtils.java | 4 +-
.../memtable/AbstractWritableMemChunk.java | 4 +-
.../memtable/AlignedReadOnlyMemChunk.java | 37 ++++++++-
.../dataregion/memtable/ReadOnlyMemChunk.java | 29 +++++++
.../db/utils/datastructure/AlignedTVList.java | 2 +-
.../iotdb/db/utils/datastructure/TVList.java | 21 ++++-
.../fragment/FragmentInstanceExecutionTest.java | 97 ++++++++++++++++++++++
8 files changed, 195 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 62f82a1f5b4..14995b887bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -869,11 +869,18 @@ public class FragmentInstanceContext extends QueryContext
{
*/
private void releaseTVListOwnedByQuery() {
for (TVList tvList : tvListSet) {
+ long tvListRamSize = tvList.calculateRamSize();
tvList.lockQueryList();
Set<QueryContext> queryContextSet = tvList.getQueryContextSet();
try {
queryContextSet.remove(this);
if (tvList.getOwnerQuery() == this) {
+ if (tvList.getReservedMemoryBytes() != tvListRamSize) {
+ LOGGER.warn(
+ "Release TVList owned by query: reserved size {} differs from current ram size {}",
+ tvList.getReservedMemoryBytes(),
+ tvListRamSize);
+ }
if (queryContextSet.isEmpty()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
@@ -881,14 +888,14 @@ public class FragmentInstanceContext extends QueryContext
{
tvList,
this.getId());
}
-
memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize());
+
memoryReservationManager.releaseMemoryCumulatively(tvList.getReservedMemoryBytes());
tvList.clear();
} else {
// Transfer memory to next query. It must be exception-safe as
this method is called
// during FragmentInstanceExecution cleanup. Any exception during
this process could
// prevent proper resource cleanup and cause memory leaks.
Pair<Long, Long> releasedBytes =
-
memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize());
+
memoryReservationManager.releaseMemoryVirtually(tvList.getReservedMemoryBytes());
FragmentInstanceContext queryContext =
(FragmentInstanceContext) queryContextSet.iterator().next();
queryContext
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index a66f19108ea..b00c8737bd9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -148,6 +148,7 @@ public abstract class ResourceByPathUtils {
// mutable tvlist
TVList list = memChunk.getWorkingTVList();
TVList cloneList = null;
+ long tvListRamSize = list.calculateRamSize();
list.lockQueryList();
try {
if (copyTimeFilter != null
@@ -188,7 +189,8 @@ public abstract class ResourceByPathUtils {
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
((FragmentInstanceContext)
firstQuery).getMemoryReservationContext();
-
memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize());
+ memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
+ list.setReservedMemoryBytes(tvListRamSize);
}
list.setOwnerQuery(firstQuery);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index 58aabe4a5ac..9db0196a3a1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -99,6 +99,7 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
}
private void tryReleaseTvList(TVList tvList) {
+ long tvListRamSize = tvList.calculateRamSize();
tvList.lockQueryList();
try {
if (tvList.getQueryContextSet().isEmpty()) {
@@ -110,7 +111,8 @@ public abstract class AbstractWritableMemChunk implements
IWritableMemChunk {
if (firstQuery instanceof FragmentInstanceContext) {
MemoryReservationManager memoryReservationManager =
((FragmentInstanceContext)
firstQuery).getMemoryReservationContext();
-
memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize());
+ memoryReservationManager.reserveMemoryCumulatively(tvListRamSize);
+ tvList.setReservedMemoryBytes(tvListRamSize);
}
// update current TVList owner to first query in the list
tvList.setOwnerQuery(firstQuery);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
index 6d13b49110e..d00424856ba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.memtable;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import
org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedChunkLoader;
@@ -113,6 +114,21 @@ public class AlignedReadOnlyMemChunk extends
ReadOnlyMemChunk {
int queryRowCount = entry.getValue();
if (!alignedTvList.isSorted() && queryRowCount >
alignedTvList.seqRowCount()) {
alignedTvList.sort();
+ long alignedTvListRamSize = alignedTvList.calculateRamSize();
+ alignedTvList.lockQueryList();
+ try {
+ if (alignedTvList.getOwnerQuery() instanceof FragmentInstanceContext) {
+ FragmentInstanceContext ownerQuery =
+ (FragmentInstanceContext) alignedTvList.getOwnerQuery();
+ long deltaBytes = alignedTvListRamSize -
alignedTvList.getReservedMemoryBytes();
+ if (deltaBytes > 0) {
+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
+ alignedTvList.addReservedMemoryBytes(deltaBytes);
+ }
+ }
+ } finally {
+ alignedTvList.unlockQueryList();
+ }
}
}
}
@@ -339,10 +355,25 @@ public class AlignedReadOnlyMemChunk extends
ReadOnlyMemChunk {
@Override
public IPointReader getPointReader() {
for (Map.Entry<TVList, Integer> entry : alignedTvListQueryMap.entrySet()) {
- AlignedTVList tvList = (AlignedTVList) entry.getKey();
+ AlignedTVList alignedTvList = (AlignedTVList) entry.getKey();
int queryLength = entry.getValue();
- if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
- tvList.sort();
+ if (!alignedTvList.isSorted() && queryLength >
alignedTvList.seqRowCount()) {
+ alignedTvList.sort();
+ long alignedTvListRamSize = alignedTvList.calculateRamSize();
+ alignedTvList.lockQueryList();
+ try {
+ if (alignedTvList.getOwnerQuery() instanceof FragmentInstanceContext) {
+ FragmentInstanceContext ownerQuery =
+ (FragmentInstanceContext) alignedTvList.getOwnerQuery();
+ long deltaBytes = alignedTvListRamSize -
alignedTvList.getReservedMemoryBytes();
+ if (deltaBytes > 0) {
+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
+ alignedTvList.addReservedMemoryBytes(deltaBytes);
+ }
+ }
+ } finally {
+ alignedTvList.unlockQueryList();
+ }
}
}
TsBlock tsBlock = buildTsBlock();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
index 64a8868c8c7..3438759cfa9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import
org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader;
@@ -135,6 +136,20 @@ public class ReadOnlyMemChunk {
int queryRowCount = entry.getValue();
if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) {
tvList.sort();
+ long tvListRamSize = tvList.calculateRamSize();
+ tvList.lockQueryList();
+ try {
+ if (tvList.getOwnerQuery() instanceof FragmentInstanceContext) {
+ FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
+ long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
+ if (deltaBytes > 0) {
+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
+ tvList.addReservedMemoryBytes(deltaBytes);
+ }
+ }
+ } finally {
+ tvList.unlockQueryList();
+ }
}
}
}
@@ -273,6 +288,20 @@ public class ReadOnlyMemChunk {
int queryLength = entry.getValue();
if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) {
tvList.sort();
+ long tvListRamSize = tvList.calculateRamSize();
+ tvList.lockQueryList();
+ try {
+ if (tvList.getOwnerQuery() instanceof FragmentInstanceContext) {
+ FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery();
+ long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes();
+ if (deltaBytes > 0) {
+
ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes);
+ tvList.addReservedMemoryBytes(deltaBytes);
+ }
+ }
+ } finally {
+ tvList.unlockQueryList();
+ }
}
}
TsBlock tsBlock = buildTsBlock();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index e0c08fbdd43..e4230787a76 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -836,7 +836,7 @@ public abstract class AlignedTVList extends TVList {
}
@Override
- public long calculateRamSize() {
+ public synchronized long calculateRamSize() {
return timestamps.size() * alignedTvListArrayMemCost();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index fedc3830ad8..073b03f0a96 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -75,8 +75,10 @@ public abstract class TVList implements WALEntryValue {
// Index relation: arrayIndex -> elementIndex
protected List<BitMap> bitMap;
- // lock to provide synchronization for query list
+ // Guards queryContextSet, ownerQuery, and reservedMemoryBytes.
+ // Always acquire this lock before accessing/modifying these fields.
private final ReentrantLock queryListLock = new ReentrantLock();
+
// set of query that this TVList is used
protected final Set<QueryContext> queryContextSet;
@@ -84,6 +86,9 @@ public abstract class TVList implements WALEntryValue {
// When it is null, the TVList is owned by insert thread and released after
flush.
protected QueryContext ownerQuery;
+ // Reserved memory by the query. Ensure to acquire queryListLock before
update.
+ protected long reservedMemoryBytes = 0L;
+
protected boolean sorted = true;
protected long maxTime;
protected long minTime;
@@ -151,7 +156,7 @@ public abstract class TVList implements WALEntryValue {
return size;
}
- public long calculateRamSize() {
+ public synchronized long calculateRamSize() {
return timestamps.size() * tvListArrayMemCost();
}
@@ -159,6 +164,21 @@ public abstract class TVList implements WALEntryValue {
return sorted;
}
+ /** Replaces the bytes reserved for this list by its owner query; caller must hold the query list lock. */
+ public void setReservedMemoryBytes(long bytes) {
+ this.reservedMemoryBytes = bytes;
+ }
+
+ /** Adds {@code bytes} to the reserved byte count; caller must hold the query list lock. */
+ public void addReservedMemoryBytes(long bytes) {
+ this.reservedMemoryBytes += bytes;
+ }
+
+ /** Returns the bytes reserved for this list; the query list lock should be held while reading. */
+ public long getReservedMemoryBytes() {
+ return reservedMemoryBytes;
+ }
+
public abstract void sort();
public void increaseReferenceCount() {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
index e0655edc55a..30b9df14b3a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
@@ -20,7 +20,12 @@
package org.apache.iotdb.db.queryengine.execution.fragment;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException;
@@ -30,15 +35,26 @@ import
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk;
+import
org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
@@ -49,6 +65,7 @@ import java.util.concurrent.ExecutorService;
import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -157,6 +174,70 @@ public class FragmentInstanceExecutionTest {
}
}
+ @Test
+ public void testTVListCloneForQuery() {
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1);
+
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+
+ try {
+ String deviceId = "d1";
+ String measurementId = "s1";
+ IMemTable memTable = createMemTable(deviceId, measurementId);
+ assertEquals(1, memTable.getMemTableMap().size());
+ IWritableMemChunkGroup memChunkGroup =
memTable.getMemTableMap().values().iterator().next();
+ assertEquals(1, memChunkGroup.getMemChunkMap().size());
+ IWritableMemChunk memChunk =
memChunkGroup.getMemChunkMap().values().iterator().next();
+ TVList tvList = memChunk.getWorkingTVList();
+ assertFalse(tvList.isSorted());
+
+ // Two fragment instance contexts that will both query the memtable below
+ FragmentInstanceId id1 = new FragmentInstanceId(new
PlanFragmentId(MOCK_QUERY_ID, 1), "1");
+ FragmentInstanceStateMachine stateMachine1 =
+ new FragmentInstanceStateMachine(id1, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext1 =
+ createFragmentInstanceContext(id1, stateMachine1);
+
+ FragmentInstanceId id2 = new FragmentInstanceId(new
PlanFragmentId(MOCK_QUERY_ID, 2), "2");
+ FragmentInstanceStateMachine stateMachine2 =
+ new FragmentInstanceStateMachine(id2, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext2 =
+ createFragmentInstanceContext(id2, stateMachine2);
+
+ // Query the same series from both contexts; only the first one reads the points (and sorts)
+ MeasurementPath fullPath =
+ new MeasurementPath(
+ deviceId,
+ measurementId,
+ new MeasurementSchema(
+ measurementId,
+ TSDataType.INT32,
+ TSEncoding.RLE,
+ CompressionType.UNCOMPRESSED,
+ Collections.emptyMap()));
+ ReadOnlyMemChunk readOnlyMemChunk1 =
+ memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE,
null, null);
+ ReadOnlyMemChunk readOnlyMemChunk2 =
+ memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE,
null, null);
+
+ IPointReader pointReader = readOnlyMemChunk1.getPointReader();
+ while (pointReader.hasNextTimeValuePair()) {
+ pointReader.nextTimeValuePair();
+ }
+ assertTrue(tvList.isSorted());
+ assertEquals(tvList.calculateRamSize(), tvList.getReservedMemoryBytes());
+ } catch (QueryProcessException
+ | IOException
+ | MetadataException
+ | MemoryNotEnoughException
+ | IllegalArgumentException e) {
+ throw new AssertionError(e);
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
+
private FragmentInstanceExecution createFragmentInstanceExecution(int id,
Executor executor)
throws CpuNotEnoughException {
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);
@@ -201,4 +282,20 @@ public class FragmentInstanceExecutionTest {
}
return tvList;
}
+
+ private IMemTable createMemTable(String deviceId, String measurementId)
+ throws IllegalPathException {
+ IMemTable memTable = new PrimitiveMemTable("root.test", "1");
+ // Timestamps are written in descending order so the working TVList is left unsorted.
+ int rows = 100;
+ for (int i = 0; i < rows; i++) {
+ memTable.write(
+ DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)),
+ Collections.singletonList(
+ new MeasurementSchema(measurementId, TSDataType.INT32,
TSEncoding.PLAIN)),
+ rows - i - 1,
+ new Object[] {i + 10});
+ }
+ return memTable;
+ }
}