This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d563cd046e2 Improve query modification loading memory control (#17788)
d563cd046e2 is described below

commit d563cd046e28fbb1aeb99f94abcca9cba8100a99
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jun 5 13:56:15 2026 +0800

    Improve query modification loading memory control (#17788)
---
 .../planner/memory/MemoryReservationManager.java   |   8 +
 .../iotdb/db/i18n/DataNodeQueryMessages.java       |   2 +
 .../iotdb/db/i18n/DataNodeQueryMessages.java       |   2 +
 .../fragment/FragmentInstanceContext.java          |  90 ++++--
 .../execution/fragment/QueryContext.java           |  20 +-
 .../fragment/QueryModificationLoader.java          | 314 ++++++++++++++++++
 .../memory/FakedMemoryReservationManager.java      |   3 +
 .../NotThreadSafeMemoryReservationManager.java     |   9 +
 .../memory/ThreadSafeMemoryReservationManager.java |   5 +
 .../dataregion/tsfile/TsFileResource.java          |  18 +-
 .../fragment/QueryModificationLoaderTest.java      | 358 +++++++++++++++++++++
 11 files changed, 786 insertions(+), 43 deletions(-)

diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java
index 9df8d52131b..b52534b4ec2 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java
@@ -33,6 +33,14 @@ public interface MemoryReservationManager {
   /** Reserve memory for the accumulated memory size immediately. */
   void reserveMemoryImmediately();
 
+  /**
+   * Reserve memory for the given size immediately without changing the accumulated pending
+   * reservation size maintained by this manager.
+   *
+   * @param size the size of memory to reserve immediately
+   */
+  void reserveMemoryImmediately(final long size);
+
   /**
    * Release memory for the given size.
    *
diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index 2c4978618a6..964163ce14f 100644
--- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -136,6 +136,8 @@ public final class DataNodeQueryMessages {
 
   public static final String FREE_MORE_MEMORY_THAN_HAS_BEEN_RESERVED =
       "Free more memory than has been reserved.";
+  public static final String ESTIMATED_MODS_TREE_SIZE_DECREASED =
+      "Estimated mods tree size decreased from %d to %d for TsFile %s.";
 
   // --- Execution / Operator ---
 
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index 7a7de06471b..ae020f8c5a3 100644
--- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -136,6 +136,8 @@ public final class DataNodeQueryMessages {
 
   public static final String FREE_MORE_MEMORY_THAN_HAS_BEEN_RESERVED =
       "释放的内存超过已预留的量。";
+  public static final String ESTIMATED_MODS_TREE_SIZE_DECREASED =
+      "估算的 mods tree 大小从 %d 减少到 %d,TsFile:%s。";
 
   // --- Execution / Operator ---
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 2a0373cf6fd..6bda6e9c14d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -26,11 +26,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.audit.UserEntity;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.path.AlignedFullPath;
 import org.apache.iotdb.commons.path.IFullPath;
-import org.apache.iotdb.commons.path.PatternTreeMap;
 import org.apache.iotdb.commons.queryengine.common.SessionInfo;
 import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
@@ -56,7 +56,6 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
 
@@ -82,6 +81,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;
@@ -99,6 +99,7 @@ public class FragmentInstanceContext extends QueryContext {
   private static final long END_TIME_INITIAL_VALUE = -1L;
   // wait over 5s for driver to close is abnormal
   private static final long LONG_WAIT_DURATION = 5_000_000_000L;
+  private static final int MODS_MEMORY_ESTIMATE_READ_INTERVAL = 10_000;
   private final FragmentInstanceId id;
 
   private final FragmentInstanceStateMachine stateMachine;
@@ -378,41 +379,60 @@ public class FragmentInstanceContext extends QueryContext {
   }
 
   @Override
-  protected PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
getAllModifications(
-      TsFileResource resource) {
-    if (isSingleSourcePath() || memoryReservationManager == null) {
-      return loadAllModificationsFromDisk(resource);
+  public List<ModEntry> getPathModifications(
+      TsFileResource tsFileResource, IDeviceID deviceID, String measurement) {
+    if (!checkIfModificationExists(tsFileResource)) {
+      return Collections.emptyList();
     }
+    if (memoryReservationManager == null) {
+      return super.getPathModifications(tsFileResource, deviceID, measurement);
+    }
+    try (QueryModificationLoader modificationLoader =
+        getQueryModificationLoader(
+            tsFileResource,
+            modification -> modification.affects(deviceID) && 
modification.affects(measurement),
+            mods -> getPathModifications(mods, deviceID, measurement))) {
+      return modificationLoader.getPathModifications();
+    } catch (IllegalPathException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public List<ModEntry> getPathModifications(TsFileResource tsFileResource, 
IDeviceID deviceID)
+      throws IllegalPathException {
+    if (!checkIfModificationExists(tsFileResource)) {
+      return Collections.emptyList();
+    }
+    if (memoryReservationManager == null) {
+      return super.getPathModifications(tsFileResource, deviceID);
+    }
+    try (QueryModificationLoader modificationLoader =
+        getQueryModificationLoader(
+            tsFileResource,
+            modification ->
+                deviceID.isTableModel()
+                    ? modification.affects(deviceID)
+                    : modification.affectsAll(deviceID),
+            mods -> getPathModifications(mods, deviceID))) {
+      return modificationLoader.getPathModifications();
+    }
+  }
 
-    AtomicReference<PatternTreeMap<ModEntry, 
PatternTreeMapFactory.ModsSerializer>>
-        atomicReference = new AtomicReference<>();
-    PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
cachedResult =
-        fileModCache.computeIfAbsent(
-            resource.getTsFileID(),
-            k -> {
-              PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
allMods =
-                  loadAllModificationsFromDisk(resource);
-              atomicReference.set(allMods);
-              if (cachedModEntriesSize.get() >= 
CONFIG.getModsCacheSizeLimitPerFI()) {
-                return null;
-              }
-              long memCost =
-                  RamUsageEstimator.sizeOfObject(allMods)
-                      + 
RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY;
-              long alreadyUsedMemoryForCachedModEntries = 
cachedModEntriesSize.get();
-              while (alreadyUsedMemoryForCachedModEntries + memCost
-                  < CONFIG.getModsCacheSizeLimitPerFI()) {
-                if (cachedModEntriesSize.compareAndSet(
-                    alreadyUsedMemoryForCachedModEntries,
-                    alreadyUsedMemoryForCachedModEntries + memCost)) {
-                  memoryReservationManager.reserveMemoryCumulatively(memCost);
-                  return allMods;
-                }
-                alreadyUsedMemoryForCachedModEntries = 
cachedModEntriesSize.get();
-              }
-              return null;
-            });
-    return cachedResult == null ? atomicReference.get() : cachedResult;
+  private QueryModificationLoader getQueryModificationLoader(
+      TsFileResource tsFileResource,
+      Predicate<ModEntry> fallbackModificationMatcher,
+      QueryModificationLoader.ModsTreeMatcher modsTreeMatcher) {
+    return new QueryModificationLoader(
+        this,
+        tsFileResource,
+        memoryReservationManager,
+        CONFIG.getModsCacheSizeLimitPerFI(),
+        MODS_MEMORY_ESTIMATE_READ_INTERVAL,
+        fileModCache,
+        cachedModEntriesSize,
+        fallbackModificationMatcher,
+        modsTreeMatcher);
   }
 
   // the state change listener is added here in a separate initialize() method
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index d3edbe2e0d0..44a88826696 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -127,20 +127,26 @@ public class QueryContext {
       TsFileResource resource) {
     PatternTreeMap<ModEntry, ModsSerializer> modifications =
         PatternTreeMapFactory.getModsPatternTreeMap();
-    TsFileResource.ModIterator modEntryIterator = 
resource.getModEntryIterator();
-    while (modEntryIterator.hasNext()) {
-      ModEntry modification = modEntryIterator.next();
-      if (tables != null && modification instanceof TableDeletionEntry) {
-        String tableName = ((TableDeletionEntry) modification).getTableName();
-        if (!tables.contains(tableName)) {
+    try (TsFileResource.ModIterator modEntryIterator = 
resource.getModEntryIterator()) {
+      while (modEntryIterator.hasNext()) {
+        ModEntry modification = modEntryIterator.next();
+        if (shouldSkipModification(modification)) {
           continue;
         }
+        modifications.append(modification.keyOfPatternTree(), modification);
       }
-      modifications.append(modification.keyOfPatternTree(), modification);
     }
     return modifications;
   }
 
+  protected boolean shouldSkipModification(ModEntry modification) {
+    if (tables != null && modification instanceof TableDeletionEntry) {
+      String tableName = ((TableDeletionEntry) modification).getTableName();
+      return !tables.contains(tableName);
+    }
+    return false;
+  }
+
   public List<ModEntry> getPathModifications(
       TsFileResource tsFileResource, IDeviceID deviceID, String measurement) {
     // if the mods file does not exist, do not add it to the cache
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
new file mode 100644
index 00000000000..bdb1914a354
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+class QueryModificationLoader implements AutoCloseable {
+
+  private final QueryContext queryContext;
+  private final TsFileResource resource;
+  private final MemoryReservationManager memoryReservationManager;
+  private final long modsCacheSizeLimitPerFI;
+  private final int modsMemoryEstimateReadInterval;
+  private final Map<TsFileID, PatternTreeMap<ModEntry, 
PatternTreeMapFactory.ModsSerializer>>
+      fileModCache;
+  private final AtomicLong cachedModEntriesSize;
+  private final Predicate<ModEntry> modificationMatcher;
+  private final ModsTreeMatcher modsTreeMatcher;
+
+  private TsFileResource.ModIterator currentIterator;
+
+  QueryModificationLoader(
+      QueryContext queryContext,
+      TsFileResource resource,
+      MemoryReservationManager memoryReservationManager,
+      long modsCacheSizeLimitPerFI,
+      int modsMemoryEstimateReadInterval,
+      Map<TsFileID, PatternTreeMap<ModEntry, 
PatternTreeMapFactory.ModsSerializer>> fileModCache,
+      AtomicLong cachedModEntriesSize,
+      Predicate<ModEntry> modificationMatcher,
+      ModsTreeMatcher modsTreeMatcher) {
+    this.queryContext = queryContext;
+    this.resource = resource;
+    this.memoryReservationManager = memoryReservationManager;
+    this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI;
+    this.modsMemoryEstimateReadInterval = modsMemoryEstimateReadInterval;
+    this.fileModCache = fileModCache;
+    this.cachedModEntriesSize = cachedModEntriesSize;
+    this.modificationMatcher = modificationMatcher;
+    this.modsTreeMatcher = modsTreeMatcher;
+  }
+
+  List<ModEntry> getPathModifications() throws IllegalPathException {
+    AtomicReference<LoadModsResult> loadedResult = new AtomicReference<>();
+    PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> cachedMods =
+        fileModCache.computeIfAbsent(
+            resource.getTsFileID(), ignored -> 
loadAllModificationsForCache(loadedResult));
+    if (cachedMods != null) {
+      return modsTreeMatcher.match(cachedMods);
+    }
+
+    LoadModsResult result = loadedResult.get();
+    try {
+      if (result.loadedAllModEntries) {
+        return fallbackByMatchLoadedPatternTree(result);
+      } else {
+        return fallbackByMatchedScan(result);
+      }
+    } finally {
+      close();
+    }
+  }
+
+  private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
+      loadAllModificationsForCache(AtomicReference<LoadModsResult> 
loadedResult) {
+    LoadModsResult result = loadAllModificationsWithQuotaControl();
+    loadedResult.set(result);
+    if (!result.cacheable) {
+      return null;
+    }
+
+    closeCurrentIterator();
+    return result.mods;
+  }
+
+  private LoadModsResult loadAllModificationsWithQuotaControl() {
+    PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
modifications =
+        PatternTreeMapFactory.getModsPatternTreeMap();
+    LoadModsResult result = new LoadModsResult(modifications);
+    if (resource.getTotalModSizeInByte() > getRemainingCacheQuota()) {
+      currentIterator = resource.getModEntryIterator();
+      result.loadedAllModEntries = false;
+      result.cacheable = false;
+      return result;
+    }
+
+    currentIterator = resource.getModEntryIterator();
+
+    int appendedModCount = 0;
+    boolean estimatedAfterLastAppend = false;
+
+    while (currentIterator.hasNext()) {
+      ModEntry modification = currentIterator.next();
+      if (queryContext.shouldSkipModification(modification)) {
+        continue;
+      }
+
+      modifications.append(modification.keyOfPatternTree(), modification);
+      appendedModCount++;
+      estimatedAfterLastAppend = false;
+
+      if (appendedModCount % modsMemoryEstimateReadInterval == 0) {
+        if (!tryEstimateAndReserveTreeMemory(result)) {
+          result.loadedAllModEntries = false;
+          result.cacheable = false;
+          return result;
+        }
+        estimatedAfterLastAppend = true;
+      }
+    }
+
+    if (!estimatedAfterLastAppend) {
+      result.cacheable = tryEstimateAndReserveTreeMemory(result);
+    } else {
+      result.cacheable = true;
+    }
+
+    result.loadedAllModEntries = true;
+    return result;
+  }
+
+  private boolean tryEstimateAndReserveTreeMemory(LoadModsResult result) {
+    long currentEstimatedSize = estimateModsTreeMemory(result.mods);
+    long delta = currentEstimatedSize - result.reservedTreeMemoryBytes;
+    if (delta < 0) {
+      throw new IllegalStateException(
+          String.format(
+              DataNodeQueryMessages.ESTIMATED_MODS_TREE_SIZE_DECREASED,
+              result.reservedTreeMemoryBytes,
+              currentEstimatedSize,
+              resource));
+    }
+    if (delta == 0) {
+      return true;
+    }
+
+    if (!tryClaimCacheQuota(delta)) {
+      return false;
+    }
+    result.cacheQuotaBytes += delta;
+
+    try {
+      memoryReservationManager.reserveMemoryImmediately(delta);
+    } catch (MemoryNotEnoughException e) {
+      return false;
+    }
+
+    result.reservedTreeMemoryBytes = currentEstimatedSize;
+    return true;
+  }
+
+  private boolean tryClaimCacheQuota(long delta) {
+    if (delta <= 0) {
+      return true;
+    }
+
+    long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
+    while (alreadyUsedMemoryForCachedModEntries + delta <= 
modsCacheSizeLimitPerFI) {
+      if (cachedModEntriesSize.compareAndSet(
+          alreadyUsedMemoryForCachedModEntries, 
alreadyUsedMemoryForCachedModEntries + delta)) {
+        return true;
+      }
+      alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
+    }
+    return false;
+  }
+
+  private long getRemainingCacheQuota() {
+    return modsCacheSizeLimitPerFI - cachedModEntriesSize.get();
+  }
+
+  private List<ModEntry> fallbackByMatchedScan(LoadModsResult partialTree)
+      throws IllegalPathException {
+    List<ModEntry> matchedMods = matchLoadedTreeAndRelease(partialTree);
+    long reservedMatchedModsMemoryBytes = 
reserveMatchedModsMemory(matchedMods);
+    int matchedModCount = matchedMods.size();
+
+    while (currentIterator.hasNext()) {
+      ModEntry modification = currentIterator.next();
+      if (queryContext.shouldSkipModification(modification)) {
+        continue;
+      }
+      if (modificationMatcher.test(modification)) {
+        matchedMods.add(modification);
+        matchedModCount++;
+        if (matchedModCount % modsMemoryEstimateReadInterval == 0) {
+          reservedMatchedModsMemoryBytes =
+              reserveMatchedModsMemoryIncrementally(matchedMods, 
reservedMatchedModsMemoryBytes);
+        }
+      }
+    }
+
+    List<ModEntry> sortedAndMergedMods = 
ModificationUtils.sortAndMerge(matchedMods);
+    adjustMatchedModsMemoryReservation(sortedAndMergedMods, 
reservedMatchedModsMemoryBytes);
+    return sortedAndMergedMods;
+  }
+
+  private List<ModEntry> fallbackByMatchLoadedPatternTree(LoadModsResult 
loadedTree)
+      throws IllegalPathException {
+    List<ModEntry> matchedMods = matchLoadedTreeAndRelease(loadedTree);
+    reserveMatchedModsMemory(matchedMods);
+    return matchedMods;
+  }
+
+  private List<ModEntry> matchLoadedTreeAndRelease(LoadModsResult loadedTree)
+      throws IllegalPathException {
+    try {
+      return new ArrayList<>(modsTreeMatcher.match(loadedTree.mods));
+    } finally {
+      loadedTree.mods = null;
+      cachedModEntriesSize.addAndGet(-loadedTree.cacheQuotaBytes);
+      loadedTree.cacheQuotaBytes = 0;
+      
memoryReservationManager.releaseMemoryCumulatively(loadedTree.reservedTreeMemoryBytes);
+      loadedTree.reservedTreeMemoryBytes = 0;
+    }
+  }
+
+  private long reserveMatchedModsMemory(List<ModEntry> matchedMods) {
+    long estimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods);
+    memoryReservationManager.reserveMemoryCumulatively(estimatedSize);
+    return estimatedSize;
+  }
+
+  private long reserveMatchedModsMemoryIncrementally(
+      List<ModEntry> matchedMods, long reservedMatchedModsMemoryBytes) {
+    long currentEstimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods);
+    long delta = currentEstimatedSize - reservedMatchedModsMemoryBytes;
+    memoryReservationManager.reserveMemoryCumulatively(delta);
+    return currentEstimatedSize;
+  }
+
+  private void adjustMatchedModsMemoryReservation(
+      List<ModEntry> matchedMods, long reservedMatchedModsMemoryBytes) {
+    long currentEstimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods);
+    long delta = currentEstimatedSize - reservedMatchedModsMemoryBytes;
+    if (delta >= 0) {
+      memoryReservationManager.reserveMemoryCumulatively(delta);
+    } else {
+      memoryReservationManager.releaseMemoryCumulatively(-delta);
+    }
+  }
+
+  private long estimateModsTreeMemory(
+      PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
modifications) {
+    return RamUsageEstimator.sizeOfObject(modifications);
+  }
+
+  @Override
+  public void close() {
+    closeCurrentIterator();
+  }
+
+  private void closeCurrentIterator() {
+    if (currentIterator != null) {
+      currentIterator.close();
+      currentIterator = null;
+    }
+  }
+
+  private static class LoadModsResult {
+
+    private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> 
mods;
+    private long cacheQuotaBytes;
+    private long reservedTreeMemoryBytes;
+    private boolean loadedAllModEntries;
+    private boolean cacheable;
+
+    private LoadModsResult(PatternTreeMap<ModEntry, 
PatternTreeMapFactory.ModsSerializer> mods) {
+      this.mods = mods;
+    }
+  }
+
+  @FunctionalInterface
+  interface ModsTreeMatcher {
+
+    List<ModEntry> match(PatternTreeMap<ModEntry, 
PatternTreeMapFactory.ModsSerializer> modsTree)
+        throws IllegalPathException;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
index ee6e0b06cf4..7cee8034a05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
@@ -31,6 +31,9 @@ public class FakedMemoryReservationManager implements MemoryReservationManager {
   @Override
   public void reserveMemoryImmediately() {}
 
+  @Override
+  public void reserveMemoryImmediately(final long size) {}
+
   @Override
   public void releaseMemoryCumulatively(long size) {}
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
index 7dbebaa2b50..d156628532c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
@@ -68,6 +68,15 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM
     }
   }
 
+  @Override
+  public void reserveMemoryImmediately(final long size) {
+    if (size != 0) {
+      LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
+          size, reservedBytesInTotal, queryId.getId(), contextHolder);
+      reservedBytesInTotal += size;
+    }
+  }
+
   @Override
   public void releaseMemoryCumulatively(final long size) {
     bytesToBeReleased += size;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
index d167eae354f..2a544421f3f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
@@ -41,6 +41,11 @@ public class ThreadSafeMemoryReservationManager extends NotThreadSafeMemoryReser
     super.reserveMemoryImmediately();
   }
 
+  @Override
+  public synchronized void reserveMemoryImmediately(final long size) {
+    super.reserveMemoryImmediately(size);
+  }
+
   @Override
   public synchronized void releaseMemoryCumulatively(long size) {
     super.releaseMemoryCumulatively(size);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index e513d79ba61..439286288d6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -1494,7 +1494,7 @@ public class TsFileResource implements PersistentResource, Cloneable {
     return entries;
   }
 
-  public class ModIterator implements Iterator<ModEntry> {
+  public class ModIterator implements Iterator<ModEntry>, AutoCloseable {
 
     private final Iterator<ModEntry> sharedModIterator;
     private final Iterator<ModEntry> exclusiveModIterator;
@@ -1537,6 +1537,22 @@ public class TsFileResource implements PersistentResource, Cloneable {
       }
       throw new NoSuchElementException();
     }
+
+    @Override
+    public void close() {
+      closeModIterator(exclusiveModIterator);
+      closeModIterator(sharedModIterator);
+    }
+
+    private void closeModIterator(Iterator<ModEntry> modIterator) {
+      if (modIterator instanceof AutoCloseable) {
+        try {
+          ((AutoCloseable) modIterator).close();
+        } catch (Exception e) {
+          
LOGGER.info(StorageEngineMessages.CANNOT_CLOSE_MOD_FILE_INPUT_STREAM, 
getTsFile(), e);
+        }
+      }
+    }
   }
 
   public void upgradeModFile(ExecutorService upgradeModFileThreadPool) throws 
IOException {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java
new file mode 100644
index 00000000000..39b9c41447c
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+
+import org.apache.tsfile.external.commons.io.FileUtils;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class QueryModificationLoaderTest {
+
+  private static final IDeviceID DEVICE_ID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d1");
+
+  private File testDir;
+
+  @After
+  public void tearDown() throws Exception {
+    if (testDir != null && testDir.exists()) {
+      FileUtils.deleteDirectory(testDir);
+    }
+  }
+
+  @Test
+  public void testCacheLoadedModsTreeWhenQuotaEnough() throws Exception {
+    TsFileResource resource = prepareResource("cache");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager();
+
+    try (QueryModificationLoader loader =
+        newLoader(
+            resource,
+            Long.MAX_VALUE,
+            fileModCache,
+            cachedModEntriesSize,
+            memoryReservationManager,
+            1)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(1, result.size());
+      assertTrue(fileModCache.containsKey(resource.getTsFileID()));
+      assertTrue(cachedModEntriesSize.get() > 0);
+      assertTrue(memoryReservationManager.getReservedBytes() >= cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getImmediateReservationCount() > 0);
+    }
+  }
+
+  @Test
+  public void testFallbackScansModsWhenFileSizeExceedsRemainingQuotaBeforeLoad() throws Exception {
+    TsFileResource resource = prepareResource("file-size-precheck-fallback");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager();
+
+    try (QueryModificationLoader loader =
+        newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(2, result.size());
+      assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+      assertEquals(0, cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getReservedBytes() > 0);
+      assertEquals(0, memoryReservationManager.getRemainingImmediateFailures());
+      assertEquals(0, memoryReservationManager.getImmediateReservationCount());
+    }
+  }
+
+  @Test
+  public void testFallbackScansRemainingModsWhenEstimatedTreeExceedsQuota() throws Exception {
+    TsFileResource resource = prepareResource("estimated-tree-quota-fallback");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager();
+
+    try (QueryModificationLoader loader =
+        newLoader(
+            resource,
+            resource.getTotalModSizeInByte() + 1,
+            fileModCache,
+            cachedModEntriesSize,
+            memoryReservationManager,
+            1)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(2, result.size());
+      assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+      assertEquals(0, cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getReservedBytes() > 0);
+      assertTrue(memoryReservationManager.getCumulativeReleaseCount() > 0);
+    }
+  }
+
+  @Test
+  public void testFallbackMatchesLoadedTreeWhenFinalReservationFailed() throws Exception {
+    TsFileResource resource = prepareResource("final-reserve-failed");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager(1);
+
+    try (QueryModificationLoader loader =
+        newLoader(
+            resource,
+            Long.MAX_VALUE,
+            fileModCache,
+            cachedModEntriesSize,
+            memoryReservationManager,
+            100)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(2, result.size());
+      assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+      assertEquals(0, cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getReservedBytes() > 0);
+      assertEquals(0, memoryReservationManager.getRemainingImmediateFailures());
+      assertEquals(1, memoryReservationManager.getImmediateReservationCount());
+    }
+  }
+
+  @Test
+  public void testFallbackReservesMatchedModsCumulativelyWhenQuotaExceeded() throws Exception {
+    TsFileResource resource = prepareResource("fallback-cumulative-reserve");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager(1);
+
+    try (QueryModificationLoader loader =
+        newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(2, result.size());
+      assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+      assertEquals(0, cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getReservedBytes() > 0);
+      assertEquals(1, memoryReservationManager.getRemainingImmediateFailures());
+      assertEquals(0, memoryReservationManager.getImmediateReservationCount());
+    }
+  }
+
+  @Test
+  public void testFallbackAdjustsReservedMemoryAfterSortAndMerge() throws Exception {
+    TsFileResource resource = prepareResource("fallback-sort-merge-adjust");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 5, 15),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager();
+
+    try (QueryModificationLoader loader =
+        newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(1, result.size());
+      assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+      assertEquals(0, cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getReservedBytes() > 0);
+      assertTrue(memoryReservationManager.getCumulativeReleaseCount() > 0);
+    }
+  }
+
+  private QueryModificationLoader newLoader(
+      TsFileResource resource,
+      long modsCacheSizeLimitPerFI,
+      Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache,
+      AtomicLong cachedModEntriesSize,
+      MemoryReservationManager memoryReservationManager,
+      int modsMemoryEstimateReadInterval) {
+    QueryContext queryContext = new QueryContext(false, false);
+    return new QueryModificationLoader(
+        queryContext,
+        resource,
+        memoryReservationManager,
+        modsCacheSizeLimitPerFI,
+        modsMemoryEstimateReadInterval,
+        fileModCache,
+        cachedModEntriesSize,
+        modification -> modification.affects(DEVICE_ID) && modification.affects("s1"),
+        modsTree -> queryContext.getPathModifications(modsTree, DEVICE_ID, "s1"));
+  }
+
+  private TsFileResource prepareResource(String name) {
+    testDir = new File(TestConstant.BASE_OUTPUT_PATH, "QueryModificationLoaderTest-" + name);
+    testDir.mkdirs();
+    File tsFile =
+        new File(TsFileNameGenerator.generateNewTsFilePath(testDir.getAbsolutePath(), 1, 1, 0, 0));
+    return new TsFileResource(tsFile);
+  }
+
+  private void writeMods(TsFileResource resource, TreeDeletionEntry... modifications)
+      throws Exception {
+    try (ModificationFile modificationFile = resource.getModFileForWrite()) {
+      for (TreeDeletionEntry modification : modifications) {
+        modificationFile.write(modification);
+      }
+    }
+  }
+
+  private static class CountingMemoryReservationManager implements MemoryReservationManager {
+
+    private long reservedBytes;
+    private int remainingImmediateFailures;
+    private int immediateReservationCount;
+    private int cumulativeReleaseCount;
+
+    private CountingMemoryReservationManager() {}
+
+    private CountingMemoryReservationManager(int remainingImmediateFailures) {
+      this.remainingImmediateFailures = remainingImmediateFailures;
+    }
+
+    @Override
+    public void reserveMemoryCumulatively(long size) {
+      reservedBytes += size;
+    }
+
+    @Override
+    public void reserveMemoryImmediately() {
+      immediateReservationCount++;
+      if (remainingImmediateFailures > 0) {
+        remainingImmediateFailures--;
+        throw new MemoryNotEnoughException("Mock memory reservation failure.");
+      }
+    }
+
+    @Override
+    public void reserveMemoryImmediately(long size) {
+      immediateReservationCount++;
+      if (remainingImmediateFailures > 0) {
+        remainingImmediateFailures--;
+        throw new MemoryNotEnoughException("Mock memory reservation failure.");
+      }
+      reservedBytes += size;
+    }
+
+    @Override
+    public void releaseMemoryCumulatively(long size) {
+      cumulativeReleaseCount++;
+      reservedBytes -= size;
+    }
+
+    @Override
+    public void releaseAllReservedMemory() {
+      reservedBytes = 0;
+    }
+
+    @Override
+    public Pair<Long, Long> releaseMemoryVirtually(long size) {
+      reservedBytes -= size;
+      return new Pair<>(size, 0L);
+    }
+
+    @Override
+    public void reserveMemoryVirtually(long bytesToBeReserved, long bytesAlreadyReserved) {
+      reservedBytes += bytesToBeReserved + bytesAlreadyReserved;
+    }
+
+    private long getReservedBytes() {
+      return reservedBytes;
+    }
+
+    private int getRemainingImmediateFailures() {
+      return remainingImmediateFailures;
+    }
+
+    private int getImmediateReservationCount() {
+      return immediateReservationCount;
+    }
+
+    private int getCumulativeReleaseCount() {
+      return cumulativeReleaseCount;
+    }
+  }
+}


Reply via email to