This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/add_unit_for_query_memory
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ac4dfdd92a24b15b90360f4f5efb73db51055586
Author: Beyyes <[email protected]>
AuthorDate: Thu Jan 25 11:02:00 2024 +0800

    Add byte unit to the not-enough-memory error message; polish comments
---
 .../IoTDBOrderByLimitOffsetAlignByDeviceIT.java    |  1 +
 .../execution/operator/process/TopKOperator.java   | 42 +++++++++++-----------
 .../execution/schedule/DriverScheduler.java        |  2 +-
 .../plan/planner/LocalExecutionPlanner.java        |  4 +--
 .../plan/planner/LogicalPlanBuilder.java           |  6 ++--
 .../AlignByTimeOrderByLimitOffsetTest.java         |  5 ++-
 6 files changed, 32 insertions(+), 28 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
index 670d35c9ca0..e2de3d28ecb 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
@@ -98,6 +98,7 @@ public class IoTDBOrderByLimitOffsetAlignByDeviceIT {
 
   @Test
   public void fillTest() {
+    // linear fill can not use TopKNode
     String[] expectedHeader = new String[] {"Time,Device,s1,s2"};
     String[] retArray =
         new String[] {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index 6ee32cea9c7..a389f38dab5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -53,11 +53,11 @@ import static 
com.google.common.util.concurrent.Futures.successfulAsList;
 public class TopKOperator implements ProcessOperator {
   private final OperatorContext operatorContext;
 
-  private final List<Operator> deviceOperators;
-  private int deviceIndex;
+  private final List<Operator> childrenOperators;
+  private int childIndex;
   // read step operators each invoking
-  private int deviceBatchStep;
-  private boolean[] canCallNext;
+  private final int childBatchStep;
+  private final boolean[] canCallNext;
 
   private final List<TSDataType> dataTypes;
   private final TsBlockBuilder tsBlockBuilder;
@@ -84,13 +84,13 @@ public class TopKOperator implements ProcessOperator {
 
   public TopKOperator(
       OperatorContext operatorContext,
-      List<Operator> deviceOperators,
+      List<Operator> childrenOperators,
       List<TSDataType> dataTypes,
       Comparator<SortKey> comparator,
       int topValue,
       boolean childrenDataInOrder) {
     this.operatorContext = operatorContext;
-    this.deviceOperators = deviceOperators;
+    this.childrenOperators = childrenOperators;
     this.dataTypes = dataTypes;
     this.mergeSortHeap = new MergeSortHeap(topValue, comparator.reversed());
     this.comparator = comparator;
@@ -100,11 +100,11 @@ public class TopKOperator implements ProcessOperator {
 
     initResultTsBlock();
 
-    deviceBatchStep =
+    childBatchStep =
         OPERATOR_BATCH_UPPER_BOUND % topValue == 0
             ? OPERATOR_BATCH_UPPER_BOUND / topValue
             : OPERATOR_BATCH_UPPER_BOUND / topValue + 1;
-    canCallNext = new boolean[deviceOperators.size()];
+    canCallNext = new boolean[childrenOperators.size()];
   }
 
   @Override
@@ -116,8 +116,8 @@ public class TopKOperator implements ProcessOperator {
   public ListenableFuture<?> isBlocked() {
     boolean hasReadyChild = false;
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
-    for (int i = deviceIndex;
-        i < Math.min(deviceIndex + deviceBatchStep, deviceOperators.size());
+    for (int i = childIndex;
+        i < Math.min(childIndex + childBatchStep, childrenOperators.size());
         i++) {
       if (getOperator(i) == null) {
         continue;
@@ -142,7 +142,7 @@ public class TopKOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() throws Exception {
-    if (deviceIndex >= deviceOperators.size()) {
+    if (childIndex >= childrenOperators.size()) {
       if (topKResult == null) {
         return false;
       }
@@ -154,7 +154,7 @@ public class TopKOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() throws Exception {
-    if (deviceIndex >= deviceOperators.size() && resultReturnSize < 
topKResult.length) {
+    if (childIndex >= childrenOperators.size() && resultReturnSize < 
topKResult.length) {
       return getResultFromCachedTopKResult();
     }
 
@@ -162,8 +162,8 @@ public class TopKOperator implements ProcessOperator {
     long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
 
     boolean batchFinished = true;
-    int operatorBatchEnd = Math.min(deviceIndex + deviceBatchStep, 
deviceOperators.size());
-    for (int i = deviceIndex; i < operatorBatchEnd; i++) {
+    int operatorBatchEnd = Math.min(childIndex + childBatchStep, 
childrenOperators.size());
+    for (int i = childIndex; i < operatorBatchEnd; i++) {
       if (getOperator(i) == null) {
         continue;
       }
@@ -212,8 +212,8 @@ public class TopKOperator implements ProcessOperator {
     }
 
     if (batchFinished) {
-      deviceIndex = deviceIndex + deviceBatchStep;
-      if (deviceIndex >= deviceOperators.size()) {
+      childIndex = childIndex + childBatchStep;
+      if (childIndex >= childrenOperators.size()) {
         return getResultFromCachedTopKResult();
       }
     }
@@ -223,8 +223,8 @@ public class TopKOperator implements ProcessOperator {
 
   @Override
   public void close() throws Exception {
-    for (int i = deviceIndex; i < deviceOperators.size(); i++) {
-      final Operator operator = deviceOperators.get(i);
+    for (int i = childIndex; i < childrenOperators.size(); i++) {
+      final Operator operator = childrenOperators.get(i);
       if (operator != null) {
         operator.close();
       }
@@ -236,7 +236,7 @@ public class TopKOperator implements ProcessOperator {
     // traverse each child serial,
     // so no need to accumulate the returnSize and retainedSize of each child
     long maxPeekMemory = calculateMaxReturnSize();
-    for (Operator operator : deviceOperators) {
+    for (Operator operator : childrenOperators) {
       maxPeekMemory = Math.max(maxPeekMemory, 
operator.calculateMaxPeekMemory());
     }
     return Math.max(maxPeekMemory, topValue * 
getMemoryUsageOfOneMergeSortKey() * 2);
@@ -379,11 +379,11 @@ public class TopKOperator implements ProcessOperator {
   }
 
   private Operator getOperator(int i) {
-    return deviceOperators.get(i);
+    return childrenOperators.get(i);
   }
 
   private void closeOperator(int i) throws Exception {
     getOperator(i).close();
-    deviceOperators.set(i, null);
+    childrenOperators.set(i, null);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
index 572aeeb9a78..e8ddb8161d5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
@@ -257,7 +257,7 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
           .getThrottleQuotaLimit()
           .checkMemory(sessionInfo.getUserName(), estimatedMemory.get())) {
         throw new MemoryNotEnoughException(
-            "There is not enough memory to execute current fragment instance");
+            "There is no enough memory to execute current fragment instance");
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 3aef9be159e..eff85a4b197 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -132,8 +132,8 @@ public class LocalExecutionPlanner {
         throw new MemoryNotEnoughException(
             String.format(
                 "There is not enough memory to execute current fragment 
instance, "
-                    + "current remaining free memory is %d, "
-                    + "estimated memory usage for current fragment instance is 
%d",
+                    + "current remaining free memory is %dB, "
+                    + "estimated memory usage for current fragment instance is 
%dB",
                 freeMemoryForOperators, estimatedMemorySize));
       } else {
         freeMemoryForOperators -= estimatedMemorySize;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 15fb330bab2..865d73b81af 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -771,7 +771,7 @@ public class LogicalPlanBuilder {
               orderByParameter,
               outputColumnNames);
 
-      // if value filter exists, need add a LIMIT-NODE as the child node of 
TopKNode
+      // if value filter exists, need add a LimitNode as the child node of 
TopKNode
       long valueFilterLimit = queryStatement.hasWhere() ? limitValue : -1;
 
       // only order by based on time, use TopKNode + SingleDeviceViewNode
@@ -796,7 +796,7 @@ public class LogicalPlanBuilder {
       analysis.setUseTopKNode();
       this.root = topKNode;
     } else if (canUseMergeSortNode(queryStatement, 
deviceNameToSourceNodesMap.size())) {
-      // otherwise use MergeSortNode + SingleDeviceViewNode
+      // use MergeSortNode + SingleDeviceViewNode
       MergeSortNode mergeSortNode =
           new MergeSortNode(
               context.getQueryId().genPlanNodeId(), orderByParameter, 
outputColumnNames);
@@ -855,7 +855,7 @@ public class LogicalPlanBuilder {
   private boolean canUseMergeSortNode(QueryStatement queryStatement, int 
deviceSize) {
     // 1. `order by based on time` + `no order by expression`.
     // 2. deviceSize is larger than 1.
-    // when satisfy all above cases use MergeSortNode.
+    // when satisfy all above cases use MergeSortNode + SingleDeviceViewNode.
     return queryStatement.isOrderByBasedOnTime()
         && !queryStatement.hasOrderByExpression()
         && deviceSize > 1;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
index e1a77a2d8ce..1cc7e9bbf6b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
@@ -54,7 +54,6 @@ public class AlignByTimeOrderByLimitOffsetTest {
   DistributedQueryPlan plan;
   PlanNode firstFiRoot;
   PlanNode firstFiTopNode;
-  PlanNode mergeSortNode;
 
   /*
    * IdentitySinkNode-63
@@ -154,6 +153,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
   @Test
   public void orderByExpressionTest2() {
     // select s1 order by s2 + limit N
+    // use TopKNode to replace SortNode + LimitNode
     sql =
         String.format(
             "select s1 from root.sg.d1 ORDER BY root.sg.d22.s2 DESC LIMIT %s", 
LIMIT_VALUE);
@@ -170,6 +170,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
         firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof 
FullOuterTimeJoinNode);
 
     // select s1 order by s2 + offset M + limit N
+    // use TopKNode to replace SortNode
     sql =
         String.format(
             "select s1 from root.sg.d1 ORDER BY root.sg.d22.s2 DESC OFFSET 5 
LIMIT %s",
@@ -257,6 +258,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
    */
   @Test
   public void orderByFillTest() {
+    // previous and constant fill can use TopKNode
     sql =
         String.format(
             "select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY 
root.sg.d1.s1 DESC fill(previous) LIMIT %s",
@@ -290,6 +292,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
      *                   ├──ExchangeNode-58: 
[SourceAddress:192.0.2.1/test.7.0/61]
      *                   └──ExchangeNode-59: 
[SourceAddress:192.0.4.1/test.8.0/62]
      */
+    // linear fill can not use TopKNode
     sql =
         String.format(
             "select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY 
root.sg.d1.s1 DESC fill(linear) LIMIT %s",

Reply via email to