This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 59fe3fee515 Add unit byte for no enough query memory
59fe3fee515 is described below
commit 59fe3fee5157fac9b84de7c6a809cf76c2a62bfe
Author: Beyyes <[email protected]>
AuthorDate: Thu Jan 25 16:19:38 2024 +0800
Add unit byte for no enough query memory
---
.../IoTDBOrderByLimitOffsetAlignByDeviceIT.java | 1 +
.../execution/operator/process/TopKOperator.java | 42 +++++++++++-----------
.../execution/schedule/DriverScheduler.java | 2 +-
.../plan/planner/LocalExecutionPlanner.java | 4 +--
.../plan/planner/LogicalPlanBuilder.java | 6 ++--
.../AlignByTimeOrderByLimitOffsetTest.java | 5 ++-
6 files changed, 32 insertions(+), 28 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
index 670d35c9ca0..e2de3d28ecb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
@@ -98,6 +98,7 @@ public class IoTDBOrderByLimitOffsetAlignByDeviceIT {
@Test
public void fillTest() {
+ // linear fill can not use TopKNode
String[] expectedHeader = new String[] {"Time,Device,s1,s2"};
String[] retArray =
new String[] {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index 6ee32cea9c7..a389f38dab5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -53,11 +53,11 @@ import static
com.google.common.util.concurrent.Futures.successfulAsList;
public class TopKOperator implements ProcessOperator {
private final OperatorContext operatorContext;
- private final List<Operator> deviceOperators;
- private int deviceIndex;
+ private final List<Operator> childrenOperators;
+ private int childIndex;
// read step operators each invoking
- private int deviceBatchStep;
- private boolean[] canCallNext;
+ private final int childBatchStep;
+ private final boolean[] canCallNext;
private final List<TSDataType> dataTypes;
private final TsBlockBuilder tsBlockBuilder;
@@ -84,13 +84,13 @@ public class TopKOperator implements ProcessOperator {
public TopKOperator(
OperatorContext operatorContext,
- List<Operator> deviceOperators,
+ List<Operator> childrenOperators,
List<TSDataType> dataTypes,
Comparator<SortKey> comparator,
int topValue,
boolean childrenDataInOrder) {
this.operatorContext = operatorContext;
- this.deviceOperators = deviceOperators;
+ this.childrenOperators = childrenOperators;
this.dataTypes = dataTypes;
this.mergeSortHeap = new MergeSortHeap(topValue, comparator.reversed());
this.comparator = comparator;
@@ -100,11 +100,11 @@ public class TopKOperator implements ProcessOperator {
initResultTsBlock();
- deviceBatchStep =
+ childBatchStep =
OPERATOR_BATCH_UPPER_BOUND % topValue == 0
? OPERATOR_BATCH_UPPER_BOUND / topValue
: OPERATOR_BATCH_UPPER_BOUND / topValue + 1;
- canCallNext = new boolean[deviceOperators.size()];
+ canCallNext = new boolean[childrenOperators.size()];
}
@Override
@@ -116,8 +116,8 @@ public class TopKOperator implements ProcessOperator {
public ListenableFuture<?> isBlocked() {
boolean hasReadyChild = false;
List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
- for (int i = deviceIndex;
- i < Math.min(deviceIndex + deviceBatchStep, deviceOperators.size());
+ for (int i = childIndex;
+ i < Math.min(childIndex + childBatchStep, childrenOperators.size());
i++) {
if (getOperator(i) == null) {
continue;
@@ -142,7 +142,7 @@ public class TopKOperator implements ProcessOperator {
@Override
public boolean hasNext() throws Exception {
- if (deviceIndex >= deviceOperators.size()) {
+ if (childIndex >= childrenOperators.size()) {
if (topKResult == null) {
return false;
}
@@ -154,7 +154,7 @@ public class TopKOperator implements ProcessOperator {
@Override
public TsBlock next() throws Exception {
- if (deviceIndex >= deviceOperators.size() && resultReturnSize <
topKResult.length) {
+ if (childIndex >= childrenOperators.size() && resultReturnSize <
topKResult.length) {
return getResultFromCachedTopKResult();
}
@@ -162,8 +162,8 @@ public class TopKOperator implements ProcessOperator {
long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
boolean batchFinished = true;
- int operatorBatchEnd = Math.min(deviceIndex + deviceBatchStep,
deviceOperators.size());
- for (int i = deviceIndex; i < operatorBatchEnd; i++) {
+ int operatorBatchEnd = Math.min(childIndex + childBatchStep,
childrenOperators.size());
+ for (int i = childIndex; i < operatorBatchEnd; i++) {
if (getOperator(i) == null) {
continue;
}
@@ -212,8 +212,8 @@ public class TopKOperator implements ProcessOperator {
}
if (batchFinished) {
- deviceIndex = deviceIndex + deviceBatchStep;
- if (deviceIndex >= deviceOperators.size()) {
+ childIndex = childIndex + childBatchStep;
+ if (childIndex >= childrenOperators.size()) {
return getResultFromCachedTopKResult();
}
}
@@ -223,8 +223,8 @@ public class TopKOperator implements ProcessOperator {
@Override
public void close() throws Exception {
- for (int i = deviceIndex; i < deviceOperators.size(); i++) {
- final Operator operator = deviceOperators.get(i);
+ for (int i = childIndex; i < childrenOperators.size(); i++) {
+ final Operator operator = childrenOperators.get(i);
if (operator != null) {
operator.close();
}
@@ -236,7 +236,7 @@ public class TopKOperator implements ProcessOperator {
// traverse each child serial,
// so no need to accumulate the returnSize and retainedSize of each child
long maxPeekMemory = calculateMaxReturnSize();
- for (Operator operator : deviceOperators) {
+ for (Operator operator : childrenOperators) {
maxPeekMemory = Math.max(maxPeekMemory,
operator.calculateMaxPeekMemory());
}
return Math.max(maxPeekMemory, topValue *
getMemoryUsageOfOneMergeSortKey() * 2);
@@ -379,11 +379,11 @@ public class TopKOperator implements ProcessOperator {
}
private Operator getOperator(int i) {
- return deviceOperators.get(i);
+ return childrenOperators.get(i);
}
private void closeOperator(int i) throws Exception {
getOperator(i).close();
- deviceOperators.set(i, null);
+ childrenOperators.set(i, null);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
index 572aeeb9a78..e8ddb8161d5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
@@ -257,7 +257,7 @@ public class DriverScheduler implements IDriverScheduler,
IService {
.getThrottleQuotaLimit()
.checkMemory(sessionInfo.getUserName(), estimatedMemory.get())) {
throw new MemoryNotEnoughException(
- "There is not enough memory to execute current fragment instance");
+ "There is no enough memory to execute current fragment instance");
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 3aef9be159e..eff85a4b197 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -132,8 +132,8 @@ public class LocalExecutionPlanner {
throw new MemoryNotEnoughException(
String.format(
"There is not enough memory to execute current fragment
instance, "
- + "current remaining free memory is %d, "
- + "estimated memory usage for current fragment instance is
%d",
+ + "current remaining free memory is %dB, "
+ + "estimated memory usage for current fragment instance is
%dB",
freeMemoryForOperators, estimatedMemorySize));
} else {
freeMemoryForOperators -= estimatedMemorySize;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 15fb330bab2..865d73b81af 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -771,7 +771,7 @@ public class LogicalPlanBuilder {
orderByParameter,
outputColumnNames);
- // if value filter exists, need add a LIMIT-NODE as the child node of
TopKNode
+ // if value filter exists, need add a LimitNode as the child node of
TopKNode
long valueFilterLimit = queryStatement.hasWhere() ? limitValue : -1;
// only order by based on time, use TopKNode + SingleDeviceViewNode
@@ -796,7 +796,7 @@ public class LogicalPlanBuilder {
analysis.setUseTopKNode();
this.root = topKNode;
} else if (canUseMergeSortNode(queryStatement,
deviceNameToSourceNodesMap.size())) {
- // otherwise use MergeSortNode + SingleDeviceViewNode
+ // use MergeSortNode + SingleDeviceViewNode
MergeSortNode mergeSortNode =
new MergeSortNode(
context.getQueryId().genPlanNodeId(), orderByParameter,
outputColumnNames);
@@ -855,7 +855,7 @@ public class LogicalPlanBuilder {
private boolean canUseMergeSortNode(QueryStatement queryStatement, int
deviceSize) {
// 1. `order by based on time` + `no order by expression`.
// 2. deviceSize is larger than 1.
- // when satisfy all above cases use MergeSortNode.
+ // when satisfy all above cases use MergeSortNode + SingleDeviceViewNode.
return queryStatement.isOrderByBasedOnTime()
&& !queryStatement.hasOrderByExpression()
&& deviceSize > 1;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
index e1a77a2d8ce..1cc7e9bbf6b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
@@ -54,7 +54,6 @@ public class AlignByTimeOrderByLimitOffsetTest {
DistributedQueryPlan plan;
PlanNode firstFiRoot;
PlanNode firstFiTopNode;
- PlanNode mergeSortNode;
/*
* IdentitySinkNode-63
@@ -154,6 +153,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
@Test
public void orderByExpressionTest2() {
// select s1 order by s2 + limit N
+ // use TopKNode to replace SortNode + LimitNode
sql =
String.format(
"select s1 from root.sg.d1 ORDER BY root.sg.d22.s2 DESC LIMIT %s",
LIMIT_VALUE);
@@ -170,6 +170,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof
FullOuterTimeJoinNode);
// select s1 order by s2 + offset M + limit N
+ // use TopKNode to replace SortNode
sql =
String.format(
"select s1 from root.sg.d1 ORDER BY root.sg.d22.s2 DESC OFFSET 5
LIMIT %s",
@@ -257,6 +258,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
*/
@Test
public void orderByFillTest() {
+ // previous and constant fill can use TopKNode
sql =
String.format(
"select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY
root.sg.d1.s1 DESC fill(previous) LIMIT %s",
@@ -290,6 +292,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
* ├──ExchangeNode-58:
[SourceAddress:192.0.2.1/test.7.0/61]
* └──ExchangeNode-59:
[SourceAddress:192.0.4.1/test.8.0/62]
*/
+ // linear fill can not use TopKNode
sql =
String.format(
"select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY
root.sg.d1.s1 DESC fill(linear) LIMIT %s",