This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/align_by_time_topk
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c561d8d5dcdaad6b7600fd0797ba5254d6d6f8b2
Author: Beyyes <[email protected]>
AuthorDate: Fri Jan 12 14:37:46 2024 +0800

    perfect docs
---
 .../IoTDBOrderByLimitOffsetAlignByDeviceIT.java    |  3 +-
 .../execution/operator/process/TopKOperator.java   | 42 +++++++++++-----------
 .../plan/planner/LogicalPlanBuilder.java           |  9 +++--
 .../AlignByTimeOrderByLimitOffsetTest.java         |  5 ++-
 4 files changed, 31 insertions(+), 28 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
index 670d35c9ca0..4ff1e734506 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java
@@ -87,7 +87,7 @@ public class IoTDBOrderByLimitOffsetAlignByDeviceIT {
 
   @Test
   public void aggregationWithHavingTest() {
-    // when aggregation with having, can only use MergeSortNode but not use 
TopKNode
+    // aggregation with having can not use TopKNode
     String[] expectedHeader = new String[] {"Time,Device,sum(s1)"};
     String[] retArray = new String[] {"3,root.db.d2,222.0,", 
"3,root.db.d3,333.0,"};
     resultSetEqualTest(
@@ -98,6 +98,7 @@ public class IoTDBOrderByLimitOffsetAlignByDeviceIT {
 
   @Test
   public void fillTest() {
+    // linear fill can not use TopKNode
     String[] expectedHeader = new String[] {"Time,Device,s1,s2"};
     String[] retArray =
         new String[] {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
index 6ee32cea9c7..a389f38dab5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TopKOperator.java
@@ -53,11 +53,11 @@ import static 
com.google.common.util.concurrent.Futures.successfulAsList;
 public class TopKOperator implements ProcessOperator {
   private final OperatorContext operatorContext;
 
-  private final List<Operator> deviceOperators;
-  private int deviceIndex;
+  private final List<Operator> childrenOperators;
+  private int childIndex;
   // read step operators each invoking
-  private int deviceBatchStep;
-  private boolean[] canCallNext;
+  private final int childBatchStep;
+  private final boolean[] canCallNext;
 
   private final List<TSDataType> dataTypes;
   private final TsBlockBuilder tsBlockBuilder;
@@ -84,13 +84,13 @@ public class TopKOperator implements ProcessOperator {
 
   public TopKOperator(
       OperatorContext operatorContext,
-      List<Operator> deviceOperators,
+      List<Operator> childrenOperators,
       List<TSDataType> dataTypes,
       Comparator<SortKey> comparator,
       int topValue,
       boolean childrenDataInOrder) {
     this.operatorContext = operatorContext;
-    this.deviceOperators = deviceOperators;
+    this.childrenOperators = childrenOperators;
     this.dataTypes = dataTypes;
     this.mergeSortHeap = new MergeSortHeap(topValue, comparator.reversed());
     this.comparator = comparator;
@@ -100,11 +100,11 @@ public class TopKOperator implements ProcessOperator {
 
     initResultTsBlock();
 
-    deviceBatchStep =
+    childBatchStep =
         OPERATOR_BATCH_UPPER_BOUND % topValue == 0
             ? OPERATOR_BATCH_UPPER_BOUND / topValue
             : OPERATOR_BATCH_UPPER_BOUND / topValue + 1;
-    canCallNext = new boolean[deviceOperators.size()];
+    canCallNext = new boolean[childrenOperators.size()];
   }
 
   @Override
@@ -116,8 +116,8 @@ public class TopKOperator implements ProcessOperator {
   public ListenableFuture<?> isBlocked() {
     boolean hasReadyChild = false;
     List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
-    for (int i = deviceIndex;
-        i < Math.min(deviceIndex + deviceBatchStep, deviceOperators.size());
+    for (int i = childIndex;
+        i < Math.min(childIndex + childBatchStep, childrenOperators.size());
         i++) {
       if (getOperator(i) == null) {
         continue;
@@ -142,7 +142,7 @@ public class TopKOperator implements ProcessOperator {
 
   @Override
   public boolean hasNext() throws Exception {
-    if (deviceIndex >= deviceOperators.size()) {
+    if (childIndex >= childrenOperators.size()) {
       if (topKResult == null) {
         return false;
       }
@@ -154,7 +154,7 @@ public class TopKOperator implements ProcessOperator {
 
   @Override
   public TsBlock next() throws Exception {
-    if (deviceIndex >= deviceOperators.size() && resultReturnSize < 
topKResult.length) {
+    if (childIndex >= childrenOperators.size() && resultReturnSize < 
topKResult.length) {
       return getResultFromCachedTopKResult();
     }
 
@@ -162,8 +162,8 @@ public class TopKOperator implements ProcessOperator {
     long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
 
     boolean batchFinished = true;
-    int operatorBatchEnd = Math.min(deviceIndex + deviceBatchStep, 
deviceOperators.size());
-    for (int i = deviceIndex; i < operatorBatchEnd; i++) {
+    int operatorBatchEnd = Math.min(childIndex + childBatchStep, 
childrenOperators.size());
+    for (int i = childIndex; i < operatorBatchEnd; i++) {
       if (getOperator(i) == null) {
         continue;
       }
@@ -212,8 +212,8 @@ public class TopKOperator implements ProcessOperator {
     }
 
     if (batchFinished) {
-      deviceIndex = deviceIndex + deviceBatchStep;
-      if (deviceIndex >= deviceOperators.size()) {
+      childIndex = childIndex + childBatchStep;
+      if (childIndex >= childrenOperators.size()) {
         return getResultFromCachedTopKResult();
       }
     }
@@ -223,8 +223,8 @@ public class TopKOperator implements ProcessOperator {
 
   @Override
   public void close() throws Exception {
-    for (int i = deviceIndex; i < deviceOperators.size(); i++) {
-      final Operator operator = deviceOperators.get(i);
+    for (int i = childIndex; i < childrenOperators.size(); i++) {
+      final Operator operator = childrenOperators.get(i);
       if (operator != null) {
         operator.close();
       }
@@ -236,7 +236,7 @@ public class TopKOperator implements ProcessOperator {
     // traverse each child serial,
     // so no need to accumulate the returnSize and retainedSize of each child
     long maxPeekMemory = calculateMaxReturnSize();
-    for (Operator operator : deviceOperators) {
+    for (Operator operator : childrenOperators) {
       maxPeekMemory = Math.max(maxPeekMemory, 
operator.calculateMaxPeekMemory());
     }
     return Math.max(maxPeekMemory, topValue * 
getMemoryUsageOfOneMergeSortKey() * 2);
@@ -379,11 +379,11 @@ public class TopKOperator implements ProcessOperator {
   }
 
   private Operator getOperator(int i) {
-    return deviceOperators.get(i);
+    return childrenOperators.get(i);
   }
 
   private void closeOperator(int i) throws Exception {
     getOperator(i).close();
-    deviceOperators.set(i, null);
+    childrenOperators.set(i, null);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 038149b2b9a..1a33267c78e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -774,7 +774,7 @@ public class LogicalPlanBuilder {
               orderByParameter,
               outputColumnNames);
 
-      // if value filter exists, need add a LIMIT-NODE as the child node of 
TopKNode
+      // if a value filter exists, a LimitNode must be added as the child node of 
TopKNode
       long valueFilterLimit = queryStatement.hasWhere() ? limitValue : -1;
 
       // only order by based on time, use TopKNode + SingleDeviceViewNode
@@ -799,7 +799,7 @@ public class LogicalPlanBuilder {
       analysis.setUseTopKNode();
       this.root = topKNode;
     } else if (canUseMergeSortNode(queryStatement, 
deviceNameToSourceNodesMap.size())) {
-      // otherwise use MergeSortNode + SingleDeviceViewNode
+      // use MergeSortNode + SingleDeviceViewNode
       MergeSortNode mergeSortNode =
           new MergeSortNode(
               context.getQueryId().genPlanNodeId(), orderByParameter, 
outputColumnNames);
@@ -811,7 +811,6 @@ public class LogicalPlanBuilder {
           -1);
       this.root = mergeSortNode;
     } else {
-      // order by based on device, use DeviceViewNode
       this.root =
           addDeviceViewNode(
               orderByParameter,
@@ -852,13 +851,13 @@ public class LogicalPlanBuilder {
         && (!queryStatement.isAggregationQuery()
             || (queryStatement.isAggregationQuery() && 
!queryStatement.hasHaving()))
         && (!queryStatement.hasFill()
-            || 
!LINEAR.equals(queryStatement.getFillComponent().getFillPolicy()));
+            || 
!LINEAR.equals(queryStatement.getFillComponent().getFillPolicy()));
   }
 
   private boolean canUseMergeSortNode(QueryStatement queryStatement, int 
deviceSize) {
     // 1. `order by based on time` + `no order by expression`.
     // 2. deviceSize is larger than 1.
-    // when satisfy all above cases use MergeSortNode.
+    // when all of the above conditions are satisfied, use MergeSortNode + SingleDeviceViewNode.
     return queryStatement.isOrderByBasedOnTime()
         && !queryStatement.hasOrderByExpression()
         && deviceSize > 1;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
index e1a77a2d8ce..1cc7e9bbf6b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByTimeOrderByLimitOffsetTest.java
@@ -54,7 +54,6 @@ public class AlignByTimeOrderByLimitOffsetTest {
   DistributedQueryPlan plan;
   PlanNode firstFiRoot;
   PlanNode firstFiTopNode;
-  PlanNode mergeSortNode;
 
   /*
    * IdentitySinkNode-63
@@ -154,6 +153,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
   @Test
   public void orderByExpressionTest2() {
     // select s1 order by s2 + limit N
+    // use TopKNode to replace SortNode + LimitNode
     sql =
         String.format(
             "select s1 from root.sg.d1 ORDER BY root.sg.d22.s2 DESC LIMIT %s", 
LIMIT_VALUE);
@@ -170,6 +170,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
         firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof 
FullOuterTimeJoinNode);
 
     // select s1 order by s2 + offset M + limit N
+    // use TopKNode to replace SortNode
     sql =
         String.format(
             "select s1 from root.sg.d1 ORDER BY root.sg.d22.s2 DESC OFFSET 5 
LIMIT %s",
@@ -257,6 +258,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
    */
   @Test
   public void orderByFillTest() {
+    // previous and constant fill can use TopKNode
     sql =
         String.format(
             "select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY 
root.sg.d1.s1 DESC fill(previous) LIMIT %s",
@@ -290,6 +292,7 @@ public class AlignByTimeOrderByLimitOffsetTest {
      *                   ├──ExchangeNode-58: 
[SourceAddress:192.0.2.1/test.7.0/61]
      *                   └──ExchangeNode-59: 
[SourceAddress:192.0.4.1/test.8.0/62]
      */
+    // linear fill can not use TopKNode
     sql =
         String.format(
             "select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY 
root.sg.d1.s1 DESC fill(linear) LIMIT %s",

Reply via email to