This is an automated email from the ASF dual-hosted git repository. Wei-hao-Li pushed a commit to branch showQ in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f6f6a8c4a6373d30f7ee8a679e87c9105160748a Author: Weihao Li <[email protected]> AuthorDate: Mon Jun 29 12:34:35 2026 +0800 fix some Signed-off-by: Weihao Li <[email protected]> --- .../fragment/FragmentInstanceContext.java | 5 ++ .../plan/planner/LocalExecutionPlanner.java | 91 ++++++++++++++++------ .../NotThreadSafeMemoryReservationManager.java | 43 +++++++++- .../memory/ThreadSafeMemoryReservationManager.java | 5 ++ .../LocalExecutionPlannerOperatorsMemoryTest.java | 85 ++++++++++++++++++++ 5 files changed, 203 insertions(+), 26 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 81dd89387bf..4cf29a80dda 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -45,6 +45,7 @@ import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet; import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet; +import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate; import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.read_tsfile.ExternalTsFileQueryDataSource; @@ -1365,6 +1366,10 @@ public class FragmentInstanceContext extends QueryContext { public void setHighestPriority(boolean highestPriority) { this.highestPriority = highestPriority; + if (memoryReservationManager instanceof NotThreadSafeMemoryReservationManager) { + ((NotThreadSafeMemoryReservationManager) memoryReservationManager) + .setHighestPriority(highestPriority); + } } public boolean isSingleSourcePath() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 95f28052c1f..470aa4257a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.memory.MemoryBlockType; import org.apache.iotdb.commons.path.IFullPath; import org.apache.iotdb.commons.queryengine.common.SqlDialect; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.DataNodeMemoryConfig; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -118,7 +119,7 @@ public class LocalExecutionPlanner { context.invalidateParentPlanNodeIdToMemoryEstimator(); // check whether current free memory is enough to execute current query - long estimatedMemorySize = checkMemory(memoryEstimator, instanceContext.getStateMachine()); + long estimatedMemorySize = checkMemory(memoryEstimator, instanceContext); context.addPipelineDriverFactory(root, context.getDriverContext(), estimatedMemorySize); @@ -157,7 +158,7 @@ public class LocalExecutionPlanner { context.invalidateParentPlanNodeIdToMemoryEstimator(); // check whether current free memory is enough to execute current query - checkMemory(memoryEstimator, instanceContext.getStateMachine()); + checkMemory(memoryEstimator, instanceContext); context.addPipelineDriverFactory(root, context.getDriverContext(), 0); @@ -193,7 +194,7 @@ public class LocalExecutionPlanner { } private long checkMemory( - final PipelineMemoryEstimator memoryEstimator, FragmentInstanceStateMachine stateMachine) + final PipelineMemoryEstimator memoryEstimator, FragmentInstanceContext instanceContext) throws MemoryNotEnoughException { // if it is disabled, just return @@ -206,14 +207,75 @@ public class LocalExecutionPlanner { QueryRelatedResourceMetricSet.getInstance().updateEstimatedMemory(estimatedMemorySize); - if (OPERATORS_MEMORY_BLOCK.allocate(estimatedMemorySize)) { + if (instanceContext.isHighestPriority()) { + return 0L; + } + + long reservedBytes = allocateOperatorsMemory(estimatedMemorySize); + if (reservedBytes < 0) { + throw new MemoryNotEnoughException( + String.format( + "There is not enough memory to execute current fragment instance, " + + "current remaining free memory is %dB, " + + "estimated memory usage for current fragment instance is %dB", + OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes(), estimatedMemorySize)); + } + FragmentInstanceStateMachine stateMachine = instanceContext.getStateMachine(); + if (reservedBytes > 0) { + stateMachine.addStateChangeListener( + newState -> { + if (newState.isDone()) { + try (SetThreadName fragmentInstanceName = + new SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) { + OPERATORS_MEMORY_BLOCK.release(reservedBytes); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "[ReleaseMemory] release: {}, current remaining memory: {}", + reservedBytes, + OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); + } + } + } + }); + } + return reservedBytes; + } + + /** + * Try to reserve bytes from the operators memory block. + * + * @return allocated bytes on success ({@code >= 0}), {@code -1} if allocation failed + */ + private long allocateOperatorsMemory(final long memoryInBytes) { + if (memoryInBytes <= 0) { + return 0L; + } + if (OPERATORS_MEMORY_BLOCK.allocate(memoryInBytes)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( "[ConsumeMemory] consume: {}, current remaining memory: {}", - estimatedMemorySize, + memoryInBytes, OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); } - } else { + return memoryInBytes; + } + return -1L; + } + + @TestOnly + long allocateOperatorsMemoryForTest(final long memoryInBytes) { + return allocateOperatorsMemory(memoryInBytes); + } + + @TestOnly + long reserveOperatorsMemoryForFragmentForTest( + final long estimatedMemorySize, final boolean isHighestPriority) + throws MemoryNotEnoughException { + if (isHighestPriority) { + return 0L; + } + long reservedBytes = allocateOperatorsMemory(estimatedMemorySize); + if (reservedBytes < 0) { throw new MemoryNotEnoughException( String.format( "There is not enough memory to execute current fragment instance, " @@ -221,22 +283,7 @@ public class LocalExecutionPlanner { + "estimated memory usage for current fragment instance is %dB", OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes(), estimatedMemorySize)); } - stateMachine.addStateChangeListener( - newState -> { - if (newState.isDone()) { - try (SetThreadName fragmentInstanceName = - new SetThreadName(stateMachine.getFragmentInstanceId().getFullId())) { - OPERATORS_MEMORY_BLOCK.release(estimatedMemorySize); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - "[ReleaseMemory] release: {}, current remaining memory: {}", - estimatedMemorySize, - OPERATORS_MEMORY_BLOCK.getFreeMemoryInBytes()); - } - } - } - }); - return estimatedMemorySize; + return reservedBytes; } private QueryDataSourceType getQueryDataSourceType(DataDriverContext dataDriverContext) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java index d156628532c..9f49c5c7786 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.memory; import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; @@ -39,6 +40,8 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM private final String contextHolder; + private boolean isHighestPriority; + private long reservedBytesInTotal = 0; private long bytesToBeReserved = 0; @@ -50,8 +53,24 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM this.contextHolder = contextHolder; } + public void setHighestPriority(boolean isHighestPriority) { + this.isHighestPriority = isHighestPriority; + } + + public boolean isHighestPriority() { + return isHighestPriority; + } + + @TestOnly + public long getReservedBytesInTotalForTest() { + return reservedBytesInTotal; + } + @Override public void reserveMemoryCumulatively(final long size) { + if (isHighestPriority) { + return; + } bytesToBeReserved += size; if (bytesToBeReserved >= MEMORY_BATCH_THRESHOLD) { reserveMemoryImmediately(); @@ -60,6 +79,9 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM @Override public void reserveMemoryImmediately() { + if (isHighestPriority) { + return; + } if (bytesToBeReserved != 0) { LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( bytesToBeReserved, reservedBytesInTotal, queryId.getId(), contextHolder); @@ -70,15 +92,19 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM @Override public void reserveMemoryImmediately(final long size) { - if (size != 0) { - LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( - size, reservedBytesInTotal, queryId.getId(), contextHolder); - reservedBytesInTotal += size; + if (isHighestPriority || size == 0) { + return; } + LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + size, reservedBytesInTotal, queryId.getId(), contextHolder); + reservedBytesInTotal += size; } @Override public void releaseMemoryCumulatively(final long size) { + if (isHighestPriority) { + return; + } bytesToBeReleased += size; if (bytesToBeReleased >= MEMORY_BATCH_THRESHOLD) { long bytesToRelease; @@ -96,6 +122,9 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM @Override public void releaseAllReservedMemory() { + if (isHighestPriority) { + return; + } if (reservedBytesInTotal != 0) { LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal); reservedBytesInTotal = 0; @@ -106,6 +135,9 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM @Override public Pair<Long, Long> releaseMemoryVirtually(final long size) { + if (isHighestPriority) { + return new Pair<>(size, 0L); + } if (bytesToBeReserved >= size) { bytesToBeReserved -= size; return new Pair<>(size, 0L); @@ -121,6 +153,9 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM @Override public void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) { + if (isHighestPriority) { + return; + } reservedBytesInTotal += bytesAlreadyReserved; reserveMemoryCumulatively(bytesToBeReserved); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java index 2a544421f3f..0a1c6eee418 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java @@ -66,4 +66,9 @@ public class ThreadSafeMemoryReservationManager extends NotThreadSafeMemoryReser final long bytesToBeReserved, final long bytesAlreadyReserved) { super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved); } + + @Override + public synchronized void setHighestPriority(boolean isHighestPriority) { + super.setHighestPriority(isHighestPriority); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java new file mode 100644 index 00000000000..4f255d642be --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlannerOperatorsMemoryTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner; + +import org.apache.iotdb.calc.exception.MemoryNotEnoughException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class LocalExecutionPlannerOperatorsMemoryTest { + + private static final LocalExecutionPlanner PLANNER = LocalExecutionPlanner.getInstance(); + + private long bytesHeldByTest = 0L; + + @After + public void tearDown() { + if (bytesHeldByTest > 0) { + PLANNER.releaseToFreeMemoryForOperators(bytesHeldByTest); + bytesHeldByTest = 0L; + } + } + + @Test + public void testAllocateOperatorsMemoryFailsWhenInsufficient() { + long free = PLANNER.getFreeMemoryForOperators(); + Assert.assertEquals(-1L, PLANNER.allocateOperatorsMemoryForTest(free + 1024L)); + } + + @Test + public void testAllocateOperatorsMemorySucceedsWhenAvailable() { + long request = Math.min(1024L, PLANNER.getFreeMemoryForOperators()); + long reserved = PLANNER.allocateOperatorsMemoryForTest(request); + Assert.assertEquals(request, reserved); + bytesHeldByTest = reserved; + } + + @Test + public void testHighestPriorityBypassWithoutConsumingOperatorsPool() + throws MemoryNotEnoughException { + long freeBefore = PLANNER.getFreeMemoryForOperators(); + long request = freeBefore + 1024L; + + Assert.assertEquals(0L, PLANNER.reserveOperatorsMemoryForFragmentForTest(request, true)); + Assert.assertEquals(freeBefore, PLANNER.getFreeMemoryForOperators()); + } + + @Test + public void testHighestPriorityBypassWhenPoolInsufficient() throws MemoryNotEnoughException { + long free = PLANNER.getFreeMemoryForOperators(); + long request = free + 1024L; + + Assert.assertEquals(0L, PLANNER.reserveOperatorsMemoryForFragmentForTest(request, true)); + } + + @Test + public void testNormalPriorityThrowsWhenPoolInsufficient() { + long free = PLANNER.getFreeMemoryForOperators(); + long request = free + 1024L; + + try { + PLANNER.reserveOperatorsMemoryForFragmentForTest(request, false); + Assert.fail("Expect MemoryNotEnoughException"); + } catch (MemoryNotEnoughException ignore) { + } + } +}
