This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bce9631e108 Add memory control for MergeReader
bce9631e108 is described below
commit bce9631e108b8a2a2674011f2c239d08d15f7449
Author: Liao Lanyu <[email protected]>
AuthorDate: Fri Jun 21 17:54:07 2024 +0800
Add memory control for MergeReader
---
.../db/queryengine/common/MPPQueryContext.java | 51 +++---------
.../fragment/FragmentInstanceContext.java | 19 +++++
.../fragment/FragmentInstanceExecution.java | 2 +
.../AbstractSeriesAggregationScanOperator.java | 2 +-
.../operator/source/AlignedSeriesScanOperator.java | 4 +-
.../operator/source/SeriesScanOperator.java | 3 +-
.../execution/operator/source/SeriesScanUtil.java | 3 +-
.../iotdb/db/queryengine/plan/Coordinator.java | 2 +-
.../plan/optimization/AggregationPushDown.java | 25 +-----
.../plan/planner/LocalExecutionPlanner.java | 13 +--
.../planner/memory/MemoryReservationManager.java | 46 +++++++++++
.../NotThreadSafeMemoryReservationManager.java | 94 ++++++++++++++++++++++
.../memory/ThreadSafeMemoryReservationManager.java | 51 ++++++++++++
.../read/reader/common/PriorityMergeReader.java | 31 ++++++-
.../execution/operator/OperatorMemoryTest.java | 19 ++---
15 files changed, 277 insertions(+), 88 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 3a42ee805b0..ed4516223b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -26,7 +26,8 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
-import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager;
import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics;
import org.apache.tsfile.read.filter.basic.Filter;
@@ -78,20 +79,13 @@ public class MPPQueryContext {
// To avoid query front-end from consuming too much memory, it needs to
reserve memory when
// constructing some Expression and PlanNode.
- private long reservedBytesInTotalForFrontEnd = 0;
-
- private long bytesToBeReservedForFrontEnd = 0;
-
- // To avoid reserving memory too frequently, we choose to do it in batches.
This is the lower
- // bound
- // for each batch.
- private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
-
- private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER =
LocalExecutionPlanner.getInstance();
+ private final MemoryReservationManager memoryReservationManager;
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
this.endPointBlackList = new LinkedList<>();
+ this.memoryReservationManager =
+ new NotThreadSafeMemoryReservationManager(queryId,
this.getClass().getName());
}
// TODO too many callers just pass a null SessionInfo which should be
forbidden
@@ -127,7 +121,7 @@ public class MPPQueryContext {
public void prepareForRetry() {
this.initResultNodeContext();
- this.releaseMemoryForFrontEnd();
+ this.releaseAllMemoryReservedForFrontEnd();
}
private void initResultNodeContext() {
@@ -313,40 +307,19 @@ public class MPPQueryContext {
* single-threaded manner.
*/
public void reserveMemoryForFrontEnd(final long bytes) {
- this.bytesToBeReservedForFrontEnd += bytes;
- if (this.bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) {
- reserveMemoryForFrontEndImmediately();
- }
+ this.memoryReservationManager.reserveMemoryCumulatively(bytes);
}
public void reserveMemoryForFrontEndImmediately() {
- if (bytesToBeReservedForFrontEnd != 0) {
- LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd(
- bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd,
queryId.getId());
- this.reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd;
- this.bytesToBeReservedForFrontEnd = 0;
- }
+ this.memoryReservationManager.reserveMemoryImmediately();
}
- public void releaseMemoryForFrontEnd() {
- if (reservedBytesInTotalForFrontEnd != 0) {
-
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd);
- reservedBytesInTotalForFrontEnd = 0;
- }
+ public void releaseAllMemoryReservedForFrontEnd() {
+ this.memoryReservationManager.releaseAllReservedMemory();
}
- public void releaseMemoryForFrontEnd(final long bytes) {
- if (bytes != 0) {
- long bytesToRelease;
- if (bytes <= bytesToBeReservedForFrontEnd) {
- bytesToBeReservedForFrontEnd -= bytes;
- } else {
- bytesToRelease = bytes - bytesToBeReservedForFrontEnd;
- bytesToBeReservedForFrontEnd = 0;
-
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
- reservedBytesInTotalForFrontEnd -= bytesToRelease;
- }
- }
+ public void releaseMemoryReservedForFrontEnd(final long bytes) {
+ this.memoryReservationManager.releaseMemoryCumulatively(bytes);
}
// endregion
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index a109fe84963..7c83518fcf0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -28,6 +28,8 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate;
import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery;
import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
@@ -63,6 +65,8 @@ public class FragmentInstanceContext extends QueryContext {
private final FragmentInstanceStateMachine stateMachine;
+ private final MemoryReservationManager memoryReservationManager;
+
private IDataRegionForQuery dataRegion;
private Filter globalTimeFilter;
@@ -193,6 +197,8 @@ public class FragmentInstanceContext extends QueryContext {
globalTimePredicate == null ? null :
globalTimePredicate.convertPredicateToTimeFilter();
this.dataNodeQueryContextMap = dataNodeQueryContextMap;
this.dataNodeQueryContext = dataNodeQueryContextMap.get(id.getQueryId());
+ this.memoryReservationManager =
+ new ThreadSafeMemoryReservationManager(id.getQueryId(),
this.getClass().getName());
}
private FragmentInstanceContext(
@@ -203,6 +209,8 @@ public class FragmentInstanceContext extends QueryContext {
this.sessionInfo = sessionInfo;
this.dataNodeQueryContextMap = null;
this.dataNodeQueryContext = null;
+ this.memoryReservationManager =
+ new ThreadSafeMemoryReservationManager(id.getQueryId(),
this.getClass().getName());
}
private FragmentInstanceContext(
@@ -218,6 +226,8 @@ public class FragmentInstanceContext extends QueryContext {
this.dataRegion = dataRegion;
this.globalTimeFilter = globalTimeFilter;
this.dataNodeQueryContextMap = null;
+ this.memoryReservationManager =
+ new ThreadSafeMemoryReservationManager(id.getQueryId(),
this.getClass().getName());
}
@TestOnly
@@ -232,6 +242,7 @@ public class FragmentInstanceContext extends QueryContext {
this.stateMachine = null;
this.dataNodeQueryContextMap = null;
this.dataNodeQueryContext = null;
+ this.memoryReservationManager = null;
}
public void start() {
@@ -369,6 +380,14 @@ public class FragmentInstanceContext extends QueryContext {
this.devicePathsToContext = devicePathsToContext;
}
+ /**
+  * Returns the memory reservation manager owned by this fragment instance context.
+  *
+  * @return the manager held in the {@code memoryReservationManager} field; this may be {@code
+  *     null}, because one constructor of this class assigns {@code null} to that field
+  */
+ public MemoryReservationManager getMemoryReservationContext() {
+   return this.memoryReservationManager;
+ }
+
+ /**
+  * Releases all memory that is still reserved through this context's reservation manager.
+  *
+  * <p>Does nothing when no manager is attached: one constructor of this class assigns {@code
+  * null} to the field, and a missing manager is treated as "no memory accounting" rather than
+  * as an error, so this method must not throw in that case.
+  */
+ public void releaseMemoryReservationManager() {
+   if (memoryReservationManager != null) {
+     memoryReservationManager.releaseAllReservedMemory();
+   }
+ }
+
public void initQueryDataSource(List<PartialPath> sourcePaths) throws
QueryProcessException {
long startTime = System.nanoTime();
if (sourcePaths == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index b31d4e13eeb..794b9c3d234 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -311,6 +311,8 @@ public class FragmentInstanceExecution {
exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
instanceId.getQueryId().getId(),
instanceId.getFragmentInstanceId(), true);
+ context.releaseMemoryReservationManager();
+
if (newState.isFailed()) {
scheduler.abortFragmentInstance(instanceId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
index 75fe64629e1..d8ca5829403 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AbstractSeriesAggregationScanOperator.java
@@ -94,7 +94,7 @@ public abstract class AbstractSeriesAggregationScanOperator
extends AbstractData
this.timeRangeIterator = timeRangeIterator;
this.cachedRawDataSize =
- (1L + subSensorSize) *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
+ (1L + subSensorSize) *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
this.maxReturnSize = maxReturnSize;
this.outputEndTime = outputEndTime;
this.canUseStatistics = canUseStatistics;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
index 95ea59abac4..a8a8310e018 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanOperator.java
@@ -79,9 +79,7 @@ public class AlignedSeriesScanOperator extends
AbstractSeriesScanOperator {
public long calculateMaxPeekMemory() {
return Math.max(
maxReturnSize,
- (1L + valueColumnCount)
- * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
- * 3L);
+ (1L + valueColumnCount) *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
index d4ce1caa624..81466851a85 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanOperator.java
@@ -54,8 +54,7 @@ public class SeriesScanOperator extends
AbstractSeriesScanOperator {
@Override
public long calculateMaxPeekMemory() {
- return Math.max(
- maxReturnSize,
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L);
+ return Math.max(maxReturnSize,
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index e09bd364084..66c833401a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -70,7 +70,7 @@ import static
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.BUI
public class SeriesScanUtil implements Accountable {
- protected final QueryContext context;
+ protected final FragmentInstanceContext context;
// The path of the target series which will be scanned.
protected final PartialPath seriesPath;
@@ -143,6 +143,7 @@ public class SeriesScanUtil implements Accountable {
this.orderUtils = new DescTimeOrderUtils();
this.mergeReader = getDescPriorityMergeReader();
}
+
this.mergeReader.setMemoryReservationManager(context.getMemoryReservationContext());
// init TimeSeriesMetadata materializer
this.seqTimeSeriesMetadata = new LinkedList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 65dcd830195..5d8c7830e3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -142,7 +142,7 @@ public class Coordinator {
return result;
} finally {
if (queryContext != null) {
- queryContext.releaseMemoryForFrontEnd();
+ queryContext.releaseAllMemoryReservedForFrontEnd();
}
if (queryContext != null &&
!queryContext.getAcquiredLockNumMap().isEmpty()) {
Map<SchemaLockType, Integer> lockMap =
queryContext.getAcquiredLockNumMap();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
index 994f1a30056..a8b01ef6a84 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java
@@ -92,12 +92,9 @@ public class AggregationPushDown implements PlanOptimizer {
RewriterContext rewriterContext =
new RewriterContext(analysis, context,
queryStatement.isAlignByDevice());
PlanNode node;
- try {
- node = plan.accept(new Rewriter(), rewriterContext);
- } finally {
- // release the last batch of memory
- rewriterContext.releaseMemoryForFrontEndImmediately();
- }
+
+ node = plan.accept(new Rewriter(), rewriterContext);
+
return node;
}
@@ -633,8 +630,6 @@ public class AggregationPushDown implements PlanOptimizer {
private static class RewriterContext {
- private static final long RELEASE_BATCH_SIZE = 1024L * 1024L;
-
private final Analysis analysis;
private final MPPQueryContext context;
private final boolean isAlignByDevice;
@@ -642,8 +637,6 @@ public class AggregationPushDown implements PlanOptimizer {
private String curDevice;
private PartialPath curDevicePath;
- private long bytesToBeReleased = 0;
-
public RewriterContext(Analysis analysis, MPPQueryContext context, boolean
isAlignByDevice) {
this.analysis = analysis;
Validate.notNull(context, "Query context cannot be null.");
@@ -683,17 +676,7 @@ public class AggregationPushDown implements PlanOptimizer {
}
public void releaseMemoryForFrontEnd(final long bytes) {
- bytesToBeReleased += bytes;
- if (bytesToBeReleased >= RELEASE_BATCH_SIZE) {
- releaseMemoryForFrontEndImmediately();
- }
- }
-
- public void releaseMemoryForFrontEndImmediately() {
- if (bytesToBeReleased > 0) {
- context.releaseMemoryForFrontEnd(bytesToBeReleased);
- bytesToBeReleased = 0;
- }
+ this.context.releaseMemoryReservedForFrontEnd(bytes);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 465c2e2e723..8d9410f92c0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -236,15 +236,18 @@ public class LocalExecutionPlanner {
}
}
- public synchronized void reserveMemoryForQueryFrontEnd(
- final long memoryInBytes, final long reservedBytes, final String
queryId) {
+ public synchronized void reserveFromFreeMemoryForOperators(
+ final long memoryInBytes,
+ final long reservedBytes,
+ final String queryId,
+ final String contextHolder) {
if (memoryInBytes > freeMemoryForOperators) {
throw new MemoryNotEnoughException(
String.format(
- "There is not enough memory for planning-stage of Query %s, "
+ "There is not enough memory for Query %s, the contextHolder is
%s,"
+ "current remaining free memory is %dB, "
- + "estimated memory usage is %dB, reserved memory for FE of
this query in total is %dB",
- queryId, freeMemoryForOperators, memoryInBytes, reservedBytes));
+ + "reserved memory for this context in total is %dB.",
+ queryId, contextHolder, freeMemoryForOperators, reservedBytes));
} else {
freeMemoryForOperators -= memoryInBytes;
if (LOGGER.isDebugEnabled()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
new file mode 100644
index 00000000000..ba55d60b096
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+/**
+ * Contract for reserving and releasing memory on behalf of a single holder.
+ *
+ * <p>Requests made through the cumulative methods may be buffered by the implementation and
+ * applied in batches; the batch threshold is defined by the implementing class.
+ */
+public interface MemoryReservationManager {
+ /**
+ * Reserve memory for the given size. The request is accumulated, and the actual reservation is
+ * performed once the accumulated amount reaches the implementation's batch threshold.
+ *
+ * @param size the size of memory to reserve, in bytes
+ */
+ void reserveMemoryCumulatively(final long size);
+
+ /** Reserve the accumulated, not yet reserved memory size immediately. */
+ void reserveMemoryImmediately();
+
+ /**
+ * Release memory for the given size. As with reservation, the request is accumulated and is
+ * applied once the accumulated amount reaches the implementation's batch threshold.
+ *
+ * @param size the size of memory to release, in bytes
+ */
+ void releaseMemoryCumulatively(final long size);
+
+ /**
+ * Release all reserved memory immediately. Make sure this method is called when the lifecycle
+ * of this manager ends, or the memory that is still waiting to be released in a batch may not
+ * be released correctly.
+ */
+ void releaseAllReservedMemory();
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
new file mode 100644
index 00000000000..7d8d4b076c2
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Batches memory reservation and release requests for one query context before forwarding them
+ * to the operator-memory pool managed by {@link LocalExecutionPlanner}.
+ *
+ * <p>This class performs no synchronization of its own; an instance must be used from a single
+ * thread or be synchronized externally.
+ */
+@NotThreadSafe
+public class NotThreadSafeMemoryReservationManager implements MemoryReservationManager {
+  // To avoid reserving memory too frequently, we choose to do it in batches. This is the lower
+  // bound for each batch.
+  private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
+
+  private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER =
+      LocalExecutionPlanner.getInstance();
+
+  // Identifies the query when a reservation is forwarded to the planner.
+  private final QueryId queryId;
+
+  // Name of the owner of this manager; forwarded to the planner together with the query id.
+  private final String contextHolder;
+
+  // Bytes that have actually been taken from the planner and not yet handed back.
+  private long reservedBytesInTotal = 0;
+
+  // Bytes requested via reserveMemoryCumulatively that have not been taken from the planner yet.
+  private long bytesToBeReserved = 0;
+
+  // Bytes passed to releaseMemoryCumulatively that have not been applied yet.
+  private long bytesToBeReleased = 0;
+
+  /**
+   * Creates a manager with nothing reserved and nothing pending.
+   *
+   * @param queryId the query whose memory is accounted for
+   * @param contextHolder name of the component that owns this manager
+   */
+  public NotThreadSafeMemoryReservationManager(final QueryId queryId, final String contextHolder) {
+    this.queryId = queryId;
+    this.contextHolder = contextHolder;
+  }
+
+  /**
+   * Adds {@code size} to the pending reservation and commits the whole pending amount once it
+   * reaches {@code MEMORY_BATCH_THRESHOLD}.
+   *
+   * @param size the size of memory to reserve, in bytes
+   */
+  @Override
+  public void reserveMemoryCumulatively(final long size) {
+    bytesToBeReserved += size;
+    if (bytesToBeReserved >= MEMORY_BATCH_THRESHOLD) {
+      reserveMemoryImmediately();
+    }
+  }
+
+  /**
+   * Takes the pending reservation from the planner right away, regardless of the batch threshold.
+   *
+   * <p>If the planner call throws, the counters are left untouched because they are only updated
+   * after the call returns.
+   */
+  @Override
+  public void reserveMemoryImmediately() {
+    if (bytesToBeReserved != 0) {
+      LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
+          bytesToBeReserved, reservedBytesInTotal, queryId.getId(), contextHolder);
+      reservedBytesInTotal += bytesToBeReserved;
+      bytesToBeReserved = 0;
+    }
+  }
+
+  /**
+   * Adds {@code size} to the pending release and applies the whole pending amount once it reaches
+   * {@code MEMORY_BATCH_THRESHOLD}.
+   *
+   * <p>A batch is first offset against memory that is still only pending reservation; only the
+   * remainder is handed back to the planner.
+   *
+   * @param size the size of memory to release, in bytes
+   */
+  @Override
+  public void releaseMemoryCumulatively(final long size) {
+    bytesToBeReleased += size;
+    if (bytesToBeReleased < MEMORY_BATCH_THRESHOLD) {
+      return;
+    }
+    if (bytesToBeReleased <= bytesToBeReserved) {
+      // The whole batch is covered by memory that was never taken from the planner.
+      bytesToBeReserved -= bytesToBeReleased;
+    } else {
+      final long bytesToRelease = bytesToBeReleased - bytesToBeReserved;
+      bytesToBeReserved = 0;
+      LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
+      reservedBytesInTotal -= bytesToRelease;
+    }
+    bytesToBeReleased = 0;
+  }
+
+  /**
+   * Hands every committed byte back to the planner and discards all pending batches.
+   *
+   * <p>The pending counters are cleared even when nothing has been committed yet. Otherwise
+   * amounts that were requested but never committed would leak into whatever this manager
+   * accounts for next.
+   */
+  @Override
+  public void releaseAllReservedMemory() {
+    if (reservedBytesInTotal != 0) {
+      LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal);
+      reservedBytesInTotal = 0;
+    }
+    bytesToBeReserved = 0;
+    bytesToBeReleased = 0;
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
new file mode 100644
index 00000000000..efe83d23c05
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.memory;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * Variant of {@link NotThreadSafeMemoryReservationManager} in which every operation holds this
+ * instance's monitor while delegating to the superclass implementation.
+ */
+@ThreadSafe
+public class ThreadSafeMemoryReservationManager extends NotThreadSafeMemoryReservationManager {
+  /**
+   * @param queryId the query whose memory is accounted for
+   * @param contextHolder name of the component that owns this manager
+   */
+  public ThreadSafeMemoryReservationManager(QueryId queryId, String contextHolder) {
+    super(queryId, contextHolder);
+  }
+
+  /** Same as the superclass method, executed while holding this instance's lock. */
+  @Override
+  public void reserveMemoryCumulatively(long size) {
+    synchronized (this) {
+      super.reserveMemoryCumulatively(size);
+    }
+  }
+
+  /** Same as the superclass method, executed while holding this instance's lock. */
+  @Override
+  public void reserveMemoryImmediately() {
+    synchronized (this) {
+      super.reserveMemoryImmediately();
+    }
+  }
+
+  /** Same as the superclass method, executed while holding this instance's lock. */
+  @Override
+  public void releaseMemoryCumulatively(long size) {
+    synchronized (this) {
+      super.releaseMemoryCumulatively(size);
+    }
+  }
+
+  /** Same as the superclass method, executed while holding this instance's lock. */
+  @Override
+  public void releaseAllReservedMemory() {
+    synchronized (this) {
+      super.releaseAllReservedMemory();
+    }
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
index 43ec7b9a8be..a7c96131acf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.read.reader.common;
import org.apache.iotdb.commons.utils.TestOnly;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.reader.IPointReader;
@@ -40,6 +41,8 @@ public class PriorityMergeReader implements IPointReader {
protected long usedMemorySize = 0;
+ protected MemoryReservationManager memoryReservationManager;
+
public PriorityMergeReader() {
heap =
new PriorityQueue<>(
@@ -51,6 +54,10 @@ public class PriorityMergeReader implements IPointReader {
});
}
+ /**
+ * Attaches the manager through which this reader reserves and releases memory.
+ *
+ * <p>The field is only used behind {@code != null} checks in this class, so a reader without a
+ * manager still maintains {@code usedMemorySize} but performs no reservation.
+ *
+ * @param memoryReservationManager the manager to use; {@code null} disables reservation
+ */
+ public void setMemoryReservationManager(MemoryReservationManager
memoryReservationManager) {
+ this.memoryReservationManager = memoryReservationManager;
+ }
+
@TestOnly
public void addReader(IPointReader reader, long priority) throws IOException
{
if (reader.hasNextTimeValuePair()) {
@@ -67,7 +74,11 @@ public class PriorityMergeReader implements IPointReader {
Element element = new Element(reader, reader.nextTimeValuePair(),
priority);
heap.add(element);
currentReadStopTime = Math.max(currentReadStopTime, endTime);
- usedMemorySize += element.getReader().getUsedMemorySize();
+ long size = element.getReader().getUsedMemorySize();
+ usedMemorySize += size;
+ if (memoryReservationManager != null) {
+ memoryReservationManager.reserveMemoryCumulatively(size);
+ }
} else {
reader.close();
}
@@ -96,7 +107,11 @@ public class PriorityMergeReader implements IPointReader {
top.setTimeValuePair(topNext);
heap.add(top);
} else {
- usedMemorySize -= top.getReader().getUsedMemorySize();
+ long size = top.getReader().getUsedMemorySize();
+ usedMemorySize -= size;
+ if (memoryReservationManager != null) {
+ memoryReservationManager.releaseMemoryCumulatively(size);
+ }
}
return ret;
}
@@ -121,7 +136,11 @@ public class PriorityMergeReader implements IPointReader {
Element e = heap.poll();
fillNullValue(ret, e.getTimeValuePair());
if (!e.hasNext()) {
- usedMemorySize -= e.getReader().getUsedMemorySize();
+ long size = e.getReader().getUsedMemorySize();
+ usedMemorySize -= size;
+ if (memoryReservationManager != null) {
+ memoryReservationManager.releaseMemoryCumulatively(size);
+ }
e.getReader().close();
continue;
}
@@ -133,7 +152,11 @@ public class PriorityMergeReader implements IPointReader {
e.next();
heap.add(e);
} else {
- usedMemorySize -= e.getReader().getUsedMemorySize();
+ long size = e.getReader().getUsedMemorySize();
+ usedMemorySize -= size;
+ if (memoryReservationManager != null) {
+ memoryReservationManager.releaseMemoryCumulatively(size);
+ }
// the chunk is end
e.close();
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
index 335dd31c74a..d155d850856 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorMemoryTest.java
@@ -150,14 +150,12 @@ public class OperatorMemoryTest {
scanOptionsBuilder.build());
assertEquals(
- TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L,
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
seriesScanOperator.calculateMaxPeekMemory());
assertEquals(
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(),
seriesScanOperator.calculateMaxReturnSize());
- assertEquals(
- TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 2L,
- seriesScanOperator.calculateRetainedSizeAfterCallingNext());
+ assertEquals(0L,
seriesScanOperator.calculateRetainedSizeAfterCallingNext());
} catch (IllegalPathException e) {
e.printStackTrace();
@@ -202,7 +200,7 @@ public class OperatorMemoryTest {
long maxPeekMemory =
Math.max(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
- 4 *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3L);
+ 4 *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte());
long maxReturnMemory =
Math.min(
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(),
@@ -1006,8 +1004,7 @@ public class OperatorMemoryTest {
TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ 512 * Byte.BYTES
+ LongColumn.SIZE_IN_BYTES_PER_POSITION;
- long cachedRawDataSize =
- 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte()
* 3;
+ long cachedRawDataSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
assertEquals(
expectedMaxReturnSize + cachedRawDataSize,
@@ -1039,7 +1036,7 @@ public class OperatorMemoryTest {
TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ 512 * Byte.BYTES
+ 2 * LongColumn.SIZE_IN_BYTES_PER_POSITION;
- cachedRawDataSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
+ cachedRawDataSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
assertEquals(
expectedMaxReturnSize + cachedRawDataSize,
@@ -1082,7 +1079,7 @@ public class OperatorMemoryTest {
* (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ 512 * Byte.BYTES
+ LongColumn.SIZE_IN_BYTES_PER_POSITION);
- cachedRawDataSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
+ cachedRawDataSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
assertEquals(
expectedMaxReturnSize + cachedRawDataSize,
@@ -1122,7 +1119,7 @@ public class OperatorMemoryTest {
* (TimeColumn.SIZE_IN_BYTES_PER_POSITION
+ 512 * Byte.BYTES
+ LongColumn.SIZE_IN_BYTES_PER_POSITION));
- cachedRawDataSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
+ cachedRawDataSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
assertEquals(
expectedMaxReturnSize + cachedRawDataSize,
@@ -1160,7 +1157,7 @@ public class OperatorMemoryTest {
typeProvider);
expectedMaxReturnSize = DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;
- cachedRawDataSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() * 3;
+ cachedRawDataSize = 2L *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
assertEquals(
expectedMaxReturnSize + cachedRawDataSize,