This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch memoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eed9976c1adb8af27f3eeecd232ee6098cf28c50 Author: lancelly <[email protected]> AuthorDate: Thu Jun 20 15:56:40 2024 +0800 MemoryReservationContext --- .../db/queryengine/common/MPPQueryContext.java | 51 +++--------- .../fragment/FragmentInstanceContext.java | 19 +++++ .../fragment/FragmentInstanceExecution.java | 2 + .../execution/operator/source/SeriesScanUtil.java | 5 +- .../iotdb/db/queryengine/plan/Coordinator.java | 2 +- .../plan/optimization/AggregationPushDown.java | 25 +----- .../plan/planner/LocalExecutionPlanner.java | 13 ++-- .../planner/memory/MemoryReservationContext.java | 46 +++++++++++ .../SynchronizedMemoryReservationContext.java | 48 ++++++++++++ ...ynchronizedMemoryReservationContextWrapper.java | 72 +++++++++++++++++ .../UnsynchronizedMemoryReservationContext.java | 91 ++++++++++++++++++++++ .../read/reader/common/PriorityMergeReader.java | 32 +++++++- 12 files changed, 335 insertions(+), 71 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 3a42ee805b0..9d865e0a624 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -26,7 +26,8 @@ import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType; -import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationContext; +import org.apache.iotdb.db.queryengine.plan.planner.memory.UnsynchronizedMemoryReservationContext; import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics; import org.apache.tsfile.read.filter.basic.Filter; @@ -78,20 +79,13 @@ public class MPPQueryContext { // To avoid query front-end from consuming too much memory, it needs to reserve memory when // constructing some Expression and PlanNode. - private long reservedBytesInTotalForFrontEnd = 0; - - private long bytesToBeReservedForFrontEnd = 0; - - // To avoid reserving memory too frequently, we choose to do it in batches. This is the lower - // bound - // for each batch. - private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L; - - private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = LocalExecutionPlanner.getInstance(); + private final MemoryReservationContext memoryReservationContext; public MPPQueryContext(QueryId queryId) { this.queryId = queryId; this.endPointBlackList = new LinkedList<>(); + this.memoryReservationContext = + new UnsynchronizedMemoryReservationContext(queryId, "MPPQueryContext"); } // TODO too many callers just pass a null SessionInfo which should be forbidden @@ -127,7 +121,7 @@ public class MPPQueryContext { public void prepareForRetry() { this.initResultNodeContext(); - this.releaseMemoryForFrontEnd(); + this.releaseAllMemoryReservedForFrontEnd(); } private void initResultNodeContext() { @@ -313,40 +307,19 @@ public class MPPQueryContext { * single-threaded manner. */ public void reserveMemoryForFrontEnd(final long bytes) { - this.bytesToBeReservedForFrontEnd += bytes; - if (this.bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) { - reserveMemoryForFrontEndImmediately(); - } + this.memoryReservationContext.reserveMemoryAccumulatively(bytes); } public void reserveMemoryForFrontEndImmediately() { - if (bytesToBeReservedForFrontEnd != 0) { - LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd( - bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd, queryId.getId()); - this.reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd; - this.bytesToBeReservedForFrontEnd = 0; - } + this.memoryReservationContext.reserveMemoryImmediately(); } - public void releaseMemoryForFrontEnd() { - if (reservedBytesInTotalForFrontEnd != 0) { - LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd); - reservedBytesInTotalForFrontEnd = 0; - } + public void releaseAllMemoryReservedForFrontEnd() { + this.memoryReservationContext.releaseAllReservedMemory(); } - public void releaseMemoryForFrontEnd(final long bytes) { - if (bytes != 0) { - long bytesToRelease; - if (bytes <= bytesToBeReservedForFrontEnd) { - bytesToBeReservedForFrontEnd -= bytes; - } else { - bytesToRelease = bytes - bytesToBeReservedForFrontEnd; - bytesToBeReservedForFrontEnd = 0; - LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease); - reservedBytesInTotalForFrontEnd -= bytesToRelease; - } - } + public void releaseAllMemoryReservedForFrontEnd(final long bytes) { + this.memoryReservationContext.releaseMemoryAccumulatively(bytes); } // endregion diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index a109fe84963..152daa8bd42 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -28,6 +28,8 @@ import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationContext; +import org.apache.iotdb.db.queryengine.plan.planner.memory.UnsynchronizedMemoryReservationContext; import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; @@ -63,6 +65,8 @@ public class FragmentInstanceContext extends QueryContext { private final FragmentInstanceStateMachine stateMachine; + private final MemoryReservationContext memoryReservationContext; + private IDataRegionForQuery dataRegion; private Filter globalTimeFilter; @@ -193,6 +197,8 @@ public class FragmentInstanceContext extends QueryContext { globalTimePredicate == null ? null : globalTimePredicate.convertPredicateToTimeFilter(); this.dataNodeQueryContextMap = dataNodeQueryContextMap; this.dataNodeQueryContext = dataNodeQueryContextMap.get(id.getQueryId()); + this.memoryReservationContext = + new UnsynchronizedMemoryReservationContext(id.getQueryId(), "FragmentInstanceContext"); } private FragmentInstanceContext( @@ -203,6 +209,8 @@ public class FragmentInstanceContext extends QueryContext { this.sessionInfo = sessionInfo; this.dataNodeQueryContextMap = null; this.dataNodeQueryContext = null; + this.memoryReservationContext = + new UnsynchronizedMemoryReservationContext(id.getQueryId(), "FragmentInstanceContext"); } private FragmentInstanceContext( @@ -218,6 +226,8 @@ public class FragmentInstanceContext extends QueryContext { this.dataRegion = dataRegion; this.globalTimeFilter = globalTimeFilter; this.dataNodeQueryContextMap = null; + this.memoryReservationContext = + new UnsynchronizedMemoryReservationContext(id.getQueryId(), "FragmentInstanceContext"); } @TestOnly @@ -232,6 +242,7 @@ public class FragmentInstanceContext extends QueryContext { this.stateMachine = null; this.dataNodeQueryContextMap = null; this.dataNodeQueryContext = null; + this.memoryReservationContext = null; } public void start() { @@ -369,6 +380,14 @@ public class FragmentInstanceContext extends QueryContext { this.devicePathsToContext = devicePathsToContext; } + public MemoryReservationContext getMemoryReservationContext() { + return memoryReservationContext; + } + + public void releaseMemoryReservationContext() { + memoryReservationContext.releaseAllReservedMemory(); + } + public void initQueryDataSource(List<PartialPath> sourcePaths) throws QueryProcessException { long startTime = System.nanoTime(); if (sourcePaths == null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index b31d4e13eeb..e760e7e8759 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -311,6 +311,8 @@ public class FragmentInstanceExecution { exchangeManager.deRegisterFragmentInstanceFromMemoryPool( instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true); + context.releaseMemoryReservationContext(); + if (newState.isFailed()) { scheduler.abortFragmentInstance(instanceId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index e09bd364084..142cf4942df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; +import org.apache.iotdb.db.queryengine.plan.planner.memory.SynchronizedMemoryReservationContextWrapper; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; @@ -70,7 +71,7 @@ import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.BUI public class SeriesScanUtil implements Accountable { - protected final QueryContext context; + protected final FragmentInstanceContext context; // The path of the target series which will be scanned. protected final PartialPath seriesPath; @@ -143,6 +144,8 @@ public class SeriesScanUtil implements Accountable { this.orderUtils = new DescTimeOrderUtils(); this.mergeReader = getDescPriorityMergeReader(); } + this.mergeReader.setMemoryReservationContextWrapper( + new SynchronizedMemoryReservationContextWrapper(context.getMemoryReservationContext())); // init TimeSeriesMetadata materializer this.seqTimeSeriesMetadata = new LinkedList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 65dcd830195..5d8c7830e3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -142,7 +142,7 @@ public class Coordinator { return result; } finally { if (queryContext != null) { - queryContext.releaseMemoryForFrontEnd(); + queryContext.releaseAllMemoryReservedForFrontEnd(); } if (queryContext != null && !queryContext.getAcquiredLockNumMap().isEmpty()) { Map<SchemaLockType, Integer> lockMap = queryContext.getAcquiredLockNumMap(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java index 994f1a30056..3e71b582287 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/AggregationPushDown.java @@ -92,12 +92,9 @@ public class AggregationPushDown implements PlanOptimizer { RewriterContext rewriterContext = new RewriterContext(analysis, context, queryStatement.isAlignByDevice()); PlanNode node; - try { - node = plan.accept(new Rewriter(), rewriterContext); - } finally { - // release the last batch of memory - rewriterContext.releaseMemoryForFrontEndImmediately(); - } + + node = plan.accept(new Rewriter(), rewriterContext); + return node; } @@ -633,8 +630,6 @@ public class AggregationPushDown implements PlanOptimizer { private static class RewriterContext { - private static final long RELEASE_BATCH_SIZE = 1024L * 1024L; - private final Analysis analysis; private final MPPQueryContext context; private final boolean isAlignByDevice; @@ -642,8 +637,6 @@ public class AggregationPushDown implements PlanOptimizer { private String curDevice; private PartialPath curDevicePath; - private long bytesToBeReleased = 0; - public RewriterContext(Analysis analysis, MPPQueryContext context, boolean isAlignByDevice) { this.analysis = analysis; Validate.notNull(context, "Query context cannot be null."); @@ -683,17 +676,7 @@ public class AggregationPushDown implements PlanOptimizer { } public void releaseMemoryForFrontEnd(final long bytes) { - bytesToBeReleased += bytes; - if (bytesToBeReleased >= RELEASE_BATCH_SIZE) { - releaseMemoryForFrontEndImmediately(); - } - } - - public void releaseMemoryForFrontEndImmediately() { - if (bytesToBeReleased > 0) { - context.releaseMemoryForFrontEnd(bytesToBeReleased); - bytesToBeReleased = 0; - } + this.context.releaseAllMemoryReservedForFrontEnd(bytes); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 465c2e2e723..3fe45ee6d7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -236,15 +236,18 @@ public class LocalExecutionPlanner { } } - public synchronized void reserveMemoryForQueryFrontEnd( - final long memoryInBytes, final long reservedBytes, final String queryId) { + public synchronized void reserveFromFreeMemoryForOperators( + final long memoryInBytes, + final long reservedBytes, + final String queryId, + final String contextHolder) { if (memoryInBytes > freeMemoryForOperators) { throw new MemoryNotEnoughException( String.format( - "There is not enough memory for planning-stage of Query %s, " + "There is not enough memory for of Query %s, contextHolder is %s," + "current remaining free memory is %dB, " - + "estimated memory usage is %dB, reserved memory for FE of this query in total is %dB", - queryId, freeMemoryForOperators, memoryInBytes, reservedBytes)); + + "reserved memory for this context in total is %dB.", + queryId, contextHolder, freeMemoryForOperators, reservedBytes)); } else { freeMemoryForOperators -= memoryInBytes; if (LOGGER.isDebugEnabled()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationContext.java new file mode 100644 index 00000000000..27ea8242cbe --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationContext.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.memory; + +public interface MemoryReservationContext { + /** + * Reserve memory for the given size. The memory reservation request will be accumulated and the + * actual memory will be reserved when the accumulated memory exceeds the threshold. + * + * @param size the size of memory to reserve + */ + void reserveMemoryAccumulatively(final long size); + + /** Reserve memory for the accumulated memory size immediately. */ + void reserveMemoryImmediately(); + + /** + * Release memory for the given size. + * + * @param size the size of memory to release + */ + void releaseMemoryAccumulatively(final long size); + + /** + * Release all reserved memory immediately. Make sure this method is called when the lifecycle of + * this context ends, Or the memory to be released in the batch may not be released correctly. + */ + void releaseAllReservedMemory(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContext.java new file mode 100644 index 00000000000..48372477c85 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContext.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.memory; + +import org.apache.iotdb.db.queryengine.common.QueryId; + +public class SynchronizedMemoryReservationContext extends UnsynchronizedMemoryReservationContext { + public SynchronizedMemoryReservationContext(QueryId queryId, String contextHolder) { + super(queryId, contextHolder); + } + + @Override + public synchronized void reserveMemoryAccumulatively(long size) { + super.reserveMemoryAccumulatively(size); + } + + @Override + public synchronized void reserveMemoryImmediately() { + super.reserveMemoryImmediately(); + } + + @Override + public synchronized void releaseMemoryAccumulatively(long size) { + super.releaseMemoryAccumulatively(size); + } + + @Override + public synchronized void releaseAllReservedMemory() { + super.releaseAllReservedMemory(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContextWrapper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContextWrapper.java new file mode 100644 index 00000000000..75256da1769 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/SynchronizedMemoryReservationContextWrapper.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.memory; + +/** + * SynchronizedMemoryReservationContext synchronizes the memory reservation and release operations. + * However, synchronization is not necessary every time for the memory reservation and release + * operations in single thread context. For example, each SeriesScanUtil itself reserves and + * releases memory in a single thread context, but SeriesScanUtils in the same FI may share the same + * SynchronizedMemoryReservationContext. So we can use this wrapper to batch the reserve and release + * operations to reduce the overhead of synchronization. The actual reserve and release operations + * are delegated to the wrapped MemoryReservationContext. + */ +public class SynchronizedMemoryReservationContextWrapper implements MemoryReservationContext { + + private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L; + private final MemoryReservationContext memoryReservationContext; + + private long bytesToBeReserved = 0; + + private long bytesToBeReleased = 0; + + public SynchronizedMemoryReservationContextWrapper( + MemoryReservationContext memoryReservationContext) { + this.memoryReservationContext = memoryReservationContext; + } + + @Override + public void reserveMemoryAccumulatively(long size) { + this.bytesToBeReserved += size; + if (this.bytesToBeReserved >= MEMORY_BATCH_THRESHOLD) { + memoryReservationContext.reserveMemoryAccumulatively(bytesToBeReserved); + this.bytesToBeReserved = 0; + } + } + + @Override + public void reserveMemoryImmediately() { + memoryReservationContext.reserveMemoryImmediately(); + } + + @Override + public void releaseMemoryAccumulatively(long size) { + this.bytesToBeReleased += size; + if (this.bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) { + memoryReservationContext.releaseMemoryAccumulatively(this.bytesToBeReleased); + this.bytesToBeReleased = 0; + } + } + + @Override + public void releaseAllReservedMemory() { + memoryReservationContext.releaseAllReservedMemory(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/UnsynchronizedMemoryReservationContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/UnsynchronizedMemoryReservationContext.java new file mode 100644 index 00000000000..4a67bb13f54 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/UnsynchronizedMemoryReservationContext.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.memory; + +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; + +public class UnsynchronizedMemoryReservationContext implements MemoryReservationContext { + // To avoid reserving memory too frequently, we choose to do it in batches. This is the lower + // bound for each batch. + private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L; + + private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = LocalExecutionPlanner.getInstance(); + + private final QueryId queryId; + + private final String contextHolder; + + private long reservedBytesInTotal = 0; + + private long bytesToBeReserved = 0; + + private long bytesToBeReleased = 0; + + public UnsynchronizedMemoryReservationContext(final QueryId queryId, final String contextHolder) { + this.queryId = queryId; + this.contextHolder = contextHolder; + } + + @Override + public void reserveMemoryAccumulatively(final long size) { + this.bytesToBeReserved += size; + if (this.bytesToBeReserved >= MEMORY_BATCH_THRESHOLD) { + reserveMemoryImmediately(); + } + } + + @Override + public void reserveMemoryImmediately() { + if (bytesToBeReserved != 0) { + LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + bytesToBeReserved, reservedBytesInTotal, queryId.getId(), contextHolder); + this.reservedBytesInTotal += bytesToBeReserved; + this.bytesToBeReserved = 0; + } + } + + @Override + public void releaseMemoryAccumulatively(final long size) { + this.bytesToBeReleased += size; + if (bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) { + long bytesToRelease; + if (this.bytesToBeReleased <= bytesToBeReserved) { + bytesToBeReserved -= this.bytesToBeReleased; + } else { + bytesToRelease = this.bytesToBeReleased - bytesToBeReserved; + bytesToBeReserved = 0; + LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease); + reservedBytesInTotal -= bytesToRelease; + } + this.bytesToBeReleased = 0; + } + } + + @Override + public void releaseAllReservedMemory() { + if (reservedBytesInTotal != 0) { + LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal); + reservedBytesInTotal = 0; + bytesToBeReserved = 0; + bytesToBeReleased = 0; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java index 43ec7b9a8be..1b7f43b1d6c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/common/PriorityMergeReader.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.read.reader.common; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.queryengine.plan.planner.memory.SynchronizedMemoryReservationContextWrapper; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.reader.IPointReader; @@ -40,6 +41,8 @@ public class PriorityMergeReader implements IPointReader { protected long usedMemorySize = 0; + protected SynchronizedMemoryReservationContextWrapper memoryReservationContextWrapper; + public PriorityMergeReader() { heap = new PriorityQueue<>( @@ -51,6 +54,11 @@ public class PriorityMergeReader implements IPointReader { }); } + public void setMemoryReservationContextWrapper( + SynchronizedMemoryReservationContextWrapper memoryReservationContextWrapper) { + this.memoryReservationContextWrapper = memoryReservationContextWrapper; + } + @TestOnly public void addReader(IPointReader reader, long priority) throws IOException { if (reader.hasNextTimeValuePair()) { @@ -67,7 +75,11 @@ public class PriorityMergeReader implements IPointReader { Element element = new Element(reader, reader.nextTimeValuePair(), priority); heap.add(element); currentReadStopTime = Math.max(currentReadStopTime, endTime); - usedMemorySize += element.getReader().getUsedMemorySize(); + long size = element.getReader().getUsedMemorySize(); + usedMemorySize += size; + if (memoryReservationContextWrapper != null) { + memoryReservationContextWrapper.reserveMemoryAccumulatively(size); + } } else { reader.close(); } @@ -96,7 +108,11 @@ public class PriorityMergeReader implements IPointReader { top.setTimeValuePair(topNext); heap.add(top); } else { - usedMemorySize -= top.getReader().getUsedMemorySize(); + long size = top.getReader().getUsedMemorySize(); + usedMemorySize -= size; + if (memoryReservationContextWrapper != null) { + memoryReservationContextWrapper.releaseMemoryAccumulatively(size); + } } return ret; } @@ -121,7 +137,11 @@ public class PriorityMergeReader implements IPointReader { Element e = heap.poll(); fillNullValue(ret, e.getTimeValuePair()); if (!e.hasNext()) { - usedMemorySize -= e.getReader().getUsedMemorySize(); + long size = e.getReader().getUsedMemorySize(); + usedMemorySize -= size; + if (memoryReservationContextWrapper != null) { + memoryReservationContextWrapper.releaseMemoryAccumulatively(size); + } e.getReader().close(); continue; } @@ -133,7 +153,11 @@ public class PriorityMergeReader implements IPointReader { e.next(); heap.add(e); } else { - usedMemorySize -= e.getReader().getUsedMemorySize(); + long size = e.getReader().getUsedMemorySize(); + usedMemorySize -= size; + if (memoryReservationContextWrapper != null) { + memoryReservationContextWrapper.releaseMemoryAccumulatively(size); + } // the chunk is end e.close(); }
