This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/add_unit_for_query_memory in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ac4dfdd92a24b15b90360f4f5efb73db51055586 Author: Beyyes <[email protected]> AuthorDate: Thu Jan 25 11:02:00 2024 +0800 add unit for no enough memory, perfect comments --- .../IoTDBOrderByLimitOffsetAlignByDeviceIT.java | 1 + .../execution/operator/process/TopKOperator.java | 42 +++++++++++----------- .../execution/schedule/DriverScheduler.java | 2 +- .../plan/planner/LocalExecutionPlanner.java | 4 +-- .../plan/planner/LogicalPlanBuilder.java | 6 ++-- .../AlignByTimeOrderByLimitOffsetTest.java | 5 ++- 6 files changed, 32 insertions(+), 28 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java index 670d35c9ca0..e2de3d28ecb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java @@ -98,6 +98,7 @@ public class IoTDBOrderByLimitOffsetAlignByDeviceIT { @Test public void fillTest() { + // linear fill can not use TopKNode String[] expectedHeader = new String[] {"Time,Device,s1,s2"}; String[] retArray = new String[] { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java index 6ee32cea9c7..a389f38dab5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java @@ -53,11 +53,11 @@ import static com.google.common.util.concurrent.Futures.successfulAsList; public class TopKOperator implements ProcessOperator { private final OperatorContext operatorContext; - private final List<Operator> deviceOperators; - private int deviceIndex; + private final List<Operator> childrenOperators; + private int childIndex; // read step operators each invoking - private int deviceBatchStep; - private boolean[] canCallNext; + private final int childBatchStep; + private final boolean[] canCallNext; private final List<TSDataType> dataTypes; private final TsBlockBuilder tsBlockBuilder; @@ -84,13 +84,13 @@ public class TopKOperator implements ProcessOperator { public TopKOperator( OperatorContext operatorContext, - List<Operator> deviceOperators, + List<Operator> childrenOperators, List<TSDataType> dataTypes, Comparator<SortKey> comparator, int topValue, boolean childrenDataInOrder) { this.operatorContext = operatorContext; - this.deviceOperators = deviceOperators; + this.childrenOperators = childrenOperators; this.dataTypes = dataTypes; this.mergeSortHeap = new MergeSortHeap(topValue, comparator.reversed()); this.comparator = comparator; @@ -100,11 +100,11 @@ public class TopKOperator implements ProcessOperator { initResultTsBlock(); - deviceBatchStep = + childBatchStep = OPERATOR_BATCH_UPPER_BOUND % topValue == 0 ? OPERATOR_BATCH_UPPER_BOUND / topValue : OPERATOR_BATCH_UPPER_BOUND / topValue + 1; - canCallNext = new boolean[deviceOperators.size()]; + canCallNext = new boolean[childrenOperators.size()]; } @Override @@ -116,8 +116,8 @@ public class TopKOperator implements ProcessOperator { public ListenableFuture<?> isBlocked() { boolean hasReadyChild = false; List<ListenableFuture<?>> listenableFutures = new ArrayList<>(); - for (int i = deviceIndex; - i < Math.min(deviceIndex + deviceBatchStep, deviceOperators.size()); + for (int i = childIndex; + i < Math.min(childIndex + childBatchStep, childrenOperators.size()); i++) { if (getOperator(i) == null) { continue; @@ -142,7 +142,7 @@ public class TopKOperator implements ProcessOperator { @Override public boolean hasNext() throws Exception { - if (deviceIndex >= deviceOperators.size()) { + if (childIndex >= childrenOperators.size()) { if (topKResult == null) { return false; } @@ -154,7 +154,7 @@ public class TopKOperator implements ProcessOperator { @Override public TsBlock next() throws Exception { - if (deviceIndex >= deviceOperators.size() && resultReturnSize < topKResult.length) { + if (childIndex >= childrenOperators.size() && resultReturnSize < topKResult.length) { return getResultFromCachedTopKResult(); } @@ -162,8 +162,8 @@ public class TopKOperator implements ProcessOperator { long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); boolean batchFinished = true; - int operatorBatchEnd = Math.min(deviceIndex + deviceBatchStep, deviceOperators.size()); - for (int i = deviceIndex; i < operatorBatchEnd; i++) { + int operatorBatchEnd = Math.min(childIndex + childBatchStep, childrenOperators.size()); + for (int i = childIndex; i < operatorBatchEnd; i++) { if (getOperator(i) == null) { continue; } @@ -212,8 +212,8 @@ public class TopKOperator implements ProcessOperator { } if (batchFinished) { - deviceIndex = deviceIndex + deviceBatchStep; - if (deviceIndex >= deviceOperators.size()) { + childIndex = childIndex + childBatchStep; + if (childIndex >= childrenOperators.size()) { return getResultFromCachedTopKResult(); } } @@ -223,8 +223,8 @@ public class TopKOperator implements ProcessOperator { @Override public void close() throws Exception { - for (int i = deviceIndex; i < deviceOperators.size(); i++) { - final Operator operator = deviceOperators.get(i); + for (int i = childIndex; i < childrenOperators.size(); i++) { + final Operator operator = childrenOperators.get(i); if (operator != null) { operator.close(); } @@ -236,7 +236,7 @@ public class TopKOperator implements ProcessOperator { // traverse each child serial, // so no need to accumulate the returnSize and retainedSize of each child long maxPeekMemory = calculateMaxReturnSize(); - for (Operator operator : deviceOperators) { + for (Operator operator : childrenOperators) { maxPeekMemory = Math.max(maxPeekMemory, operator.calculateMaxPeekMemory()); } return Math.max(maxPeekMemory, topValue * getMemoryUsageOfOneMergeSortKey() * 2); @@ -379,11 +379,11 @@ public class TopKOperator implements ProcessOperator { } private Operator getOperator(int i) { - return deviceOperators.get(i); + return childrenOperators.get(i); } private void closeOperator(int i) throws Exception { getOperator(i).close(); - deviceOperators.set(i, null); + childrenOperators.set(i, null); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java index 572aeeb9a78..e8ddb8161d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java @@ -257,7 +257,7 @@ public class DriverScheduler implements IDriverScheduler, IService { .getThrottleQuotaLimit() .checkMemory(sessionInfo.getUserName(), estimatedMemory.get())) { throw new MemoryNotEnoughException( - "There is not enough memory to execute current fragment instance"); + "There is no enough memory to execute current fragment instance"); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 3aef9be159e..eff85a4b197 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -132,8 +132,8 @@ public class LocalExecutionPlanner { throw new MemoryNotEnoughException( String.format( "There is not enough memory to execute current fragment instance, " - + "current remaining free memory is %d, " - + "estimated memory usage for current fragment instance is %d", + + "current remaining free memory is %dB, " + + "estimated memory usage for current fragment instance is %dB", freeMemoryForOperators, estimatedMemorySize)); } else { freeMemoryForOperators -= estimatedMemorySize; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index 15fb330bab2..865d73b81af 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -771,7 +771,7 @@ public class LogicalPlanBuilder { orderByParameter, outputColumnNames); - // if value filter exists, need add a LIMIT-NODE as the child node of TopKNode + // if value filter exists, need add a LimitNode as the child node of TopKNode long valueFilterLimit = queryStatement.hasWhere() ? limitValue : -1; // only order by based on time, use TopKNode + SingleDeviceViewNode @@ -796,7 +796,7 @@ public class LogicalPlanBuilder { analysis.setUseTopKNode(); this.root = topKNode; } else if (canUseMergeSortNode(queryStatement, deviceNameToSourceNodesMap.size())) { - // otherwise use MergeSortNode + SingleDeviceViewNode + // use MergeSortNode + SingleDeviceViewNode MergeSortNode mergeSortNode = new MergeSortNode( context.getQueryId().genPlanNodeId(), orderByParameter, outputColumnNames); @@ -855,7 +855,7 @@ public class LogicalPlanBuilder { private boolean canUseMergeSortNode(QueryStatement queryStatement, int deviceSize) { // 1. `order by based on time` + `no order by expression`. // 2. deviceSize is larger than 1. - // when satisfy all above cases use MergeSortNode. + // when satisfy all above cases use MergeSortNode + SingleDeviceViewNode. return queryStatement.isOrderByBasedOnTime() && !queryStatement.hasOrderByExpression() && deviceSize > 1; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java index e1a77a2d8ce..1cc7e9bbf6b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java @@ -54,7 +54,6 @@ public class AlignByTimeOrderByLimitOffsetTest { DistributedQueryPlan plan; PlanNode firstFiRoot; PlanNode firstFiTopNode; - PlanNode mergeSortNode; /* * IdentitySinkNode-63 @@ -154,6 +153,7 @@ public class AlignByTimeOrderByLimitOffsetTest { @Test public void orderByExpressionTest2() { // select s1 order by s2 + limit N + // use TopKNode to replace SortNode + LimitNode sql = String.format( "select s1 from root.sg.d1 ORDER BY root.sg.d22.s2 DESC LIMIT %s", LIMIT_VALUE); @@ -170,6 +170,7 @@ public class AlignByTimeOrderByLimitOffsetTest { firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof FullOuterTimeJoinNode); // select s1 order by s2 + offset M + limit N + // use TopKNode to replace SortNode sql = String.format( "select s1 from root.sg.d1 ORDER BY root.sg.d22.s2 DESC OFFSET 5 LIMIT %s", @@ -257,6 +258,7 @@ public class AlignByTimeOrderByLimitOffsetTest { */ @Test public void orderByFillTest() { + // previous and constant fill can use TopKNode sql = String.format( "select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY root.sg.d1.s1 DESC fill(previous) LIMIT %s", @@ -290,6 +292,7 @@ public class AlignByTimeOrderByLimitOffsetTest { * ├──ExchangeNode-58: [SourceAddress:192.0.2.1/test.7.0/61] * └──ExchangeNode-59: [SourceAddress:192.0.4.1/test.8.0/62] */ + // linear fill can not use TopKNode sql = String.format( "select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY root.sg.d1.s1 DESC fill(linear) LIMIT %s",
