This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d563cd046e2 Improve query modification loading memory control (#17788)
d563cd046e2 is described below
commit d563cd046e28fbb1aeb99f94abcca9cba8100a99
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jun 5 13:56:15 2026 +0800
Improve query modification loading memory control (#17788)
---
.../planner/memory/MemoryReservationManager.java | 8 +
.../iotdb/db/i18n/DataNodeQueryMessages.java | 2 +
.../iotdb/db/i18n/DataNodeQueryMessages.java | 2 +
.../fragment/FragmentInstanceContext.java | 90 ++++--
.../execution/fragment/QueryContext.java | 20 +-
.../fragment/QueryModificationLoader.java | 314 ++++++++++++++++++
.../memory/FakedMemoryReservationManager.java | 3 +
.../NotThreadSafeMemoryReservationManager.java | 9 +
.../memory/ThreadSafeMemoryReservationManager.java | 5 +
.../dataregion/tsfile/TsFileResource.java | 18 +-
.../fragment/QueryModificationLoaderTest.java | 358 +++++++++++++++++++++
11 files changed, 786 insertions(+), 43 deletions(-)
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java
index 9df8d52131b..b52534b4ec2 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java
@@ -33,6 +33,14 @@ public interface MemoryReservationManager {
/** Reserve memory for the accumulated memory size immediately. */
void reserveMemoryImmediately();
+ /**
+ * Reserve memory for the given size immediately without changing the
accumulated pending
+ * reservation size maintained by this manager.
+ *
+ * @param size the size of memory to reserve immediately
+ */
+ void reserveMemoryImmediately(final long size);
+
/**
* Release memory for the given size.
*
diff --git
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index 2c4978618a6..964163ce14f 100644
---
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -136,6 +136,8 @@ public final class DataNodeQueryMessages {
public static final String FREE_MORE_MEMORY_THAN_HAS_BEEN_RESERVED =
"Free more memory than has been reserved.";
+ public static final String ESTIMATED_MODS_TREE_SIZE_DECREASED =
+ "Estimated mods tree size decreased from %d to %d for TsFile %s.";
// --- Execution / Operator ---
diff --git
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index 7a7de06471b..ae020f8c5a3 100644
---
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -136,6 +136,8 @@ public final class DataNodeQueryMessages {
public static final String FREE_MORE_MEMORY_THAN_HAS_BEEN_RESERVED =
"释放的内存超过已预留的量。";
+ public static final String ESTIMATED_MODS_TREE_SIZE_DECREASED =
+ "估算的 mods tree 大小从 %d 减少到 %d,TsFile:%s。";
// --- Execution / Operator ---
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 2a0373cf6fd..6bda6e9c14d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -26,11 +26,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.path.AlignedFullPath;
import org.apache.iotdb.commons.path.IFullPath;
-import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.queryengine.common.SessionInfo;
import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -56,7 +56,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
@@ -82,6 +81,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
@@ -99,6 +99,7 @@ public class FragmentInstanceContext extends QueryContext {
private static final long END_TIME_INITIAL_VALUE = -1L;
// wait over 5s for driver to close is abnormal
private static final long LONG_WAIT_DURATION = 5_000_000_000L;
+ private static final int MODS_MEMORY_ESTIMATE_READ_INTERVAL = 10_000;
private final FragmentInstanceId id;
private final FragmentInstanceStateMachine stateMachine;
@@ -378,41 +379,60 @@ public class FragmentInstanceContext extends QueryContext
{
}
@Override
- protected PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
getAllModifications(
- TsFileResource resource) {
- if (isSingleSourcePath() || memoryReservationManager == null) {
- return loadAllModificationsFromDisk(resource);
+ public List<ModEntry> getPathModifications(
+ TsFileResource tsFileResource, IDeviceID deviceID, String measurement) {
+ if (!checkIfModificationExists(tsFileResource)) {
+ return Collections.emptyList();
}
+ if (memoryReservationManager == null) {
+ return super.getPathModifications(tsFileResource, deviceID, measurement);
+ }
+ try (QueryModificationLoader modificationLoader =
+ getQueryModificationLoader(
+ tsFileResource,
+ modification -> modification.affects(deviceID) &&
modification.affects(measurement),
+ mods -> getPathModifications(mods, deviceID, measurement))) {
+ return modificationLoader.getPathModifications();
+ } catch (IllegalPathException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public List<ModEntry> getPathModifications(TsFileResource tsFileResource,
IDeviceID deviceID)
+ throws IllegalPathException {
+ if (!checkIfModificationExists(tsFileResource)) {
+ return Collections.emptyList();
+ }
+ if (memoryReservationManager == null) {
+ return super.getPathModifications(tsFileResource, deviceID);
+ }
+ try (QueryModificationLoader modificationLoader =
+ getQueryModificationLoader(
+ tsFileResource,
+ modification ->
+ deviceID.isTableModel()
+ ? modification.affects(deviceID)
+ : modification.affectsAll(deviceID),
+ mods -> getPathModifications(mods, deviceID))) {
+ return modificationLoader.getPathModifications();
+ }
+ }
- AtomicReference<PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>>
- atomicReference = new AtomicReference<>();
- PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
cachedResult =
- fileModCache.computeIfAbsent(
- resource.getTsFileID(),
- k -> {
- PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
allMods =
- loadAllModificationsFromDisk(resource);
- atomicReference.set(allMods);
- if (cachedModEntriesSize.get() >=
CONFIG.getModsCacheSizeLimitPerFI()) {
- return null;
- }
- long memCost =
- RamUsageEstimator.sizeOfObject(allMods)
- +
RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY;
- long alreadyUsedMemoryForCachedModEntries =
cachedModEntriesSize.get();
- while (alreadyUsedMemoryForCachedModEntries + memCost
- < CONFIG.getModsCacheSizeLimitPerFI()) {
- if (cachedModEntriesSize.compareAndSet(
- alreadyUsedMemoryForCachedModEntries,
- alreadyUsedMemoryForCachedModEntries + memCost)) {
- memoryReservationManager.reserveMemoryCumulatively(memCost);
- return allMods;
- }
- alreadyUsedMemoryForCachedModEntries =
cachedModEntriesSize.get();
- }
- return null;
- });
- return cachedResult == null ? atomicReference.get() : cachedResult;
+ private QueryModificationLoader getQueryModificationLoader(
+ TsFileResource tsFileResource,
+ Predicate<ModEntry> fallbackModificationMatcher,
+ QueryModificationLoader.ModsTreeMatcher modsTreeMatcher) {
+ return new QueryModificationLoader(
+ this,
+ tsFileResource,
+ memoryReservationManager,
+ CONFIG.getModsCacheSizeLimitPerFI(),
+ MODS_MEMORY_ESTIMATE_READ_INTERVAL,
+ fileModCache,
+ cachedModEntriesSize,
+ fallbackModificationMatcher,
+ modsTreeMatcher);
}
// the state change listener is added here in a separate initialize() method
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index d3edbe2e0d0..44a88826696 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -127,20 +127,26 @@ public class QueryContext {
TsFileResource resource) {
PatternTreeMap<ModEntry, ModsSerializer> modifications =
PatternTreeMapFactory.getModsPatternTreeMap();
- TsFileResource.ModIterator modEntryIterator =
resource.getModEntryIterator();
- while (modEntryIterator.hasNext()) {
- ModEntry modification = modEntryIterator.next();
- if (tables != null && modification instanceof TableDeletionEntry) {
- String tableName = ((TableDeletionEntry) modification).getTableName();
- if (!tables.contains(tableName)) {
+ try (TsFileResource.ModIterator modEntryIterator =
resource.getModEntryIterator()) {
+ while (modEntryIterator.hasNext()) {
+ ModEntry modification = modEntryIterator.next();
+ if (shouldSkipModification(modification)) {
continue;
}
+ modifications.append(modification.keyOfPatternTree(), modification);
}
- modifications.append(modification.keyOfPatternTree(), modification);
}
return modifications;
}
+ protected boolean shouldSkipModification(ModEntry modification) {
+ if (tables != null && modification instanceof TableDeletionEntry) {
+ String tableName = ((TableDeletionEntry) modification).getTableName();
+ return !tables.contains(tableName);
+ }
+ return false;
+ }
+
public List<ModEntry> getPathModifications(
TsFileResource tsFileResource, IDeviceID deviceID, String measurement) {
// if the mods file does not exist, do not add it to the cache
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
new file mode 100644
index 00000000000..bdb1914a354
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+class QueryModificationLoader implements AutoCloseable {
+
+ private final QueryContext queryContext;
+ private final TsFileResource resource;
+ private final MemoryReservationManager memoryReservationManager;
+ private final long modsCacheSizeLimitPerFI;
+ private final int modsMemoryEstimateReadInterval;
+ private final Map<TsFileID, PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>>
+ fileModCache;
+ private final AtomicLong cachedModEntriesSize;
+ private final Predicate<ModEntry> modificationMatcher;
+ private final ModsTreeMatcher modsTreeMatcher;
+
+ private TsFileResource.ModIterator currentIterator;
+
+ QueryModificationLoader(
+ QueryContext queryContext,
+ TsFileResource resource,
+ MemoryReservationManager memoryReservationManager,
+ long modsCacheSizeLimitPerFI,
+ int modsMemoryEstimateReadInterval,
+ Map<TsFileID, PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>> fileModCache,
+ AtomicLong cachedModEntriesSize,
+ Predicate<ModEntry> modificationMatcher,
+ ModsTreeMatcher modsTreeMatcher) {
+ this.queryContext = queryContext;
+ this.resource = resource;
+ this.memoryReservationManager = memoryReservationManager;
+ this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI;
+ this.modsMemoryEstimateReadInterval = modsMemoryEstimateReadInterval;
+ this.fileModCache = fileModCache;
+ this.cachedModEntriesSize = cachedModEntriesSize;
+ this.modificationMatcher = modificationMatcher;
+ this.modsTreeMatcher = modsTreeMatcher;
+ }
+
+ List<ModEntry> getPathModifications() throws IllegalPathException {
+ AtomicReference<LoadModsResult> loadedResult = new AtomicReference<>();
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> cachedMods =
+ fileModCache.computeIfAbsent(
+ resource.getTsFileID(), ignored ->
loadAllModificationsForCache(loadedResult));
+ if (cachedMods != null) {
+ return modsTreeMatcher.match(cachedMods);
+ }
+
+ LoadModsResult result = loadedResult.get();
+ try {
+ if (result.loadedAllModEntries) {
+ return fallbackByMatchLoadedPatternTree(result);
+ } else {
+ return fallbackByMatchedScan(result);
+ }
+ } finally {
+ close();
+ }
+ }
+
+ private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
+ loadAllModificationsForCache(AtomicReference<LoadModsResult>
loadedResult) {
+ LoadModsResult result = loadAllModificationsWithQuotaControl();
+ loadedResult.set(result);
+ if (!result.cacheable) {
+ return null;
+ }
+
+ closeCurrentIterator();
+ return result.mods;
+ }
+
+ private LoadModsResult loadAllModificationsWithQuotaControl() {
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications =
+ PatternTreeMapFactory.getModsPatternTreeMap();
+ LoadModsResult result = new LoadModsResult(modifications);
+ if (resource.getTotalModSizeInByte() > getRemainingCacheQuota()) {
+ currentIterator = resource.getModEntryIterator();
+ result.loadedAllModEntries = false;
+ result.cacheable = false;
+ return result;
+ }
+
+ currentIterator = resource.getModEntryIterator();
+
+ int appendedModCount = 0;
+ boolean estimatedAfterLastAppend = false;
+
+ while (currentIterator.hasNext()) {
+ ModEntry modification = currentIterator.next();
+ if (queryContext.shouldSkipModification(modification)) {
+ continue;
+ }
+
+ modifications.append(modification.keyOfPatternTree(), modification);
+ appendedModCount++;
+ estimatedAfterLastAppend = false;
+
+ if (appendedModCount % modsMemoryEstimateReadInterval == 0) {
+ if (!tryEstimateAndReserveTreeMemory(result)) {
+ result.loadedAllModEntries = false;
+ result.cacheable = false;
+ return result;
+ }
+ estimatedAfterLastAppend = true;
+ }
+ }
+
+ if (!estimatedAfterLastAppend) {
+ result.cacheable = tryEstimateAndReserveTreeMemory(result);
+ } else {
+ result.cacheable = true;
+ }
+
+ result.loadedAllModEntries = true;
+ return result;
+ }
+
+ private boolean tryEstimateAndReserveTreeMemory(LoadModsResult result) {
+ long currentEstimatedSize = estimateModsTreeMemory(result.mods);
+ long delta = currentEstimatedSize - result.reservedTreeMemoryBytes;
+ if (delta < 0) {
+ throw new IllegalStateException(
+ String.format(
+ DataNodeQueryMessages.ESTIMATED_MODS_TREE_SIZE_DECREASED,
+ result.reservedTreeMemoryBytes,
+ currentEstimatedSize,
+ resource));
+ }
+ if (delta == 0) {
+ return true;
+ }
+
+ if (!tryClaimCacheQuota(delta)) {
+ return false;
+ }
+ result.cacheQuotaBytes += delta;
+
+ try {
+ memoryReservationManager.reserveMemoryImmediately(delta);
+ } catch (MemoryNotEnoughException e) {
+ return false;
+ }
+
+ result.reservedTreeMemoryBytes = currentEstimatedSize;
+ return true;
+ }
+
+ private boolean tryClaimCacheQuota(long delta) {
+ if (delta <= 0) {
+ return true;
+ }
+
+ long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
+ while (alreadyUsedMemoryForCachedModEntries + delta <=
modsCacheSizeLimitPerFI) {
+ if (cachedModEntriesSize.compareAndSet(
+ alreadyUsedMemoryForCachedModEntries,
alreadyUsedMemoryForCachedModEntries + delta)) {
+ return true;
+ }
+ alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
+ }
+ return false;
+ }
+
+ private long getRemainingCacheQuota() {
+ return modsCacheSizeLimitPerFI - cachedModEntriesSize.get();
+ }
+
+ private List<ModEntry> fallbackByMatchedScan(LoadModsResult partialTree)
+ throws IllegalPathException {
+ List<ModEntry> matchedMods = matchLoadedTreeAndRelease(partialTree);
+ long reservedMatchedModsMemoryBytes =
reserveMatchedModsMemory(matchedMods);
+ int matchedModCount = matchedMods.size();
+
+ while (currentIterator.hasNext()) {
+ ModEntry modification = currentIterator.next();
+ if (queryContext.shouldSkipModification(modification)) {
+ continue;
+ }
+ if (modificationMatcher.test(modification)) {
+ matchedMods.add(modification);
+ matchedModCount++;
+ if (matchedModCount % modsMemoryEstimateReadInterval == 0) {
+ reservedMatchedModsMemoryBytes =
+ reserveMatchedModsMemoryIncrementally(matchedMods,
reservedMatchedModsMemoryBytes);
+ }
+ }
+ }
+
+ List<ModEntry> sortedAndMergedMods =
ModificationUtils.sortAndMerge(matchedMods);
+ adjustMatchedModsMemoryReservation(sortedAndMergedMods,
reservedMatchedModsMemoryBytes);
+ return sortedAndMergedMods;
+ }
+
+ private List<ModEntry> fallbackByMatchLoadedPatternTree(LoadModsResult
loadedTree)
+ throws IllegalPathException {
+ List<ModEntry> matchedMods = matchLoadedTreeAndRelease(loadedTree);
+ reserveMatchedModsMemory(matchedMods);
+ return matchedMods;
+ }
+
+ private List<ModEntry> matchLoadedTreeAndRelease(LoadModsResult loadedTree)
+ throws IllegalPathException {
+ try {
+ return new ArrayList<>(modsTreeMatcher.match(loadedTree.mods));
+ } finally {
+ loadedTree.mods = null;
+ cachedModEntriesSize.addAndGet(-loadedTree.cacheQuotaBytes);
+ loadedTree.cacheQuotaBytes = 0;
+
memoryReservationManager.releaseMemoryCumulatively(loadedTree.reservedTreeMemoryBytes);
+ loadedTree.reservedTreeMemoryBytes = 0;
+ }
+ }
+
+ private long reserveMatchedModsMemory(List<ModEntry> matchedMods) {
+ long estimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods);
+ memoryReservationManager.reserveMemoryCumulatively(estimatedSize);
+ return estimatedSize;
+ }
+
+ private long reserveMatchedModsMemoryIncrementally(
+ List<ModEntry> matchedMods, long reservedMatchedModsMemoryBytes) {
+ long currentEstimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods);
+ long delta = currentEstimatedSize - reservedMatchedModsMemoryBytes;
+ memoryReservationManager.reserveMemoryCumulatively(delta);
+ return currentEstimatedSize;
+ }
+
+ private void adjustMatchedModsMemoryReservation(
+ List<ModEntry> matchedMods, long reservedMatchedModsMemoryBytes) {
+ long currentEstimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods);
+ long delta = currentEstimatedSize - reservedMatchedModsMemoryBytes;
+ if (delta >= 0) {
+ memoryReservationManager.reserveMemoryCumulatively(delta);
+ } else {
+ memoryReservationManager.releaseMemoryCumulatively(-delta);
+ }
+ }
+
+ private long estimateModsTreeMemory(
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
modifications) {
+ return RamUsageEstimator.sizeOfObject(modifications);
+ }
+
+ @Override
+ public void close() {
+ closeCurrentIterator();
+ }
+
+ private void closeCurrentIterator() {
+ if (currentIterator != null) {
+ currentIterator.close();
+ currentIterator = null;
+ }
+ }
+
+ private static class LoadModsResult {
+
+ private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
mods;
+ private long cacheQuotaBytes;
+ private long reservedTreeMemoryBytes;
+ private boolean loadedAllModEntries;
+ private boolean cacheable;
+
+ private LoadModsResult(PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer> mods) {
+ this.mods = mods;
+ }
+ }
+
+ @FunctionalInterface
+ interface ModsTreeMatcher {
+
+ List<ModEntry> match(PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer> modsTree)
+ throws IllegalPathException;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
index ee6e0b06cf4..7cee8034a05 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
@@ -31,6 +31,9 @@ public class FakedMemoryReservationManager implements
MemoryReservationManager {
@Override
public void reserveMemoryImmediately() {}
+ @Override
+ public void reserveMemoryImmediately(final long size) {}
+
@Override
public void releaseMemoryCumulatively(long size) {}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
index 7dbebaa2b50..d156628532c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
@@ -68,6 +68,15 @@ public class NotThreadSafeMemoryReservationManager
implements MemoryReservationM
}
}
+ @Override
+ public void reserveMemoryImmediately(final long size) {
+ if (size != 0) {
+ LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
+ size, reservedBytesInTotal, queryId.getId(), contextHolder);
+ reservedBytesInTotal += size;
+ }
+ }
+
@Override
public void releaseMemoryCumulatively(final long size) {
bytesToBeReleased += size;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
index d167eae354f..2a544421f3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
@@ -41,6 +41,11 @@ public class ThreadSafeMemoryReservationManager extends
NotThreadSafeMemoryReser
super.reserveMemoryImmediately();
}
+ @Override
+ public synchronized void reserveMemoryImmediately(final long size) {
+ super.reserveMemoryImmediately(size);
+ }
+
@Override
public synchronized void releaseMemoryCumulatively(long size) {
super.releaseMemoryCumulatively(size);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index e513d79ba61..439286288d6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -1494,7 +1494,7 @@ public class TsFileResource implements
PersistentResource, Cloneable {
return entries;
}
- public class ModIterator implements Iterator<ModEntry> {
+ public class ModIterator implements Iterator<ModEntry>, AutoCloseable {
private final Iterator<ModEntry> sharedModIterator;
private final Iterator<ModEntry> exclusiveModIterator;
@@ -1537,6 +1537,22 @@ public class TsFileResource implements
PersistentResource, Cloneable {
}
throw new NoSuchElementException();
}
+
+ @Override
+ public void close() {
+ closeModIterator(exclusiveModIterator);
+ closeModIterator(sharedModIterator);
+ }
+
+ private void closeModIterator(Iterator<ModEntry> modIterator) {
+ if (modIterator instanceof AutoCloseable) {
+ try {
+ ((AutoCloseable) modIterator).close();
+ } catch (Exception e) {
+
LOGGER.info(StorageEngineMessages.CANNOT_CLOSE_MOD_FILE_INPUT_STREAM,
getTsFile(), e);
+ }
+ }
+ }
}
public void upgradeModFile(ExecutorService upgradeModFileThreadPool) throws
IOException {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java
new file mode 100644
index 00000000000..39b9c41447c
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+
+import org.apache.tsfile.external.commons.io.FileUtils;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class QueryModificationLoaderTest {
+
+ private static final IDeviceID DEVICE_ID =
IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d1");
+
+ private File testDir;
+
+ @After
+ public void tearDown() throws Exception {
+ if (testDir != null && testDir.exists()) {
+ FileUtils.deleteDirectory(testDir);
+ }
+ }
+
+ @Test
+ public void testCacheLoadedModsTreeWhenQuotaEnough() throws Exception {
+ TsFileResource resource = prepareResource("cache");
+ writeMods(
+ resource,
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30));
+
+ Map<TsFileID, PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>> fileModCache =
+ new ConcurrentHashMap<>();
+ AtomicLong cachedModEntriesSize = new AtomicLong();
+ CountingMemoryReservationManager memoryReservationManager =
+ new CountingMemoryReservationManager();
+
+ try (QueryModificationLoader loader =
+ newLoader(
+ resource,
+ Long.MAX_VALUE,
+ fileModCache,
+ cachedModEntriesSize,
+ memoryReservationManager,
+ 1)) {
+ List<ModEntry> result = loader.getPathModifications();
+
+ assertEquals(1, result.size());
+ assertTrue(fileModCache.containsKey(resource.getTsFileID()));
+ assertTrue(cachedModEntriesSize.get() > 0);
+ assertTrue(memoryReservationManager.getReservedBytes() >=
cachedModEntriesSize.get());
+ assertTrue(memoryReservationManager.getImmediateReservationCount() > 0);
+ }
+ }
+
+ @Test
+ public void
testFallbackScansModsWhenFileSizeExceedsRemainingQuotaBeforeLoad() throws
Exception {
+ TsFileResource resource = prepareResource("file-size-precheck-fallback");
+ writeMods(
+ resource,
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+ Map<TsFileID, PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>> fileModCache =
+ new ConcurrentHashMap<>();
+ AtomicLong cachedModEntriesSize = new AtomicLong();
+ CountingMemoryReservationManager memoryReservationManager =
+ new CountingMemoryReservationManager();
+
+ try (QueryModificationLoader loader =
+ newLoader(resource, 1, fileModCache, cachedModEntriesSize,
memoryReservationManager, 1)) {
+ List<ModEntry> result = loader.getPathModifications();
+
+ assertEquals(2, result.size());
+ assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+ assertEquals(0, cachedModEntriesSize.get());
+ assertTrue(memoryReservationManager.getReservedBytes() > 0);
+ assertEquals(0,
memoryReservationManager.getRemainingImmediateFailures());
+ assertEquals(0, memoryReservationManager.getImmediateReservationCount());
+ }
+ }
+
+ @Test
+ public void testFallbackScansRemainingModsWhenEstimatedTreeExceedsQuota()
throws Exception {
+ TsFileResource resource = prepareResource("estimated-tree-quota-fallback");
+ writeMods(
+ resource,
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+ Map<TsFileID, PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>> fileModCache =
+ new ConcurrentHashMap<>();
+ AtomicLong cachedModEntriesSize = new AtomicLong();
+ CountingMemoryReservationManager memoryReservationManager =
+ new CountingMemoryReservationManager();
+
+ try (QueryModificationLoader loader =
+ newLoader(
+ resource,
+ resource.getTotalModSizeInByte() + 1,
+ fileModCache,
+ cachedModEntriesSize,
+ memoryReservationManager,
+ 1)) {
+ List<ModEntry> result = loader.getPathModifications();
+
+ assertEquals(2, result.size());
+ assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+ assertEquals(0, cachedModEntriesSize.get());
+ assertTrue(memoryReservationManager.getReservedBytes() > 0);
+ assertTrue(memoryReservationManager.getCumulativeReleaseCount() > 0);
+ }
+ }
+
+ @Test
+ public void testFallbackMatchesLoadedTreeWhenFinalReservationFailed() throws
Exception {
+ TsFileResource resource = prepareResource("final-reserve-failed");
+ writeMods(
+ resource,
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+ Map<TsFileID, PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>> fileModCache =
+ new ConcurrentHashMap<>();
+ AtomicLong cachedModEntriesSize = new AtomicLong();
+ CountingMemoryReservationManager memoryReservationManager =
+ new CountingMemoryReservationManager(1);
+
+ try (QueryModificationLoader loader =
+ newLoader(
+ resource,
+ Long.MAX_VALUE,
+ fileModCache,
+ cachedModEntriesSize,
+ memoryReservationManager,
+ 100)) {
+ List<ModEntry> result = loader.getPathModifications();
+
+ assertEquals(2, result.size());
+ assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+ assertEquals(0, cachedModEntriesSize.get());
+ assertTrue(memoryReservationManager.getReservedBytes() > 0);
+ assertEquals(0,
memoryReservationManager.getRemainingImmediateFailures());
+ assertEquals(1, memoryReservationManager.getImmediateReservationCount());
+ }
+ }
+
+ @Test
+ public void testFallbackReservesMatchedModsCumulativelyWhenQuotaExceeded()
throws Exception {
+ TsFileResource resource = prepareResource("fallback-cumulative-reserve");
+ writeMods(
+ resource,
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+ Map<TsFileID, PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>> fileModCache =
+ new ConcurrentHashMap<>();
+ AtomicLong cachedModEntriesSize = new AtomicLong();
+ CountingMemoryReservationManager memoryReservationManager =
+ new CountingMemoryReservationManager(1);
+
+ try (QueryModificationLoader loader =
+ newLoader(resource, 1, fileModCache, cachedModEntriesSize,
memoryReservationManager, 1)) {
+ List<ModEntry> result = loader.getPathModifications();
+
+ assertEquals(2, result.size());
+ assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+ assertEquals(0, cachedModEntriesSize.get());
+ assertTrue(memoryReservationManager.getReservedBytes() > 0);
+ assertEquals(1,
memoryReservationManager.getRemainingImmediateFailures());
+ assertEquals(0, memoryReservationManager.getImmediateReservationCount());
+ }
+ }
+
+ @Test
+ public void testFallbackAdjustsReservedMemoryAfterSortAndMerge() throws
Exception {
+ TsFileResource resource = prepareResource("fallback-sort-merge-adjust");
+ writeMods(
+ resource,
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 5, 15),
+ new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30));
+
+ Map<TsFileID, PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>> fileModCache =
+ new ConcurrentHashMap<>();
+ AtomicLong cachedModEntriesSize = new AtomicLong();
+ CountingMemoryReservationManager memoryReservationManager =
+ new CountingMemoryReservationManager();
+
+ try (QueryModificationLoader loader =
+ newLoader(resource, 1, fileModCache, cachedModEntriesSize,
memoryReservationManager, 1)) {
+ List<ModEntry> result = loader.getPathModifications();
+
+ assertEquals(1, result.size());
+ assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+ assertEquals(0, cachedModEntriesSize.get());
+ assertTrue(memoryReservationManager.getReservedBytes() > 0);
+ assertTrue(memoryReservationManager.getCumulativeReleaseCount() > 0);
+ }
+ }
+
+ private QueryModificationLoader newLoader(
+ TsFileResource resource,
+ long modsCacheSizeLimitPerFI,
+ Map<TsFileID, PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>> fileModCache,
+ AtomicLong cachedModEntriesSize,
+ MemoryReservationManager memoryReservationManager,
+ int modsMemoryEstimateReadInterval) {
+ QueryContext queryContext = new QueryContext(false, false);
+ return new QueryModificationLoader(
+ queryContext,
+ resource,
+ memoryReservationManager,
+ modsCacheSizeLimitPerFI,
+ modsMemoryEstimateReadInterval,
+ fileModCache,
+ cachedModEntriesSize,
+ modification -> modification.affects(DEVICE_ID) &&
modification.affects("s1"),
+ modsTree -> queryContext.getPathModifications(modsTree, DEVICE_ID,
"s1"));
+ }
+
+ private TsFileResource prepareResource(String name) {
+ testDir = new File(TestConstant.BASE_OUTPUT_PATH,
"QueryModificationLoaderTest-" + name);
+ testDir.mkdirs();
+ File tsFile =
+ new
File(TsFileNameGenerator.generateNewTsFilePath(testDir.getAbsolutePath(), 1, 1,
0, 0));
+ return new TsFileResource(tsFile);
+ }
+
+ private void writeMods(TsFileResource resource, TreeDeletionEntry...
modifications)
+ throws Exception {
+ try (ModificationFile modificationFile = resource.getModFileForWrite()) {
+ for (TreeDeletionEntry modification : modifications) {
+ modificationFile.write(modification);
+ }
+ }
+ }
+
+ private static class CountingMemoryReservationManager implements
MemoryReservationManager {
+
+ private long reservedBytes;
+ private int remainingImmediateFailures;
+ private int immediateReservationCount;
+ private int cumulativeReleaseCount;
+
+ private CountingMemoryReservationManager() {}
+
+ private CountingMemoryReservationManager(int remainingImmediateFailures) {
+ this.remainingImmediateFailures = remainingImmediateFailures;
+ }
+
+ @Override
+ public void reserveMemoryCumulatively(long size) {
+ reservedBytes += size;
+ }
+
+ @Override
+ public void reserveMemoryImmediately() {
+ immediateReservationCount++;
+ if (remainingImmediateFailures > 0) {
+ remainingImmediateFailures--;
+ throw new MemoryNotEnoughException("Mock memory reservation failure.");
+ }
+ }
+
+ @Override
+ public void reserveMemoryImmediately(long size) {
+ immediateReservationCount++;
+ if (remainingImmediateFailures > 0) {
+ remainingImmediateFailures--;
+ throw new MemoryNotEnoughException("Mock memory reservation failure.");
+ }
+ reservedBytes += size;
+ }
+
+ @Override
+ public void releaseMemoryCumulatively(long size) {
+ cumulativeReleaseCount++;
+ reservedBytes -= size;
+ }
+
+ @Override
+ public void releaseAllReservedMemory() {
+ reservedBytes = 0;
+ }
+
+ @Override
+ public Pair<Long, Long> releaseMemoryVirtually(long size) {
+ reservedBytes -= size;
+ return new Pair<>(size, 0L);
+ }
+
+ @Override
+ public void reserveMemoryVirtually(long bytesToBeReserved, long
bytesAlreadyReserved) {
+ reservedBytes += bytesToBeReserved + bytesAlreadyReserved;
+ }
+
+ private long getReservedBytes() {
+ return reservedBytes;
+ }
+
+ private int getRemainingImmediateFailures() {
+ return remainingImmediateFailures;
+ }
+
+ private int getImmediateReservationCount() {
+ return immediateReservationCount;
+ }
+
+ private int getCumulativeReleaseCount() {
+ return cumulativeReleaseCount;
+ }
+ }
+}