This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/lmh/PredicatePushDown
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3579a33aabdb6ef62a61de5b5fa5837de2722f32
Author: Beyyes <[email protected]>
AuthorDate: Sun Jan 28 12:20:50 2024 +0800

    change conf
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   2 +-
 .../queryengine/plan/execution/QueryExecution.java |   2 +
 .../plan/planner/OperatorTreeGenerator.java        |  29 +-
 .../plan/planner/distribution/test2.java           | 931 +++++++++++++++++++++
 .../datanode1conf/iotdb-common.properties          |   3 +-
 .../datanode2conf/iotdb-common.properties          |   3 +-
 6 files changed, 949 insertions(+), 21 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index c755eb010cc..e7e66b3ec86 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -69,7 +69,7 @@ public class ConfigNodeConfig {
   private String dataRegionConsensusProtocolClass = 
ConsensusFactory.IOT_CONSENSUS;
 
   /** Default number of DataRegion replicas. */
-  private int dataReplicationFactor = 1;
+  private int dataReplicationFactor = 2;
 
   /** Number of SeriesPartitionSlots per Database. */
   private int seriesSlotNum = 1000;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index aa3bb22973b..0161aef1ee9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -239,6 +239,8 @@ public class QueryExecution implements IQueryExecution {
     if (context.getQueryType() == QueryType.WRITE && analysis.isFailed()) {
       stateMachine.transitionToFailed(analysis.getFailStatus());
     }
+
+    logger.warn(" =========== fe cost: {}ns =========", System.nanoTime() - 
startTime);
   }
 
   private void checkTimeOutForQuery() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index ca46bd57a19..bc9784a06cc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -20,7 +20,6 @@
 package org.apache.iotdb.db.queryengine.plan.planner;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
@@ -471,35 +470,29 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 ProjectOperator.class.getSimpleName());
 
-    List<String> inputColumnNames = node.getChild().getOutputColumnNames();
+    List<String> inputColumnNames = null;
     List<String> outputColumnNames = node.getOutputColumnNames();
     if (outputColumnNames == null) {
       outputColumnNames = 
context.getTypeProvider().getTemplatedInfo().getSelectMeasurements();
       // skip device column
       outputColumnNames = outputColumnNames.subList(1, 
outputColumnNames.size());
+      inputColumnNames = 
context.getTypeProvider().getTemplatedInfo().getMeasurementList();
 
-      List<PartialPath> inputColumnPaths = new ArrayList<>();
-      for (String inputColumnName : inputColumnNames) {
-        try {
-          inputColumnPaths.add(new PartialPath(inputColumnName));
-        } catch (IllegalPathException e) {
-          throw new IllegalArgumentException(
-              "Cannot parse column name to path: " + inputColumnName);
-        }
+      if (inputColumnNames.equals(outputColumnNames)) {
+        // no need to project
+        return child;
       }
-      inputColumnNames =
-          
inputColumnPaths.stream().map(PartialPath::getMeasurement).collect(Collectors.toList());
-    }
-
-    if (inputColumnNames.equals(outputColumnNames)) {
-      // no need to project
-      return child;
+    } else {
+      inputColumnNames = node.getChild().getOutputColumnNames();
     }
 
     List<Integer> remainingColumnIndexList = new ArrayList<>();
     for (String outputColumnName : outputColumnNames) {
       int index = inputColumnNames.indexOf(outputColumnName);
-      checkState(index >= 0, "Cannot find column [%s] in child's output", 
outputColumnName);
+      if (index < 0) {
+        throw new IllegalStateException(
+            String.format("Cannot find column [%s] in child's output", 
outputColumnName));
+      }
       remainingColumnIndexList.add(index);
     }
     return new ProjectOperator(operatorContext, child, 
remainingColumnIndexList);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/test2.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/test2.java
new file mode 100644
index 00000000000..488526db97e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/test2.java
@@ -0,0 +1,931 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TopKNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class test2 {
+
+  private static final long LIMIT_VALUE = 10;
+
+  QueryId queryId = new QueryId("test");
+  MPPQueryContext context =
+      new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+  String sql;
+  Analysis analysis;
+  PlanNode logicalPlanNode;
+  DistributionPlanner planner;
+  DistributedQueryPlan plan;
+  PlanNode firstFiRoot;
+  PlanNode firstFiTopNode;
+  PlanNode mergeSortNode;
+
+  /*
+   * IdentitySinkNode-27
+   *   └──LimitNode-22
+   *       └──MergeSort-21
+   *           ├──DeviceView-12
+   *           │   └──FullOuterTimeJoinNode-11
+   *           │       ├──SeriesScanNode-9:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *           │       └──SeriesScanNode-10:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *           ├──ExchangeNode-23: [SourceAddress:192.0.3.1/test.2.0/25]
+   *           └──ExchangeNode-24: [SourceAddress:192.0.2.1/test.3.0/26]
+   *
+   * IdentitySinkNode-25
+   *   └──DeviceView-16
+   *       └──FullOuterTimeJoinNode-15
+   *           ├──SeriesScanNode-13:[SeriesPath: root.sg.d22.s1, DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *           └──SeriesScanNode-14:[SeriesPath: root.sg.d22.s2, DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   * IdentitySinkNode-26
+   *   └──DeviceView-20
+   *       └──FullOuterTimeJoinNode-19
+   *           ├──SeriesScanNode-17:[SeriesPath: root.sg.d1.s1, DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *           └──SeriesScanNode-18:[SeriesPath: root.sg.d1.s2, DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   */
+  @Test
+  public void orderByDeviceTest1() {
+    // no order by
+    sql = "select * from root.sg.d1, root.sg.d22 LIMIT 10 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    assertTrue(firstFiRoot.getChildren().get(0) instanceof LimitNode);
+    mergeSortNode = ((LimitNode) firstFiRoot.getChildren().get(0)).getChild();
+    assertTrue(mergeSortNode instanceof MergeSortNode);
+    assertTrue(mergeSortNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(
+        mergeSortNode.getChildren().get(0).getChildren().get(0) instanceof 
FullOuterTimeJoinNode);
+
+    // order by device, time
+    sql =
+        "select * from root.sg.d1, root.sg.d22 order by device asc, time desc 
LIMIT 10 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    assertTrue(firstFiRoot.getChildren().get(0) instanceof LimitNode);
+    mergeSortNode = ((LimitNode) firstFiRoot.getChildren().get(0)).getChild();
+    assertTrue(mergeSortNode instanceof MergeSortNode);
+    assertTrue(mergeSortNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(
+        mergeSortNode.getChildren().get(0).getChildren().get(0) instanceof 
FullOuterTimeJoinNode);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(0).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(1).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(2).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+  }
+
+  /*
+   * IdentitySinkNode-32
+   *   └──LimitNode-27
+   *       └──MergeSort-26
+   *           ├──DeviceView-15
+   *           │   └──SortNode-14
+   *           │       └──FullOuterTimeJoinNode-13
+   *           │           ├──SeriesScanNode-11:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *           │           └──SeriesScanNode-12:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *           ├──ExchangeNode-28: [SourceAddress:192.0.3.1/test.2.0/30]
+   *           └──ExchangeNode-29: [SourceAddress:192.0.2.1/test.3.0/31]
+   *
+   *  IdentitySinkNode-30
+   *   └──DeviceView-20
+   *       └──SortNode-19
+   *           └──FullOuterTimeJoinNode-18
+   *               ├──SeriesScanNode-16:[SeriesPath: root.sg.d22.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *               └──SeriesScanNode-17:[SeriesPath: root.sg.d22.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   *  IdentitySinkNode-31
+   *   └──DeviceView-25
+   *       └──SortNode-24
+   *           └──FullOuterTimeJoinNode-23
+   *               ├──SeriesScanNode-21:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *               └──SeriesScanNode-22:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   */
+  @Test
+  public void orderByDeviceTest2() {
+    // order by device, expression
+    sql =
+        "select * from root.sg.d1, root.sg.d22 order by device asc, s1 desc 
LIMIT 10 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    assertTrue(firstFiRoot.getChildren().get(0) instanceof LimitNode);
+    mergeSortNode = ((LimitNode) firstFiRoot.getChildren().get(0)).getChild();
+    assertTrue(mergeSortNode instanceof MergeSortNode);
+    assertTrue(mergeSortNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(mergeSortNode.getChildren().get(0).getChildren().get(0) 
instanceof SortNode);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(0).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(1).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(2).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+  }
+
+  /*
+   * IdentitySinkNode-38
+   *   └──LimitNode-33
+   *       └──TransformNode-12
+   *           └──MergeSort-32
+   *               ├──DeviceView-19
+   *               │   └──SortNode-18
+   *               │       └──FilterNode-17
+   *               │           └──FullOuterTimeJoinNode-16
+   *               │               ├──SeriesScanNode-14:[SeriesPath: 
root.sg.d1.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               │               └──SeriesScanNode-15:[SeriesPath: 
root.sg.d1.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               ├──ExchangeNode-34: [SourceAddress:192.0.3.1/test.2.0/36]
+   *               └──ExchangeNode-35: [SourceAddress:192.0.2.1/test.3.0/37]
+   *
+   *  IdentitySinkNode-36
+   *   └──DeviceView-25
+   *       └──SortNode-24
+   *           └──FilterNode-23
+   *               └──FullOuterTimeJoinNode-22
+   *                   ├──SeriesScanNode-20:[SeriesPath: root.sg.d22.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *                   └──SeriesScanNode-21:[SeriesPath: root.sg.d22.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   *  IdentitySinkNode-37
+   *   └──DeviceView-31
+   *       └──SortNode-30
+   *           └──FilterNode-29
+   *               └──FullOuterTimeJoinNode-28
+   *                   ├──SeriesScanNode-26:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *                   └──SeriesScanNode-27:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   */
+  @Test
+  public void orderByDeviceTest3() {
+    // order by device, expression; with value filter
+    sql =
+        "select s1 from root.sg.d1, root.sg.d22 WHERE s2=1 order by device 
asc, s2 desc LIMIT 5 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    assertTrue(firstFiRoot.getChildren().get(0) instanceof LimitNode);
+    PlanNode transformNode = ((LimitNode) 
firstFiRoot.getChildren().get(0)).getChild();
+    assertTrue(transformNode instanceof TransformNode);
+    assertTrue(transformNode.getChildren().get(0) instanceof MergeSortNode);
+    assertTrue(transformNode.getChildren().get(0).getChildren().get(0) 
instanceof DeviceViewNode);
+    assertTrue(
+        
transformNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof SortNode);
+  }
+
+  /*
+   * IdentitySinkNode-27
+   *   └──LimitNode-22
+   *       └──FilterNode-12
+   *           └──DeviceView-14
+   *               ├──AggregationNode-5
+   *               │   └──FilterNode-4
+   *               │       └──FullOuterTimeJoinNode-3
+   *               │           ├──SeriesScanNode-15:[SeriesPath: 
root.sg.d1.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               │           ├──SeriesScanNode-17:[SeriesPath: 
root.sg.d1.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               │           └──ExchangeNode-23: 
[SourceAddress:192.0.2.1/test.2.0/25]
+   *               └──ExchangeNode-24: [SourceAddress:192.0.3.1/test.3.0/26]
+   *
+   *  IdentitySinkNode-25
+   *   └──FullOuterTimeJoinNode-19
+   *       ├──SeriesScanNode-16:[SeriesPath: root.sg.d1.s1, DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *       └──SeriesScanNode-18:[SeriesPath: root.sg.d1.s2, DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   *  IdentitySinkNode-26
+   *   └──AggregationNode-10
+   *       └──FilterNode-9
+   *           └──FullOuterTimeJoinNode-8
+   *               ├──SeriesScanNode-20:[SeriesPath: root.sg.d22.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *               └──SeriesScanNode-21:[SeriesPath: root.sg.d22.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   */
+  @Test
+  public void orderByDeviceTest4() {
+    // aggregation + order by device, expression; with value filter
+    sql =
+        "select count(s1) from root.sg.d1, root.sg.d22 WHERE s2=1 
having(count(s1)>1) LIMIT 5 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    assertTrue(firstFiRoot.getChildren().get(0) instanceof LimitNode);
+    PlanNode filterNode = ((LimitNode) 
firstFiRoot.getChildren().get(0)).getChild();
+    assertTrue(filterNode instanceof FilterNode);
+    assertTrue(filterNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(filterNode.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(
+        
filterNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof FilterNode);
+    PlanNode thirdFiRoot = 
plan.getInstances().get(2).getFragment().getPlanNodeTree();
+    assertTrue(thirdFiRoot instanceof IdentitySinkNode);
+    assertTrue(thirdFiRoot.getChildren().get(0) instanceof AggregationNode);
+    assertTrue(thirdFiRoot.getChildren().get(0).getChildren().get(0) 
instanceof FilterNode);
+  }
+
+  /*
+   * IdentitySinkNode-44
+   *   └──TopK-10
+   *       ├──TopK-34
+   *       │   ├──SingleDeviceView-17
+   *       │   │   └──FullOuterTimeJoinNode-16
+   *       │   │       ├──SeriesScanNode-14:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │   │       └──SeriesScanNode-15:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │   └──SingleDeviceView-29
+   *       │       └──FullOuterTimeJoinNode-28
+   *       │           ├──SeriesScanNode-26:[SeriesPath: root.sg.d333.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │           └──SeriesScanNode-27:[SeriesPath: root.sg.d333.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       ├──ExchangeNode-38: [SourceAddress:192.0.3.1/test.2.0/41]
+   *       ├──ExchangeNode-39: [SourceAddress:192.0.2.1/test.3.0/42]
+   *       └──ExchangeNode-40: [SourceAddress:192.0.4.1/test.4.0/43]
+   *
+   *  IdentitySinkNode-41
+   *   └──TopK-36
+   *       └──SingleDeviceView-25
+   *           └──FullOuterTimeJoinNode-24
+   *               ├──SeriesScanNode-22:[SeriesPath: root.sg.d22.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *               └──SeriesScanNode-23:[SeriesPath: root.sg.d22.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   *  IdentitySinkNode-42
+   *   └──TopK-35
+   *       └──SingleDeviceView-21
+   *           └──FullOuterTimeJoinNode-20
+   *               ├──SeriesScanNode-18:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *               └──SeriesScanNode-19:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   *  IdentitySinkNode-43
+   *   └──TopK-37
+   *       └──SingleDeviceView-33
+   *           └──FullOuterTimeJoinNode-32
+   *               ├──SeriesScanNode-30:[SeriesPath: root.sg.d333.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   *               └──SeriesScanNode-31:[SeriesPath: root.sg.d333.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   */
+  @Test
+  public void orderByTimeTest1() {
+    // only order by time, no filter
+    sql =
+        String.format(
+            "select * from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY TIME 
DESC LIMIT %s align by device",
+            LIMIT_VALUE);
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    assertEquals(4, firstFiRoot.getChildren().get(0).getChildren().size());
+    PlanNode firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TopKNode);
+    for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
+      assertTrue(node instanceof SingleDeviceViewNode);
+    }
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(0).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(1).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(2).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(3).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+  }
+
+  /*
+   * IdentitySinkNode-60
+   *   └──TopK-13
+   *       ├──TopK-50
+   *       │   ├──SingleDeviceView-25
+   *       │   │   └──LimitNode-24
+   *       │   │       └──FilterNode-23
+   *       │   │           └──FullOuterTimeJoinNode-22
+   *       │   │               ├──SeriesScanNode-20:[SeriesPath: 
root.sg.d1.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │   │               └──SeriesScanNode-21:[SeriesPath: 
root.sg.d1.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │   └──SingleDeviceView-43
+   *       │       └──LimitNode-42
+   *       │           └──FilterNode-41
+   *       │               └──FullOuterTimeJoinNode-40
+   *       │                   ├──SeriesScanNode-38:[SeriesPath: 
root.sg.d333.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │                   └──SeriesScanNode-39:[SeriesPath: 
root.sg.d333.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       ├──ExchangeNode-54: [SourceAddress:192.0.3.1/test.2.0/57]
+   *       ├──ExchangeNode-55: [SourceAddress:192.0.2.1/test.3.0/58]
+   *       └──ExchangeNode-56: [SourceAddress:192.0.4.1/test.4.0/59]
+   *
+   *  IdentitySinkNode-57
+   *   └──TopK-52
+   *       └──SingleDeviceView-37
+   *           └──LimitNode-36
+   *               └──FilterNode-35
+   *                   └──FullOuterTimeJoinNode-34
+   *                       ├──SeriesScanNode-32:[SeriesPath: root.sg.d22.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *                       └──SeriesScanNode-33:[SeriesPath: root.sg.d22.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   *  IdentitySinkNode-58
+   *   └──TopK-51
+   *       └──SingleDeviceView-31
+   *           └──LimitNode-30
+   *               └──FilterNode-29
+   *                   └──FullOuterTimeJoinNode-28
+   *                       ├──SeriesScanNode-26:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *                       └──SeriesScanNode-27:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   *  IdentitySinkNode-59
+   *   └──TopK-53
+   *       └──SingleDeviceView-49
+   *           └──LimitNode-48
+   *               └──FilterNode-47
+   *                   └──FullOuterTimeJoinNode-46
+   *                       ├──SeriesScanNode-44:[SeriesPath: root.sg.d333.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   *                       └──SeriesScanNode-45:[SeriesPath: root.sg.d333.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   */
+  @Test
+  public void orderByTimeTest2() {
+    // only order by time, has value filter
+    // put LIMIT-NODE below of SingleDeviceViewNode
+    sql =
+        String.format(
+            "select s1 from root.sg.d1,root.sg.d22,root.sg.d333 where s2>1 
ORDER BY TIME DESC LIMIT %s align by device",
+            LIMIT_VALUE);
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    PlanNode firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TopKNode);
+    for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
+      assertTrue(node instanceof SingleDeviceViewNode);
+      assertTrue(node.getChildren().get(0) instanceof LimitNode);
+      assertTrue(node.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
+    }
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
+    
assertScanNodeLimitValue(plan.getInstances().get(0).getFragment().getPlanNodeTree(),
 0);
+    
assertScanNodeLimitValue(plan.getInstances().get(1).getFragment().getPlanNodeTree(),
 0);
+    
assertScanNodeLimitValue(plan.getInstances().get(2).getFragment().getPlanNodeTree(),
 0);
+    
assertScanNodeLimitValue(plan.getInstances().get(3).getFragment().getPlanNodeTree(),
 0);
+  }
+
+  /*
+   * IdentitySinkNode-98
+   *   └──TopK-54
+   *       ├──TopK-88
+   *       │   └──DeviceView-69
+   *       │       ├──LimitNode-63
+   *       │       │   └──FilterNode-62
+   *       │       │       └──FullOuterTimeJoinNode-61
+   *       │       │           ├──SeriesScanNode-59:[SeriesPath: 
root.sg.d1.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │       │           └──SeriesScanNode-60:[SeriesPath: 
root.sg.d1.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │       └──LimitNode-68
+   *       │           └──FilterNode-67
+   *       │               └──FullOuterTimeJoinNode-66
+   *       │                   ├──SeriesScanNode-64:[SeriesPath: 
root.sg.d333.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │                   └──SeriesScanNode-65:[SeriesPath: 
root.sg.d333.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       ├──ExchangeNode-92: [SourceAddress:192.0.3.1/test.6.0/95]
+   *       ├──ExchangeNode-93: [SourceAddress:192.0.2.1/test.7.0/96]
+   *       └──ExchangeNode-94: [SourceAddress:192.0.4.1/test.8.0/97]
+   *
+   *  IdentitySinkNode-95
+   *   └──TopK-89
+   *       └──DeviceView-75
+   *           └──LimitNode-74
+   *               └──FilterNode-73
+   *                   └──FullOuterTimeJoinNode-72
+   *                       ├──SeriesScanNode-70:[SeriesPath: root.sg.d22.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *                       └──SeriesScanNode-71:[SeriesPath: root.sg.d22.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   *  IdentitySinkNode-96
+   *   └──TopK-90
+   *       └──DeviceView-81
+   *           └──LimitNode-80
+   *               └──FilterNode-79
+   *                   └──FullOuterTimeJoinNode-78
+   *                       ├──SeriesScanNode-76:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *                       └──SeriesScanNode-77:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   *  IdentitySinkNode-97
+   *   └──TopK-91
+   *       └──DeviceView-87
+   *           └──LimitNode-86
+   *               └──FilterNode-85
+   *                   └──FullOuterTimeJoinNode-84
+   *                       ├──SeriesScanNode-82:[SeriesPath: root.sg.d333.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   *                       └──SeriesScanNode-83:[SeriesPath: root.sg.d333.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   */
+  @Test
+  public void orderByTimeTest3() {
+    // order by time and expression, no filter
+    // need read all data, use DeviceViewNode instead of SingleDeviceViewNode
+    // no LimitNode below DeviceViewNode
+    // can push down LIMIT value to ScanNode
+    sql =
+        String.format(
+            "select s1 from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY TIME 
DESC, s2 DESC LIMIT %s align by device",
+            LIMIT_VALUE);
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TopKNode);
+    for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
+      assertTrue(node instanceof DeviceViewNode);
+    }
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(0).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(1).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(2).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(3).getFragment().getPlanNodeTree(), 
LIMIT_VALUE);
+
+    // order by time and expression, has value filter
+    // need read all data, use DeviceViewNode instead of SingleDeviceViewNode
+    // has LimitNode below DeviceViewNode
+    // can not push down LIMIT value to ScanNode
+    sql =
+        String.format(
+            "select s1 from root.sg.d1,root.sg.d22,root.sg.d333 where s2>1 
ORDER BY TIME DESC, s2 DESC LIMIT %s align by device",
+            LIMIT_VALUE);
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TopKNode);
+    for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
+      assertTrue(node instanceof DeviceViewNode);
+      assertTrue(node.getChildren().get(0) instanceof LimitNode);
+      assertTrue(node.getChildren().get(0).getChildren().get(0) instanceof 
FilterNode);
+    }
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
+    
assertScanNodeLimitValue(plan.getInstances().get(0).getFragment().getPlanNodeTree(),
 0);
+    
assertScanNodeLimitValue(plan.getInstances().get(1).getFragment().getPlanNodeTree(),
 0);
+    
assertScanNodeLimitValue(plan.getInstances().get(2).getFragment().getPlanNodeTree(),
 0);
+    
assertScanNodeLimitValue(plan.getInstances().get(3).getFragment().getPlanNodeTree(),
 0);
+  }
+
+  /*
+   * IdentitySinkNode-21
+   *   └──MergeSort-8
+   *       ├──SingleDeviceView-5
+   *       │   └──AggregationNode-9
+   *       │       ├──SeriesAggregationScanNode-10:[SeriesPath: root.sg.d1.s1, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:1)]
+   *       │       └──ExchangeNode-15: [SourceAddress:192.0.2.1/test.2.0/18]
+   *       ├──ExchangeNode-17: [SourceAddress:192.0.3.1/test.3.0/19]
+   *       └──SingleDeviceView-7
+   *           └──AggregationNode-12
+   *               ├──SeriesAggregationScanNode-13:[SeriesPath: 
root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               └──ExchangeNode-16: [SourceAddress:192.0.4.1/test.4.0/20]
+   *
+   * ShuffleSinkNode-18
+   *   └──SeriesAggregationScanNode-11:[SeriesPath: root.sg.d1.s1, Descriptor: 
[AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   * ShuffleSinkNode-19
+   *   └──SingleDeviceView-6
+   *       └──SeriesAggregationScanNode-2:[SeriesPath: root.sg.d22.s1, 
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   * ShuffleSinkNode-20
+   *   └──SeriesAggregationScanNode-14:[SeriesPath: root.sg.d333.s1, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:4)]
+   */
+  @Test
+  public void orderByTimeTest4() {
+    // aggregation + order by time, no LIMIT
+    // SingleDeviceViewNode + MergeSortNode
+    sql =
+        "select count(s1) from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY 
TIME DESC align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof MergeSortNode);
+    assertEquals(3, firstFiTopNode.getChildren().size());
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof 
SingleDeviceViewNode);
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(2) instanceof 
SingleDeviceViewNode);
+
+    // aggregation + order by time + group by time, no LIMIT
+    // SingleDeviceViewNode + MergeSortNode
+    sql =
+        "select count(s1) from root.sg.d1,root.sg.d22,root.sg.d333 group by 
((1,10], 1ms) ORDER BY TIME DESC align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof MergeSortNode);
+    assertEquals(3, firstFiTopNode.getChildren().size());
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof 
SingleDeviceViewNode);
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(2) instanceof 
SingleDeviceViewNode);
+  }
+
+  /*
+   * IdentitySinkNode-22
+   *   └──TopK-4
+   *       ├──TopK-16
+   *       │   ├──SingleDeviceView-5
+   *       │   │   └──AggregationNode-8
+   *       │   │       ├──SeriesAggregationScanNode-9:[SeriesPath: 
root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:1)]
+   *       │   │       └──ExchangeNode-14: 
[SourceAddress:192.0.2.1/test.2.0/19]
+   *       │   └──SingleDeviceView-7
+   *       │       └──AggregationNode-11
+   *       │           ├──SeriesAggregationScanNode-12:[SeriesPath: 
root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │           └──ExchangeNode-15: 
[SourceAddress:192.0.4.1/test.3.0/20]
+   *       └──ExchangeNode-18: [SourceAddress:192.0.3.1/test.4.0/21]
+   *
+   *  IdentitySinkNode-19
+   *   └──SeriesAggregationScanNode-10:[SeriesPath: root.sg.d1.s1, Descriptor: 
[AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   *  IdentitySinkNode-20
+   *   └──SeriesAggregationScanNode-13:[SeriesPath: root.sg.d333.s1, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:4)]
+   *
+   *  IdentitySinkNode-21
+   *   └──TopK-17
+   *       └──SingleDeviceView-6
+   *           └──SeriesAggregationScanNode-2:[SeriesPath: root.sg.d22.s1, 
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   */
+  @Test
+  public void orderByTimeTest5() {
+    // aggregation + order by time, has LIMIT
+    // SingleDeviceViewNode + TopKNode
+    sql =
+        "select count(s1) from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY 
TIME DESC LIMIT 10 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TopKNode);
+    assertEquals(2, firstFiTopNode.getChildren().size());
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode);
+    assertTrue(
+        firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof 
SingleDeviceViewNode);
+    assertTrue(
+        
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof AggregationNode);
+    assertTrue(
+        firstFiTopNode.getChildren().get(0).getChildren().get(1) instanceof 
SingleDeviceViewNode);
+    assertTrue(
+        
firstFiTopNode.getChildren().get(0).getChildren().get(1).getChildren().get(0)
+            instanceof AggregationNode);
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+
+    // aggregation + order by time + group by time, has LIMIT
+    // SingleDeviceViewNode + TopKNode
+    sql =
+        "select count(s1) from root.sg.d1,root.sg.d22,root.sg.d333 group by 
((1,10], 1ms) ORDER BY TIME DESC LIMIT 10 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TopKNode);
+    assertEquals(2, firstFiTopNode.getChildren().size());
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode);
+    assertTrue(
+        firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof 
SingleDeviceViewNode);
+    assertTrue(
+        
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof AggregationNode);
+    assertTrue(
+        firstFiTopNode.getChildren().get(0).getChildren().get(1) instanceof 
SingleDeviceViewNode);
+    assertTrue(
+        
firstFiTopNode.getChildren().get(0).getChildren().get(1).getChildren().get(0)
+            instanceof AggregationNode);
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+  }
+
+  @Test
+  public void orderByTimeWithOffsetTest() {
+    // order by time, offset + limit
+    // on top of TOP-K NODE, LIMIT-NODE is necessary
+    sql =
+        String.format(
+            "select * from root.sg.** ORDER BY time DESC OFFSET %s LIMIT %s 
align by device",
+            LIMIT_VALUE, LIMIT_VALUE);
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    PlanNode firstFIFirstNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFIFirstNode instanceof LimitNode);
+    PlanNode firstFiTopNode = ((LimitNode) 
firstFIFirstNode).getChild().getChildren().get(0);
+    for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
+      assertTrue(node instanceof SingleDeviceViewNode);
+    }
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(0).getFragment().getPlanNodeTree(), 
LIMIT_VALUE * 2);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(1).getFragment().getPlanNodeTree(), 
LIMIT_VALUE * 2);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(2).getFragment().getPlanNodeTree(), 
LIMIT_VALUE * 2);
+    assertScanNodeLimitValue(
+        plan.getInstances().get(3).getFragment().getPlanNodeTree(), 
LIMIT_VALUE * 2);
+  }
+
+  /*
+   * IdentitySinkNode-40
+   *   └──TransformNode-13
+   *       └──SortNode-12
+   *           └──MergeSort-14
+   *               ├──DeviceView-21
+   *               │   ├──FullOuterTimeJoinNode-17
+   *               │   │   ├──SeriesScanNode-15:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               │   │   └──SeriesScanNode-16:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               │   └──FullOuterTimeJoinNode-20
+   *               │       ├──SeriesScanNode-18:[SeriesPath: root.sg.d333.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               │       └──SeriesScanNode-19:[SeriesPath: root.sg.d333.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               ├──ExchangeNode-34: [SourceAddress:192.0.3.1/test.2.0/37]
+   *               ├──ExchangeNode-35: [SourceAddress:192.0.2.1/test.3.0/38]
+   *               └──ExchangeNode-36: [SourceAddress:192.0.4.1/test.4.0/39]
+   *
+   * IdentitySinkNode-37
+   *   └──DeviceView-25
+   *       └──FullOuterTimeJoinNode-24
+   *           ├──SeriesScanNode-22:[SeriesPath: root.sg.d22.s1, DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *           └──SeriesScanNode-23:[SeriesPath: root.sg.d22.s2, DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   * IdentitySinkNode-38
+   *   └──DeviceView-29
+   *       └──FullOuterTimeJoinNode-28
+   *           ├──SeriesScanNode-26:[SeriesPath: root.sg.d1.s1, DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *           └──SeriesScanNode-27:[SeriesPath: root.sg.d1.s2, DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   * IdentitySinkNode-39
+   *   └──DeviceView-33
+   *       └──FullOuterTimeJoinNode-32
+   *           ├──SeriesScanNode-30:[SeriesPath: root.sg.d333.s1, DataRegion: 
TConsensusGroupId(type:DataRegion, id:4)]
+   *           └──SeriesScanNode-31:[SeriesPath: root.sg.d333.s2, DataRegion: 
TConsensusGroupId(type:DataRegion, id:4)]
+   */
+  @Test
+  public void orderByExpressionTest1() {
+    // only order by expression, no LIMIT
+    // use MergeSortNode + SortNode + TransformNode
+    sql = "select s1 from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY s2 DESC 
align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TransformNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof SortNode);
+    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) 
instanceof MergeSortNode);
+    MergeSortNode mergeSortNode =
+        (MergeSortNode) 
firstFiTopNode.getChildren().get(0).getChildren().get(0);
+    assertTrue(mergeSortNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof ExchangeNode);
+    assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
+    assertTrue(mergeSortNode.getChildren().get(3) instanceof ExchangeNode);
+    for (int i = 0; i < 4; i++) {
+      
assertScanNodeLimitValue(plan.getInstances().get(i).getFragment().getPlanNodeTree(),
 0);
+    }
+  }
+
+  /*
+   * IdentitySinkNode-41
+   *   └──TopK-10
+   *       ├──TopK-31
+   *       │   └──DeviceView-18
+   *       │       ├──FullOuterTimeJoinNode-14
+   *       │       │   ├──SeriesScanNode-12:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │       │   └──SeriesScanNode-13:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │       └──FullOuterTimeJoinNode-17
+   *       │           ├──SeriesScanNode-15:[SeriesPath: root.sg.d333.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │           └──SeriesScanNode-16:[SeriesPath: root.sg.d333.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       ├──ExchangeNode-35: [SourceAddress:192.0.3.1/test.2.0/38]
+   *       ├──ExchangeNode-36: [SourceAddress:192.0.2.1/test.3.0/39]
+   *       └──ExchangeNode-37: [SourceAddress:192.0.4.1/test.4.0/40]
+   *
+   * IdentitySinkNode-38
+   *   └──TopK-32
+   *       └──DeviceView-22
+   *           └──FullOuterTimeJoinNode-21
+   *               ├──SeriesScanNode-19:[SeriesPath: root.sg.d22.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *               └──SeriesScanNode-20:[SeriesPath: root.sg.d22.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   * IdentitySinkNode-39
+   *   └──TopK-33
+   *       └──DeviceView-26
+   *           └──FullOuterTimeJoinNode-25
+   *               ├──SeriesScanNode-23:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *               └──SeriesScanNode-24:[SeriesPath: root.sg.d1.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   * IdentitySinkNode-40
+   *   └──TopK-34
+   *       └──DeviceView-30
+   *           └──FullOuterTimeJoinNode-29
+   *               ├──SeriesScanNode-27:[SeriesPath: root.sg.d333.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   *               └──SeriesScanNode-28:[SeriesPath: root.sg.d333.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   */
+  @Test
+  public void orderByExpressionTest2() {
+    // only order by expression, has LIMIT
+    // use TopKNode
+    sql =
+        "select s1 from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY s2 DESC 
LIMIT 10 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TopKNode);
+    for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
+      assertTrue(node instanceof DeviceViewNode);
+      assertTrue(node.getChildren().get(0) instanceof FullOuterTimeJoinNode);
+    }
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
+    for (int i = 0; i < 4; i++) {
+      
assertScanNodeLimitValue(plan.getInstances().get(i).getFragment().getPlanNodeTree(),
 0);
+    }
+  }
+
+  /*
+   * IdentitySinkNode-37
+   *   └──TransformNode-13
+   *       └──SortNode-12
+   *           └──MergeSort-14
+   *               └──DeviceView-15
+   *                   ├──AggregationNode-20
+   *                   │   ├──SeriesAggregationScanNode-16:[SeriesPath: 
root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:1)]
+   *                   │   ├──SeriesAggregationScanNode-18:[SeriesPath: 
root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:1)]
+   *                   │   └──ExchangeNode-31: 
[SourceAddress:192.0.2.1/test.2.0/34]
+   *                   ├──ExchangeNode-33: 
[SourceAddress:192.0.3.1/test.3.0/35]
+   *                   └──AggregationNode-29
+   *                       ├──SeriesAggregationScanNode-25:[SeriesPath: 
root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *                       ├──SeriesAggregationScanNode-27:[SeriesPath: 
root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *                       └──ExchangeNode-32: 
[SourceAddress:192.0.4.1/test.4.0/36]
+   *
+   * IdentitySinkNode-34
+   *   └──HorizontallyConcatNode-21
+   *       ├──SeriesAggregationScanNode-17:[SeriesPath: root.sg.d1.s2, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *       └──SeriesAggregationScanNode-19:[SeriesPath: root.sg.d1.s1, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   * IdentitySinkNode-35
+   *   └──HorizontallyConcatNode-24
+   *       ├──SeriesAggregationScanNode-22:[SeriesPath: root.sg.d22.s1, 
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *       └──SeriesAggregationScanNode-23:[SeriesPath: root.sg.d22.s2, 
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   * IdentitySinkNode-36
+   *   └──HorizontallyConcatNode-30
+   *       ├──SeriesAggregationScanNode-26:[SeriesPath: root.sg.d333.s2, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:4)]
+   *       └──SeriesAggregationScanNode-28:[SeriesPath: root.sg.d333.s1, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:4)]
+   */
+  @Test
+  public void orderByExpressionTest3() {
+    // aggregation, order by expression, no LIMIT
+    // use MergeSortNode
+    sql =
+        "select count(s1) from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY 
count(s2) DESC align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TransformNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof SortNode);
+    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) 
instanceof MergeSortNode);
+    MergeSortNode mergeSortNode =
+        (MergeSortNode) 
firstFiTopNode.getChildren().get(0).getChildren().get(0);
+    assertTrue(mergeSortNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(mergeSortNode.getChildren().get(0).getChildren().get(0) 
instanceof AggregationNode);
+    assertTrue(mergeSortNode.getChildren().get(0).getChildren().get(1) 
instanceof ExchangeNode);
+    assertTrue(mergeSortNode.getChildren().get(0).getChildren().get(2) 
instanceof AggregationNode);
+    for (int i = 0; i < 4; i++) {
+      
assertScanNodeLimitValue(plan.getInstances().get(i).getFragment().getPlanNodeTree(),
 0);
+    }
+  }
+
+  @Test
+  public void orderByExpressionTest4() {
+    // aggregation, order by expression, has LIMIT
+    // use TopKNode
+    sql =
+        "select count(s1) from root.sg.d1,root.sg.d22,root.sg.d333 ORDER BY 
count(s2) DESC LIMIT 10 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(4, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TopKNode);
+    for (PlanNode node : firstFiTopNode.getChildren().get(0).getChildren()) {
+      assertTrue(node instanceof DeviceViewNode);
+      assertTrue(node.getChildren().get(0) instanceof FullOuterTimeJoinNode);
+    }
+    assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(2) instanceof ExchangeNode);
+    assertTrue(firstFiTopNode.getChildren().get(3) instanceof ExchangeNode);
+    
assertScanNodeLimitValue(plan.getInstances().get(0).getFragment().getPlanNodeTree(),
 0);
+    
assertScanNodeLimitValue(plan.getInstances().get(1).getFragment().getPlanNodeTree(),
 0);
+    
assertScanNodeLimitValue(plan.getInstances().get(2).getFragment().getPlanNodeTree(),
 0);
+    
assertScanNodeLimitValue(plan.getInstances().get(3).getFragment().getPlanNodeTree(),
 0);
+  }
+
+  private void assertScanNodeLimitValue(PlanNode root, long limitValue) {
+    for (PlanNode node : root.getChildren()) {
+      if (node instanceof SeriesScanNode) {
+        assertEquals(limitValue, ((SeriesScanNode) node).getPushDownLimit());
+      } else if (node instanceof AlignedSeriesScanNode) {
+        assertEquals(limitValue, ((AlignedSeriesScanNode) 
node).getPushDownLimit());
+      } else {
+        assertScanNodeLimitValue(node, limitValue);
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties 
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
index b89ef13778b..23fe0146afc 100644
--- 
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
+++ 
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
@@ -20,4 +20,5 @@
 timestamp_precision=ms
 udf_lib_dir=target/datanode1/ext/udf
 trigger_lib_dir=target/datanode1/ext/trigger
-pipe_lib_dir=target/datanode1/ext/pipe
\ No newline at end of file
+pipe_lib_dir=target/datanode1/ext/pipe
+data_replication_factor=2
\ No newline at end of file
diff --git 
a/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties 
b/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties
index 9cf060d61fd..4587be02dd3 100644
--- 
a/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties
+++ 
b/iotdb-core/datanode/src/test/resources/datanode2conf/iotdb-common.properties
@@ -19,4 +19,5 @@
 timestamp_precision=ms
 udf_lib_dir=target/datanode2/ext/udf
 trigger_lib_dir=target/datanode2/ext/trigger
-pipe_lib_dir=target/datanode2/ext/pipe
\ No newline at end of file
+pipe_lib_dir=target/datanode2/ext/pipe
+data_replication_factor=2
\ No newline at end of file

Reply via email to