This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bce9631e108 Add memory control for MergeReader
bce9631e108 is described below

commit bce9631e108b8a2a2674011f2c239d08d15f7449
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri Jun 21 17:54:07 2024 +0800

    Add memory control for MergeReader
---
 .../db/queryengine/common/MPPQueryContext.java     | 51 +++---------
 .../fragment/FragmentInstanceContext.java          | 19 +++++
 .../fragment/FragmentInstanceExecution.java        |  2 +
 .../AbstractSeriesAggregationScanOperator.java     |  2 +-
 .../operator/source/AlignedSeriesScanOperator.java |  4 +-
 .../operator/source/SeriesScanOperator.java        |  3 +-
 .../execution/operator/source/SeriesScanUtil.java  |  3 +-
 .../iotdb/db/queryengine/plan/Coordinator.java     |  2 +-
 .../plan/optimization/AggregationPushDown.java     | 25 +-----
 .../plan/planner/LocalExecutionPlanner.java        | 13 +--
 .../planner/memory/MemoryReservationManager.java   | 46 +++++++++++
 .../NotThreadSafeMemoryReservationManager.java     | 94 ++++++++++++++++++++++
 .../memory/ThreadSafeMemoryReservationManager.java | 51 ++++++++++++
 .../read/reader/common/PriorityMergeReader.java    | 31 ++++++-
 .../execution/operator/OperatorMemoryTest.java     | 19 ++---
 15 files changed, 277 insertions(+), 88 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 3a42ee805b0..ed4516223b5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -26,7 +26,8 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
-import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager;
 import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics;
 
 import org.apache.tsfile.read.filter.basic.Filter;
@@ -78,20 +79,13 @@ public class MPPQueryContext {
 
   // To avoid query front-end from consuming too much memory, it needs to 
reserve memory when
   // constructing some Expression and PlanNode.
-  private long reservedBytesInTotalForFrontEnd = 0;
-
-  private long bytesToBeReservedForFrontEnd = 0;
-
-  // To avoid reserving memory too frequently, we choose to do it in batches. 
This is the lower
-  // bound
-  // for each batch.
-  private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
-
-  private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = 
LocalExecutionPlanner.getInstance();
+  private final MemoryReservationManager memoryReservationManager;
 
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
     this.endPointBlackList = new LinkedList<>();
+    this.memoryReservationManager =
+        new NotThreadSafeMemoryReservationManager(queryId, 
this.getClass().getName());
   }
 
   // TODO too many callers just pass a null SessionInfo which should be 
forbidden
@@ -127,7 +121,7 @@ public class MPPQueryContext {
 
   public void prepareForRetry() {
     this.initResultNodeContext();
-    this.releaseMemoryForFrontEnd();
+    this.releaseAllMemoryReservedForFrontEnd();
   }
 
   private void initResultNodeContext() {
@@ -313,40 +307,19 @@ public class MPPQueryContext {
    * single-threaded manner.
    */
   public void reserveMemoryForFrontEnd(final long bytes) {
-    this.bytesToBeReservedForFrontEnd += bytes;
-    if (this.bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) {
-      reserveMemoryForFrontEndImmediately();
-    }
+    this.memoryReservationManager.reserveMemoryCumulatively(bytes);
   }
 
   public void reserveMemoryForFrontEndImmediately() {
-    if (bytesToBeReservedForFrontEnd != 0) {
-      LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd(
-          bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd, 
queryId.getId());
-      this.reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd;
-      this.bytesToBeReservedForFrontEnd = 0;
-    }
+    this.memoryReservationManager.reserveMemoryImmediately();
   }
 
-  public void releaseMemoryForFrontEnd() {
-    if (reservedBytesInTotalForFrontEnd != 0) {
-      
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd);
-      reservedBytesInTotalForFrontEnd = 0;
-    }
+  public void releaseAllMemoryReservedForFrontEnd() {
+    this.memoryReservationManager.releaseAllReservedMemory();
   }
 
-  public void releaseMemoryForFrontEnd(final long bytes) {
-    if (bytes != 0) {
-      long bytesToRelease;
-      if (bytes <= bytesToBeReservedForFrontEnd) {
-        bytesToBeReservedForFrontEnd -= bytes;
-      } else {
-        bytesToRelease = bytes - bytesToBeReservedForFrontEnd;
-        bytesToBeReservedForFrontEnd = 0;
-        
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
-        reservedBytesInTotalForFrontEnd -= bytesToRelease;
-      }
-    }
+  public void releaseMemoryReservedForFrontEnd(final long bytes) {
+    this.memoryReservationManager.releaseMemoryCumulatively(bytes);
   }
 
   // endregion
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index a109fe84963..7c83518fcf0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
 import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
 import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
@@ -63,6 +65,8 @@ public class FragmentInstanceContext extends QueryContext {
 
   private final FragmentInstanceStateMachine stateMachine;
 
+  private final MemoryReservationManager memoryReservationManager;
+
   private IDataRegionForQuery dataRegion;
   private Filter globalTimeFilter;
 
@@ -193,6 +197,8 @@ public class FragmentInstanceContext extends QueryContext {
         globalTimePredicate == null ? null : 
globalTimePredicate.convertPredicateToTimeFilter();
     this.dataNodeQueryContextMap = dataNodeQueryContextMap;
     this.dataNodeQueryContext = dataNodeQueryContextMap.get(id.getQueryId());
+    this.memoryReservationManager =
+        new ThreadSafeMemoryReservationManager(id.getQueryId(), 
this.getClass().getName());
   }
 
   private FragmentInstanceContext(
@@ -203,6 +209,8 @@ public class FragmentInstanceContext extends QueryContext {
     this.sessionInfo = sessionInfo;
     this.dataNodeQueryContextMap = null;
     this.dataNodeQueryContext = null;
+    this.memoryReservationManager =
+        new ThreadSafeMemoryReservationManager(id.getQueryId(), 
this.getClass().getName());
   }
 
   private FragmentInstanceContext(
@@ -218,6 +226,8 @@ public class FragmentInstanceContext extends QueryContext {
     this.dataRegion = dataRegion;
     this.globalTimeFilter = globalTimeFilter;
     this.dataNodeQueryContextMap = null;
+    this.memoryReservationManager =
+        new ThreadSafeMemoryReservationManager(id.getQueryId(), 
this.getClass().getName());
   }
 
   @TestOnly
@@ -232,6 +242,7 @@ public class FragmentInstanceContext extends QueryContext {
     this.stateMachine = null;
     this.dataNodeQueryContextMap = null;
     this.dataNodeQueryContext = null;
+    this.memoryReservationManager = null;
   }
 
   public void start() {
@@ -369,6 +380,14 @@ public class FragmentInstanceContext extends QueryContext {
     this.devicePathsToContext = devicePathsToContext;
   }
 
+  public MemoryReservationManager getMemoryReservationContext() {
+    return memoryReservationManager;
+  }
+
+  public void releaseMemoryReservationManager() {
+    memoryReservationManager.releaseAllReservedMemory();
+  }
+
   public void initQueryDataSource(List<PartialPath> sourcePaths) throws 
QueryProcessException {
     long startTime = System.nanoTime();
     if (sourcePaths == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index b31d4e13eeb..794b9c3d234 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -311,6 +311,8 @@ public class FragmentInstanceExecution {
             exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
                 instanceId.getQueryId().getId(), 
instanceId.getFragmentInstanceId(), true);
 
+            context.releaseMemoryReservationManager();
+
             if (newState.isFailed()) {
               scheduler.abortFragmentInstance(instanceId);
             }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 75fe64629e1..d8ca5829403 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -94,7 +94,7 @@ public abstract class AbstractSeriesAggregationScanOperator 
extends AbstractData
     this.timeRangeIterator = timeRangeIterator;
 
     this.cachedRawDataSize =
-        (1L + subSensorSize) * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
+        (1L + subSensorSize) * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
     this.maxReturnSize = maxReturnSize;
     this.outputEndTime = outputEndTime;
     this.canUseStatistics = canUseStatistics;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
index 95ea59abac4..a8a8310e018 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
@@ -79,9 +79,7 @@ public class AlignedSeriesScanOperator extends 
AbstractSeriesScanOperator {
   public long calculateMaxPeekMemory() {
     return Math.max(
         maxReturnSize,
-        (1L + valueColumnCount)
-            * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
-            * 3L);
+        (1L + valueColumnCount) * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
index d4ce1caa624..81466851a85 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
@@ -54,8 +54,7 @@ public class SeriesScanOperator extends 
AbstractSeriesScanOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    return Math.max(
-        maxReturnSize, 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L);
+    return Math.max(maxReturnSize, 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index e09bd364084..66c833401a8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -70,7 +70,7 @@ import static 
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.BUI
 
 public class SeriesScanUtil implements Accountable {
 
-  protected final QueryContext context;
+  protected final FragmentInstanceContext context;
 
   // The path of the target series which will be scanned.
   protected final PartialPath seriesPath;
@@ -143,6 +143,7 @@ public class SeriesScanUtil implements Accountable {
       this.orderUtils = new DescTimeOrderUtils();
       this.mergeReader = getDescPriorityMergeReader();
     }
+    
this.mergeReader.setMemoryReservationManager(context.getMemoryReservationContext());
 
     // init TimeSeriesMetadata materializer
     this.seqTimeSeriesMetadata = new LinkedList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 65dcd830195..5d8c7830e3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -142,7 +142,7 @@ public class Coordinator {
       return result;
     } finally {
       if (queryContext != null) {
-        queryContext.releaseMemoryForFrontEnd();
+        queryContext.releaseAllMemoryReservedForFrontEnd();
       }
       if (queryContext != null && 
!queryContext.getAcquiredLockNumMap().isEmpty()) {
         Map<SchemaLockType, Integer> lockMap = 
queryContext.getAcquiredLockNumMap();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
index 994f1a30056..a8b01ef6a84 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
@@ -92,12 +92,9 @@ public class AggregationPushDown implements PlanOptimizer {
     RewriterContext rewriterContext =
         new RewriterContext(analysis, context, 
queryStatement.isAlignByDevice());
     PlanNode node;
-    try {
-      node = plan.accept(new Rewriter(), rewriterContext);
-    } finally {
-      // release the last batch of memory
-      rewriterContext.releaseMemoryForFrontEndImmediately();
-    }
+
+    node = plan.accept(new Rewriter(), rewriterContext);
+
     return node;
   }
 
@@ -633,8 +630,6 @@ public class AggregationPushDown implements PlanOptimizer {
 
   private static class RewriterContext {
 
-    private static final long RELEASE_BATCH_SIZE = 1024L * 1024L;
-
     private final Analysis analysis;
     private final MPPQueryContext context;
     private final boolean isAlignByDevice;
@@ -642,8 +637,6 @@ public class AggregationPushDown implements PlanOptimizer {
     private String curDevice;
     private PartialPath curDevicePath;
 
-    private long bytesToBeReleased = 0;
-
     public RewriterContext(Analysis analysis, MPPQueryContext context, boolean 
isAlignByDevice) {
       this.analysis = analysis;
       Validate.notNull(context, "Query context cannot be null.");
@@ -683,17 +676,7 @@ public class AggregationPushDown implements PlanOptimizer {
     }
 
     public void releaseMemoryForFrontEnd(final long bytes) {
-      bytesToBeReleased += bytes;
-      if (bytesToBeReleased >= RELEASE_BATCH_SIZE) {
-        releaseMemoryForFrontEndImmediately();
-      }
-    }
-
-    public void releaseMemoryForFrontEndImmediately() {
-      if (bytesToBeReleased > 0) {
-        context.releaseMemoryForFrontEnd(bytesToBeReleased);
-        bytesToBeReleased = 0;
-      }
+      this.context.releaseMemoryReservedForFrontEnd(bytes);
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 465c2e2e723..8d9410f92c0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -236,15 +236,18 @@ public class LocalExecutionPlanner {
     }
   }
 
-  public synchronized void reserveMemoryForQueryFrontEnd(
-      final long memoryInBytes, final long reservedBytes, final String 
queryId) {
+  public synchronized void reserveFromFreeMemoryForOperators(
+      final long memoryInBytes,
+      final long reservedBytes,
+      final String queryId,
+      final String contextHolder) {
     if (memoryInBytes > freeMemoryForOperators) {
       throw new MemoryNotEnoughException(
           String.format(
-              "There is not enough memory for planning-stage of Query %s, "
+              "There is not enough memory for Query %s, the contextHolder is 
%s,"
                   + "current remaining free memory is %dB, "
-                  + "estimated memory usage is %dB, reserved memory for FE of 
this query in total is %dB",
-              queryId, freeMemoryForOperators, memoryInBytes, reservedBytes));
+                  + "reserved memory for this context in total is %dB.",
+              queryId, contextHolder, freeMemoryForOperators, reservedBytes));
     } else {
       freeMemoryForOperators -= memoryInBytes;
       if (LOGGER.isDebugEnabled()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
new file mode 100644
index 00000000000..ba55d60b096
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+public interface MemoryReservationManager {
+  /**
+   * Reserve memory for the given size. The memory reservation request will be 
accumulated and the
+   * actual memory will be reserved when the accumulated memory exceeds the 
threshold.
+   *
+   * @param size the size of memory to reserve
+   */
+  void reserveMemoryCumulatively(final long size);
+
+  /** Reserve memory for the accumulated memory size immediately. */
+  void reserveMemoryImmediately();
+
+  /**
+   * Release memory for the given size.
+   *
+   * @param size the size of memory to release
+   */
+  void releaseMemoryCumulatively(final long size);
+
+  /**
+   * Release all reserved memory immediately. Make sure this method is called 
when the lifecycle of
+   * this manager ends, Or the memory to be released in the batch may not be 
released correctly.
+   */
+  void releaseAllReservedMemory();
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
new file mode 100644
index 00000000000..7d8d4b076c2
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+@NotThreadSafe
+public class NotThreadSafeMemoryReservationManager implements 
MemoryReservationManager {
+  // To avoid reserving memory too frequently, we choose to do it in batches. 
This is the lower
+  // bound for each batch.
+  private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
+
+  private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = 
LocalExecutionPlanner.getInstance();
+
+  private final QueryId queryId;
+
+  private final String contextHolder;
+
+  private long reservedBytesInTotal = 0;
+
+  private long bytesToBeReserved = 0;
+
+  private long bytesToBeReleased = 0;
+
+  public NotThreadSafeMemoryReservationManager(final QueryId queryId, final 
String contextHolder) {
+    this.queryId = queryId;
+    this.contextHolder = contextHolder;
+  }
+
+  @Override
+  public void reserveMemoryCumulatively(final long size) {
+    bytesToBeReserved += size;
+    if (bytesToBeReserved >= MEMORY_BATCH_THRESHOLD) {
+      reserveMemoryImmediately();
+    }
+  }
+
+  @Override
+  public void reserveMemoryImmediately() {
+    if (bytesToBeReserved != 0) {
+      LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
+          bytesToBeReserved, reservedBytesInTotal, queryId.getId(), 
contextHolder);
+      reservedBytesInTotal += bytesToBeReserved;
+      bytesToBeReserved = 0;
+    }
+  }
+
+  @Override
+  public void releaseMemoryCumulatively(final long size) {
+    bytesToBeReleased += size;
+    if (bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) {
+      long bytesToRelease;
+      if (bytesToBeReleased <= bytesToBeReserved) {
+        bytesToBeReserved -= bytesToBeReleased;
+      } else {
+        bytesToRelease = bytesToBeReleased - bytesToBeReserved;
+        bytesToBeReserved = 0;
+        
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
+        reservedBytesInTotal -= bytesToRelease;
+      }
+      bytesToBeReleased = 0;
+    }
+  }
+
+  @Override
+  public void releaseAllReservedMemory() {
+    if (reservedBytesInTotal != 0) {
+      
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal);
+      reservedBytesInTotal = 0;
+      bytesToBeReserved = 0;
+      bytesToBeReleased = 0;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
new file mode 100644
index 00000000000..efe83d23c05
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+@ThreadSafe
+public class ThreadSafeMemoryReservationManager extends 
NotThreadSafeMemoryReservationManager {
+  public ThreadSafeMemoryReservationManager(QueryId queryId, String 
contextHolder) {
+    super(queryId, contextHolder);
+  }
+
+  @Override
+  public synchronized void reserveMemoryCumulatively(long size) {
+    super.reserveMemoryCumulatively(size);
+  }
+
+  @Override
+  public synchronized void reserveMemoryImmediately() {
+    super.reserveMemoryImmediately();
+  }
+
+  @Override
+  public synchronized void releaseMemoryCumulatively(long size) {
+    super.releaseMemoryCumulatively(size);
+  }
+
+  @Override
+  public synchronized void releaseAllReservedMemory() {
+    super.releaseAllReservedMemory();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
index 43ec7b9a8be..a7c96131acf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.reader.IPointReader;
@@ -40,6 +41,8 @@ public class PriorityMergeReader implements IPointReader {
 
   protected long usedMemorySize = 0;
 
+  protected MemoryReservationManager memoryReservationManager;
+
   public PriorityMergeReader() {
     heap =
         new PriorityQueue<>(
@@ -51,6 +54,10 @@ public class PriorityMergeReader implements IPointReader {
             });
   }
 
+  public void setMemoryReservationManager(MemoryReservationManager 
memoryReservationManager) {
+    this.memoryReservationManager = memoryReservationManager;
+  }
+
   @TestOnly
   public void addReader(IPointReader reader, long priority) throws IOException 
{
     if (reader.hasNextTimeValuePair()) {
@@ -67,7 +74,11 @@ public class PriorityMergeReader implements IPointReader {
       Element element = new Element(reader, reader.nextTimeValuePair(), 
priority);
       heap.add(element);
       currentReadStopTime = Math.max(currentReadStopTime, endTime);
-      usedMemorySize += element.getReader().getUsedMemorySize();
+      long size = element.getReader().getUsedMemorySize();
+      usedMemorySize += size;
+      if (memoryReservationManager != null) {
+        memoryReservationManager.reserveMemoryCumulatively(size);
+      }
     } else {
       reader.close();
     }
@@ -96,7 +107,11 @@ public class PriorityMergeReader implements IPointReader {
       top.setTimeValuePair(topNext);
       heap.add(top);
     } else {
-      usedMemorySize -= top.getReader().getUsedMemorySize();
+      long size = top.getReader().getUsedMemorySize();
+      usedMemorySize -= size;
+      if (memoryReservationManager != null) {
+        memoryReservationManager.releaseMemoryCumulatively(size);
+      }
     }
     return ret;
   }
@@ -121,7 +136,11 @@ public class PriorityMergeReader implements IPointReader {
       Element e = heap.poll();
       fillNullValue(ret, e.getTimeValuePair());
       if (!e.hasNext()) {
-        usedMemorySize -= e.getReader().getUsedMemorySize();
+        long size = e.getReader().getUsedMemorySize();
+        usedMemorySize -= size;
+        if (memoryReservationManager != null) {
+          memoryReservationManager.releaseMemoryCumulatively(size);
+        }
         e.getReader().close();
         continue;
       }
@@ -133,7 +152,11 @@ public class PriorityMergeReader implements IPointReader {
           e.next();
           heap.add(e);
         } else {
-          usedMemorySize -= e.getReader().getUsedMemorySize();
+          long size = e.getReader().getUsedMemorySize();
+          usedMemorySize -= size;
+          if (memoryReservationManager != null) {
+            memoryReservationManager.releaseMemoryCumulatively(size);
+          }
           // the chunk is end
           e.close();
         }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
index 335dd31c74a..d155d850856 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
@@ -150,14 +150,12 @@ public class OperatorMemoryTest {
               scanOptionsBuilder.build());
 
       assertEquals(
-          TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L,
+          TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
           seriesScanOperator.calculateMaxPeekMemory());
       assertEquals(
           TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
           seriesScanOperator.calculateMaxReturnSize());
-      assertEquals(
-          TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 2L,
-          seriesScanOperator.calculateRetainedSizeAfterCallingNext());
+      assertEquals(0L, 
seriesScanOperator.calculateRetainedSizeAfterCallingNext());
 
     } catch (IllegalPathException e) {
       e.printStackTrace();
@@ -202,7 +200,7 @@ public class OperatorMemoryTest {
       long maxPeekMemory =
           Math.max(
               
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
-              4 * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L);
+              4 * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
       long maxReturnMemory =
           Math.min(
               
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
@@ -1006,8 +1004,7 @@ public class OperatorMemoryTest {
           TimeColumn.SIZE_IN_BYTES_PER_POSITION
               + 512 * Byte.BYTES
               + LongColumn.SIZE_IN_BYTES_PER_POSITION;
-      long cachedRawDataSize =
-          2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() 
* 3;
+      long cachedRawDataSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
 
       assertEquals(
           expectedMaxReturnSize + cachedRawDataSize,
@@ -1039,7 +1036,7 @@ public class OperatorMemoryTest {
           TimeColumn.SIZE_IN_BYTES_PER_POSITION
               + 512 * Byte.BYTES
               + 2 * LongColumn.SIZE_IN_BYTES_PER_POSITION;
-      cachedRawDataSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
+      cachedRawDataSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
 
       assertEquals(
           expectedMaxReturnSize + cachedRawDataSize,
@@ -1082,7 +1079,7 @@ public class OperatorMemoryTest {
               * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
                   + 512 * Byte.BYTES
                   + LongColumn.SIZE_IN_BYTES_PER_POSITION);
-      cachedRawDataSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
+      cachedRawDataSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
 
       assertEquals(
           expectedMaxReturnSize + cachedRawDataSize,
@@ -1122,7 +1119,7 @@ public class OperatorMemoryTest {
                   * (TimeColumn.SIZE_IN_BYTES_PER_POSITION
                       + 512 * Byte.BYTES
                       + LongColumn.SIZE_IN_BYTES_PER_POSITION));
-      cachedRawDataSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
+      cachedRawDataSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
 
       assertEquals(
           expectedMaxReturnSize + cachedRawDataSize,
@@ -1160,7 +1157,7 @@ public class OperatorMemoryTest {
               typeProvider);
 
       expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
-      cachedRawDataSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
+      cachedRawDataSize = 2L * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
 
       assertEquals(
           expectedMaxReturnSize + cachedRawDataSize,


Reply via email to