This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch memoryControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eed9976c1adb8af27f3eeecd232ee6098cf28c50
Author: lancelly <[email protected]>
AuthorDate: Thu Jun 20 15:56:40 2024 +0800

    MemoryReservationContext
---
 .../db/queryengine/common/MPPQueryContext.java     | 51 +++---------
 .../fragment/FragmentInstanceContext.java          | 19 +++++
 .../fragment/FragmentInstanceExecution.java        |  2 +
 .../execution/operator/source/SeriesScanUtil.java  |  5 +-
 .../iotdb/db/queryengine/plan/Coordinator.java     |  2 +-
 .../plan/optimization/AggregationPushDown.java     | 25 +-----
 .../plan/planner/LocalExecutionPlanner.java        | 13 ++--
 .../planner/memory/MemoryReservationContext.java   | 46 +++++++++++
 .../SynchronizedMemoryReservationContext.java      | 48 ++++++++++++
 ...ynchronizedMemoryReservationContextWrapper.java | 72 +++++++++++++++++
 .../UnsynchronizedMemoryReservationContext.java    | 91 ++++++++++++++++++++++
 .../read/reader/common/PriorityMergeReader.java    | 32 +++++++-
 12 files changed, 335 insertions(+), 71 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 3a42ee805b0..9d865e0a624 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -26,7 +26,8 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
-import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationContext;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.UnsynchronizedMemoryReservationContext;
 import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics;
 
 import org.apache.tsfile.read.filter.basic.Filter;
@@ -78,20 +79,13 @@ public class MPPQueryContext {
 
   // To avoid query front-end from consuming too much memory, it needs to 
reserve memory when
   // constructing some Expression and PlanNode.
-  private long reservedBytesInTotalForFrontEnd = 0;
-
-  private long bytesToBeReservedForFrontEnd = 0;
-
-  // To avoid reserving memory too frequently, we choose to do it in batches. 
This is the lower
-  // bound
-  // for each batch.
-  private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
-
-  private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = 
LocalExecutionPlanner.getInstance();
+  private final MemoryReservationContext memoryReservationContext;
 
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
     this.endPointBlackList = new LinkedList<>();
+    this.memoryReservationContext =
+        new UnsynchronizedMemoryReservationContext(queryId, "MPPQueryContext");
   }
 
   // TODO too many callers just pass a null SessionInfo which should be 
forbidden
@@ -127,7 +121,7 @@ public class MPPQueryContext {
 
   public void prepareForRetry() {
     this.initResultNodeContext();
-    this.releaseMemoryForFrontEnd();
+    this.releaseAllMemoryReservedForFrontEnd();
   }
 
   private void initResultNodeContext() {
@@ -313,40 +307,19 @@ public class MPPQueryContext {
    * single-threaded manner.
    */
   public void reserveMemoryForFrontEnd(final long bytes) {
-    this.bytesToBeReservedForFrontEnd += bytes;
-    if (this.bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) {
-      reserveMemoryForFrontEndImmediately();
-    }
+    this.memoryReservationContext.reserveMemoryAccumulatively(bytes);
   }
 
   public void reserveMemoryForFrontEndImmediately() {
-    if (bytesToBeReservedForFrontEnd != 0) {
-      LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd(
-          bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd, 
queryId.getId());
-      this.reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd;
-      this.bytesToBeReservedForFrontEnd = 0;
-    }
+    this.memoryReservationContext.reserveMemoryImmediately();
   }
 
-  public void releaseMemoryForFrontEnd() {
-    if (reservedBytesInTotalForFrontEnd != 0) {
-      
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd);
-      reservedBytesInTotalForFrontEnd = 0;
-    }
+  public void releaseAllMemoryReservedForFrontEnd() {
+    this.memoryReservationContext.releaseAllReservedMemory();
   }
 
-  public void releaseMemoryForFrontEnd(final long bytes) {
-    if (bytes != 0) {
-      long bytesToRelease;
-      if (bytes <= bytesToBeReservedForFrontEnd) {
-        bytesToBeReservedForFrontEnd -= bytes;
-      } else {
-        bytesToRelease = bytes - bytesToBeReservedForFrontEnd;
-        bytesToBeReservedForFrontEnd = 0;
-        
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
-        reservedBytesInTotalForFrontEnd -= bytesToRelease;
-      }
-    }
+  public void releaseAllMemoryReservedForFrontEnd(final long bytes) {
+    this.memoryReservationContext.releaseMemoryAccumulatively(bytes);
   }
 
   // endregion
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index a109fe84963..152daa8bd42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationContext;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.UnsynchronizedMemoryReservationContext;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
 import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
 import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
@@ -63,6 +65,8 @@ public class FragmentInstanceContext extends QueryContext {
 
   private final FragmentInstanceStateMachine stateMachine;
 
+  private final MemoryReservationContext memoryReservationContext;
+
   private IDataRegionForQuery dataRegion;
   private Filter globalTimeFilter;
 
@@ -193,6 +197,8 @@ public class FragmentInstanceContext extends QueryContext {
         globalTimePredicate == null ? null : 
globalTimePredicate.convertPredicateToTimeFilter();
     this.dataNodeQueryContextMap = dataNodeQueryContextMap;
     this.dataNodeQueryContext = dataNodeQueryContextMap.get(id.getQueryId());
+    this.memoryReservationContext =
+        new UnsynchronizedMemoryReservationContext(id.getQueryId(), 
"FragmentInstanceContext");
   }
 
   private FragmentInstanceContext(
@@ -203,6 +209,8 @@ public class FragmentInstanceContext extends QueryContext {
     this.sessionInfo = sessionInfo;
     this.dataNodeQueryContextMap = null;
     this.dataNodeQueryContext = null;
+    this.memoryReservationContext =
+        new UnsynchronizedMemoryReservationContext(id.getQueryId(), 
"FragmentInstanceContext");
   }
 
   private FragmentInstanceContext(
@@ -218,6 +226,8 @@ public class FragmentInstanceContext extends QueryContext {
     this.dataRegion = dataRegion;
     this.globalTimeFilter = globalTimeFilter;
     this.dataNodeQueryContextMap = null;
+    this.memoryReservationContext =
+        new UnsynchronizedMemoryReservationContext(id.getQueryId(), 
"FragmentInstanceContext");
   }
 
   @TestOnly
@@ -232,6 +242,7 @@ public class FragmentInstanceContext extends QueryContext {
     this.stateMachine = null;
     this.dataNodeQueryContextMap = null;
     this.dataNodeQueryContext = null;
+    this.memoryReservationContext = null;
   }
 
   public void start() {
@@ -369,6 +380,14 @@ public class FragmentInstanceContext extends QueryContext {
     this.devicePathsToContext = devicePathsToContext;
   }
 
+  public MemoryReservationContext getMemoryReservationContext() {
+    return memoryReservationContext;
+  }
+
+  public void releaseMemoryReservationContext() {
+    memoryReservationContext.releaseAllReservedMemory();
+  }
+
   public void initQueryDataSource(List<PartialPath> sourcePaths) throws 
QueryProcessException {
     long startTime = System.nanoTime();
     if (sourcePaths == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index b31d4e13eeb..e760e7e8759 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -311,6 +311,8 @@ public class FragmentInstanceExecution {
             exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
                 instanceId.getQueryId().getId(), 
instanceId.getFragmentInstanceId(), true);
 
+            context.releaseMemoryReservationContext();
+
             if (newState.isFailed()) {
               scheduler.abortFragmentInstance(instanceId);
             }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index e09bd364084..142cf4942df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.SynchronizedMemoryReservationContextWrapper;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
@@ -70,7 +71,7 @@ import static 
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.BUI
 
 public class SeriesScanUtil implements Accountable {
 
-  protected final QueryContext context;
+  protected final FragmentInstanceContext context;
 
   // The path of the target series which will be scanned.
   protected final PartialPath seriesPath;
@@ -143,6 +144,8 @@ public class SeriesScanUtil implements Accountable {
       this.orderUtils = new DescTimeOrderUtils();
       this.mergeReader = getDescPriorityMergeReader();
     }
+    this.mergeReader.setMemoryReservationContextWrapper(
+        new 
SynchronizedMemoryReservationContextWrapper(context.getMemoryReservationContext()));
 
     // init TimeSeriesMetadata materializer
     this.seqTimeSeriesMetadata = new LinkedList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 65dcd830195..5d8c7830e3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -142,7 +142,7 @@ public class Coordinator {
       return result;
     } finally {
       if (queryContext != null) {
-        queryContext.releaseMemoryForFrontEnd();
+        queryContext.releaseAllMemoryReservedForFrontEnd();
       }
       if (queryContext != null && 
!queryContext.getAcquiredLockNumMap().isEmpty()) {
         Map<SchemaLockType, Integer> lockMap = 
queryContext.getAcquiredLockNumMap();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
index 994f1a30056..3e71b582287 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
@@ -92,12 +92,9 @@ public class AggregationPushDown implements PlanOptimizer {
     RewriterContext rewriterContext =
         new RewriterContext(analysis, context, 
queryStatement.isAlignByDevice());
     PlanNode node;
-    try {
-      node = plan.accept(new Rewriter(), rewriterContext);
-    } finally {
-      // release the last batch of memory
-      rewriterContext.releaseMemoryForFrontEndImmediately();
-    }
+
+    node = plan.accept(new Rewriter(), rewriterContext);
+
     return node;
   }
 
@@ -633,8 +630,6 @@ public class AggregationPushDown implements PlanOptimizer {
 
   private static class RewriterContext {
 
-    private static final long RELEASE_BATCH_SIZE = 1024L * 1024L;
-
     private final Analysis analysis;
     private final MPPQueryContext context;
     private final boolean isAlignByDevice;
@@ -642,8 +637,6 @@ public class AggregationPushDown implements PlanOptimizer {
     private String curDevice;
     private PartialPath curDevicePath;
 
-    private long bytesToBeReleased = 0;
-
     public RewriterContext(Analysis analysis, MPPQueryContext context, boolean 
isAlignByDevice) {
       this.analysis = analysis;
       Validate.notNull(context, "Query context cannot be null.");
@@ -683,17 +676,7 @@ public class AggregationPushDown implements PlanOptimizer {
     }
 
     public void releaseMemoryForFrontEnd(final long bytes) {
-      bytesToBeReleased += bytes;
-      if (bytesToBeReleased >= RELEASE_BATCH_SIZE) {
-        releaseMemoryForFrontEndImmediately();
-      }
-    }
-
-    public void releaseMemoryForFrontEndImmediately() {
-      if (bytesToBeReleased > 0) {
-        context.releaseMemoryForFrontEnd(bytesToBeReleased);
-        bytesToBeReleased = 0;
-      }
+      this.context.releaseAllMemoryReservedForFrontEnd(bytes);
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 465c2e2e723..3fe45ee6d7b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -236,15 +236,18 @@ public class LocalExecutionPlanner {
     }
   }
 
-  public synchronized void reserveMemoryForQueryFrontEnd(
-      final long memoryInBytes, final long reservedBytes, final String 
queryId) {
+  public synchronized void reserveFromFreeMemoryForOperators(
+      final long memoryInBytes,
+      final long reservedBytes,
+      final String queryId,
+      final String contextHolder) {
     if (memoryInBytes > freeMemoryForOperators) {
       throw new MemoryNotEnoughException(
           String.format(
-              "There is not enough memory for planning-stage of Query %s, "
+              "There is not enough memory for of Query %s, contextHolder is 
%s,"
                   + "current remaining free memory is %dB, "
-                  + "estimated memory usage is %dB, reserved memory for FE of 
this query in total is %dB",
-              queryId, freeMemoryForOperators, memoryInBytes, reservedBytes));
+                  + "reserved memory for this context in total is %dB.",
+              queryId, contextHolder, freeMemoryForOperators, reservedBytes));
     } else {
       freeMemoryForOperators -= memoryInBytes;
       if (LOGGER.isDebugEnabled()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationContext.java
new file mode 100644
index 00000000000..27ea8242cbe
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+public interface MemoryReservationContext {
+  /**
+   * Reserve memory for the given size. The memory reservation request will be 
accumulated and the
+   * actual memory will be reserved when the accumulated memory exceeds the 
threshold.
+   *
+   * @param size the size of memory to reserve
+   */
+  void reserveMemoryAccumulatively(final long size);
+
+  /** Reserve memory for the accumulated memory size immediately. */
+  void reserveMemoryImmediately();
+
+  /**
+   * Release memory for the given size.
+   *
+   * @param size the size of memory to release
+   */
+  void releaseMemoryAccumulatively(final long size);
+
+  /**
+   * Release all reserved memory immediately. Make sure this method is called 
when the lifecycle of
+   * this context ends, Or the memory to be released in the batch may not be 
released correctly.
+   */
+  void releaseAllReservedMemory();
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContext.java
new file mode 100644
index 00000000000..48372477c85
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContext.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+
+public class SynchronizedMemoryReservationContext extends 
UnsynchronizedMemoryReservationContext {
+  public SynchronizedMemoryReservationContext(QueryId queryId, String 
contextHolder) {
+    super(queryId, contextHolder);
+  }
+
+  @Override
+  public synchronized void reserveMemoryAccumulatively(long size) {
+    super.reserveMemoryAccumulatively(size);
+  }
+
+  @Override
+  public synchronized void reserveMemoryImmediately() {
+    super.reserveMemoryImmediately();
+  }
+
+  @Override
+  public synchronized void releaseMemoryAccumulatively(long size) {
+    super.releaseMemoryAccumulatively(size);
+  }
+
+  @Override
+  public synchronized void releaseAllReservedMemory() {
+    super.releaseAllReservedMemory();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContextWrapper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContextWrapper.java
new file mode 100644
index 00000000000..75256da1769
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContextWrapper.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+/**
+ * SynchronizedMemoryReservationContext synchronizes the memory reservation 
and release operations.
+ * However, synchronization is not necessary every time for the memory 
reservation and release
+ * operations in single thread context. For example, each SeriesScanUtil 
itself reserves and
+ * releases memory in a single thread context, but SeriesScanUtils in the same 
FI may share the same
+ * SynchronizedMemoryReservationContext. So we can use this wrapper to batch 
the reserve and release
+ * operations to reduce the overhead of synchronization. The actual reserve 
and release operations
+ * are delegated to the wrapped MemoryReservationContext.
+ */
+public class SynchronizedMemoryReservationContextWrapper implements 
MemoryReservationContext {
+
+  private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
+  private final MemoryReservationContext memoryReservationContext;
+
+  private long bytesToBeReserved = 0;
+
+  private long bytesToBeReleased = 0;
+
+  public SynchronizedMemoryReservationContextWrapper(
+      MemoryReservationContext memoryReservationContext) {
+    this.memoryReservationContext = memoryReservationContext;
+  }
+
+  @Override
+  public void reserveMemoryAccumulatively(long size) {
+    this.bytesToBeReserved += size;
+    if (this.bytesToBeReserved >= MEMORY_BATCH_THRESHOLD) {
+      memoryReservationContext.reserveMemoryAccumulatively(bytesToBeReserved);
+      this.bytesToBeReserved = 0;
+    }
+  }
+
+  @Override
+  public void reserveMemoryImmediately() {
+    memoryReservationContext.reserveMemoryImmediately();
+  }
+
+  @Override
+  public void releaseMemoryAccumulatively(long size) {
+    this.bytesToBeReleased += size;
+    if (this.bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) {
+      
memoryReservationContext.releaseMemoryAccumulatively(this.bytesToBeReleased);
+      this.bytesToBeReleased = 0;
+    }
+  }
+
+  @Override
+  public void releaseAllReservedMemory() {
+    memoryReservationContext.releaseAllReservedMemory();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/UnsynchronizedMemoryReservationContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/UnsynchronizedMemoryReservationContext.java
new file mode 100644
index 00000000000..4a67bb13f54
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/UnsynchronizedMemoryReservationContext.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+
+public class UnsynchronizedMemoryReservationContext implements 
MemoryReservationContext {
+  // To avoid reserving memory too frequently, we choose to do it in batches. 
This is the lower
+  // bound for each batch.
+  private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
+
+  private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = 
LocalExecutionPlanner.getInstance();
+
+  private final QueryId queryId;
+
+  private final String contextHolder;
+
+  private long reservedBytesInTotal = 0;
+
+  private long bytesToBeReserved = 0;
+
+  private long bytesToBeReleased = 0;
+
+  public UnsynchronizedMemoryReservationContext(final QueryId queryId, final 
String contextHolder) {
+    this.queryId = queryId;
+    this.contextHolder = contextHolder;
+  }
+
+  @Override
+  public void reserveMemoryAccumulatively(final long size) {
+    this.bytesToBeReserved += size;
+    if (this.bytesToBeReserved >= MEMORY_BATCH_THRESHOLD) {
+      reserveMemoryImmediately();
+    }
+  }
+
+  @Override
+  public void reserveMemoryImmediately() {
+    if (bytesToBeReserved != 0) {
+      LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
+          bytesToBeReserved, reservedBytesInTotal, queryId.getId(), 
contextHolder);
+      this.reservedBytesInTotal += bytesToBeReserved;
+      this.bytesToBeReserved = 0;
+    }
+  }
+
+  @Override
+  public void releaseMemoryAccumulatively(final long size) {
+    this.bytesToBeReleased += size;
+    if (bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) {
+      long bytesToRelease;
+      if (this.bytesToBeReleased <= bytesToBeReserved) {
+        bytesToBeReserved -= this.bytesToBeReleased;
+      } else {
+        bytesToRelease = this.bytesToBeReleased - bytesToBeReserved;
+        bytesToBeReserved = 0;
+        
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
+        reservedBytesInTotal -= bytesToRelease;
+      }
+      this.bytesToBeReleased = 0;
+    }
+  }
+
+  @Override
+  public void releaseAllReservedMemory() {
+    if (reservedBytesInTotal != 0) {
+      
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal);
+      reservedBytesInTotal = 0;
+      bytesToBeReserved = 0;
+      bytesToBeReleased = 0;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
index 43ec7b9a8be..1b7f43b1d6c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;
 
 import org.apache.iotdb.commons.utils.TestOnly;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.SynchronizedMemoryReservationContextWrapper;
 
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.reader.IPointReader;
@@ -40,6 +41,8 @@ public class PriorityMergeReader implements IPointReader {
 
   protected long usedMemorySize = 0;
 
+  protected SynchronizedMemoryReservationContextWrapper 
memoryReservationContextWrapper;
+
   public PriorityMergeReader() {
     heap =
         new PriorityQueue<>(
@@ -51,6 +54,11 @@ public class PriorityMergeReader implements IPointReader {
             });
   }
 
+  public void setMemoryReservationContextWrapper(
+      SynchronizedMemoryReservationContextWrapper 
memoryReservationContextWrapper) {
+    this.memoryReservationContextWrapper = memoryReservationContextWrapper;
+  }
+
   @TestOnly
   public void addReader(IPointReader reader, long priority) throws IOException 
{
     if (reader.hasNextTimeValuePair()) {
@@ -67,7 +75,11 @@ public class PriorityMergeReader implements IPointReader {
       Element element = new Element(reader, reader.nextTimeValuePair(), 
priority);
       heap.add(element);
       currentReadStopTime = Math.max(currentReadStopTime, endTime);
-      usedMemorySize += element.getReader().getUsedMemorySize();
+      long size = element.getReader().getUsedMemorySize();
+      usedMemorySize += size;
+      if (memoryReservationContextWrapper != null) {
+        memoryReservationContextWrapper.reserveMemoryAccumulatively(size);
+      }
     } else {
       reader.close();
     }
@@ -96,7 +108,11 @@ public class PriorityMergeReader implements IPointReader {
       top.setTimeValuePair(topNext);
       heap.add(top);
     } else {
-      usedMemorySize -= top.getReader().getUsedMemorySize();
+      long size = top.getReader().getUsedMemorySize();
+      usedMemorySize -= size;
+      if (memoryReservationContextWrapper != null) {
+        memoryReservationContextWrapper.releaseMemoryAccumulatively(size);
+      }
     }
     return ret;
   }
@@ -121,7 +137,11 @@ public class PriorityMergeReader implements IPointReader {
       Element e = heap.poll();
       fillNullValue(ret, e.getTimeValuePair());
       if (!e.hasNext()) {
-        usedMemorySize -= e.getReader().getUsedMemorySize();
+        long size = e.getReader().getUsedMemorySize();
+        usedMemorySize -= size;
+        if (memoryReservationContextWrapper != null) {
+          memoryReservationContextWrapper.releaseMemoryAccumulatively(size);
+        }
         e.getReader().close();
         continue;
       }
@@ -133,7 +153,11 @@ public class PriorityMergeReader implements IPointReader {
           e.next();
           heap.add(e);
         } else {
-          usedMemorySize -= e.getReader().getUsedMemorySize();
+          long size = e.getReader().getUsedMemorySize();
+          usedMemorySize -= size;
+          if (memoryReservationContextWrapper != null) {
+            memoryReservationContextWrapper.releaseMemoryAccumulatively(size);
+          }
           // the chunk is end
           e.close();
         }

Reply via email to