This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f5b88fb0e93 [IOTDB-6297] Optimize the distribute plan in the situation 
of `aggregation with align by device`
f5b88fb0e93 is described below

commit f5b88fb0e932c01ff601a26c829be7429efb1f38
Author: Beyyes <[email protected]>
AuthorDate: Thu Feb 22 14:31:22 2024 +0800

    [IOTDB-6297] Optimize the distribute plan in the situation of `aggregation 
with align by device`
---
 .../db/queryengine/plan/analyze/Analysis.java      |  11 +
 .../plan/planner/LogicalPlanBuilder.java           |   1 -
 .../planner/distribution/DistributionPlanner.java  |   2 +-
 .../planner/distribution/ExchangeNodeAdder.java    |  62 +--
 .../planner/distribution/NodeDistribution.java     |  24 +-
 .../plan/planner/distribution/SourceRewriter.java  | 328 +++++++++------
 ...anNodeRewriter.java => BaseSourceRewriter.java} |   2 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   9 +
 .../plan/planner/plan/node/PlanNode.java           |   2 +-
 .../plan/planner/plan/node/PlanNodeType.java       |   7 +-
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../node/process/AggregationMergeSortNode.java     | 143 +++++++
 .../operator/AggregationOperatorTest.java          |  17 +-
 .../distribution/AggregationAlignByDeviceTest.java | 451 +++++++++++++++++++++
 .../distribution/AggregationDistributionTest.java  |  18 +-
 15 files changed, 902 insertions(+), 180 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 4e9c5ec6b15..3014cdec431 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -171,8 +171,11 @@ public class Analysis {
 
   // indicates whether DeviceView need special process when rewriteSource in 
DistributionPlan,
   // you can see SourceRewriter#visitDeviceView to get more information
+  // deviceViewSpecialProcess equals true when all Aggregation Functions and 
DIFF
   private boolean deviceViewSpecialProcess;
 
+  private boolean existDeviceCrossRegion;
+
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // Query Common Analysis (above DeviceView)
   
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -655,6 +658,14 @@ public class Analysis {
     this.deviceViewSpecialProcess = deviceViewSpecialProcess;
   }
 
+  public boolean isExistDeviceCrossRegion() {
+    return existDeviceCrossRegion;
+  }
+
+  public void setExistDeviceCrossRegion() {
+    this.existDeviceCrossRegion = true;
+  }
+
   public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() {
     return deviceViewIntoPathDescriptor;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index f72f28b3d93..0aebfcab1c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -855,7 +855,6 @@ public class LogicalPlanBuilder {
           -1);
       this.root = mergeSortNode;
     } else {
-      // order by based on device, use DeviceViewNode
       this.root =
           addDeviceViewNode(
               orderByParameter,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index aa7d14608c7..755119b2607 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -144,7 +144,7 @@ public class DistributionPlanner {
       if (child instanceof ExchangeNode) {
         ExchangeNode exchangeNode = (ExchangeNode) child;
         TRegionReplicaSet regionOfChild =
-            
context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
+            
context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).getRegion();
         MultiChildrenSinkNode newChild =
             memo.computeIfAbsent(
                 regionOfChild,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 06043bd2e41..a8025719967 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
@@ -115,13 +116,14 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     NodeDistribution nodeDistribution =
         new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
     PlanNode newNode = node.clone();
-    nodeDistribution.region = 
calculateSchemaRegionByChildren(node.getChildren(), context);
+    
nodeDistribution.setRegion(calculateSchemaRegionByChildren(node.getChildren(), 
context));
     context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution);
     node.getChildren()
         .forEach(
             child -> {
-              if (!nodeDistribution.region.equals(
-                  context.getNodeDistribution(child.getPlanNodeId()).region)) {
+              if (!nodeDistribution
+                  .getRegion()
+                  
.equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion())) {
                 ExchangeNode exchangeNode =
                     new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
                 exchangeNode.setChild(child);
@@ -200,6 +202,12 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return processMultiChildNode(node, context);
   }
 
+  @Override
+  public PlanNode visitAggregationMergeSort(
+      AggregationMergeSortNode node, NodeGroupContext context) {
+    return processMultiChildNode(node, context);
+  }
+
   @Override
   public PlanNode visitDeviceMerge(DeviceMergeNode node, NodeGroupContext 
context) {
     return processMultiChildNode(node, context);
@@ -265,7 +273,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
       // else we set the selected mostlyUsedDataRegion to this node
       dataRegion =
           isChildrenDistributionSame
-              ? context.getNodeDistribution(leftChild.getPlanNodeId()).region
+              ? 
context.getNodeDistribution(leftChild.getPlanNodeId()).getRegion()
               : context.getMostlyUsedDataRegion();
       context.putNodeDistribution(
           newNode.getPlanNodeId(), new NodeDistribution(distributionType, 
dataRegion));
@@ -284,7 +292,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
 
     // Otherwise, we need to add ExchangeNode for the child whose DataRegion 
is different from the
     // parent.
-    if 
(!dataRegion.equals(context.getNodeDistribution(leftChild.getPlanNodeId()).region))
 {
+    if 
(!dataRegion.equals(context.getNodeDistribution(leftChild.getPlanNodeId()).getRegion()))
 {
       if (leftChild instanceof SingleDeviceViewNode) {
         ((SingleDeviceViewNode) leftChild).setCacheOutputColumnNames(true);
       }
@@ -298,7 +306,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
       newNode.setLeftChild(leftChild);
     }
 
-    if 
(!dataRegion.equals(context.getNodeDistribution(rightChild.getPlanNodeId()).region))
 {
+    if 
(!dataRegion.equals(context.getNodeDistribution(rightChild.getPlanNodeId()).getRegion()))
 {
       if (rightChild instanceof SingleDeviceViewNode) {
         ((SingleDeviceViewNode) rightChild).setCacheOutputColumnNames(true);
       }
@@ -377,7 +385,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
             .map(child -> visit(child, context))
             .collect(Collectors.toList());
 
-    // DataRegion which node locates
+    // DataRegion in which node locates
     TRegionReplicaSet dataRegion;
     boolean isChildrenDistributionSame = 
nodeDistributionIsSame(visitedChildren, context);
     NodeDistributionType distributionType =
@@ -390,7 +398,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
       // else we set the selected mostlyUsedDataRegion to this node
       dataRegion =
           isChildrenDistributionSame
-              ? 
context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
+              ? 
context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).getRegion()
               : context.getMostlyUsedDataRegion();
       context.putNodeDistribution(
           newNode.getPlanNodeId(), new NodeDistribution(distributionType, 
dataRegion));
@@ -409,21 +417,17 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     // optimize `order by time|expression limit N align by device` query,
     // to ensure that the number of ExchangeNode equals to DataRegionNum but 
not equals to DeviceNum
     if (node instanceof TopKNode) {
-      return processTopNode(node, visitedChildren, context, newNode, 
dataRegion);
+      return processTopKNode(node, visitedChildren, context, newNode, 
dataRegion);
     }
 
     // Otherwise, we need to add ExchangeNode for the child whose DataRegion 
is different from the
     // parent.
     for (PlanNode child : visitedChildren) {
-      if 
(!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).region)) 
{
+      if 
(!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion()))
 {
         if (child instanceof SingleDeviceViewNode) {
           ((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
         }
-        ExchangeNode exchangeNode =
-            new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
-        exchangeNode.setChild(child);
-        exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-        context.hasExchangeNode = true;
+        ExchangeNode exchangeNode = genExchangeNode(context, child);
         newNode.addChild(exchangeNode);
       } else {
         newNode.addChild(child);
@@ -450,7 +454,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return newNode;
   }
 
-  private PlanNode processTopNode(
+  private PlanNode processTopKNode(
       MultiChildProcessNode node,
       List<PlanNode> visitedChildren,
       NodeGroupContext context,
@@ -459,7 +463,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     TopKNode rootNode = (TopKNode) node;
     Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
     for (PlanNode child : visitedChildren) {
-      TRegionReplicaSet region = 
context.getNodeDistribution(child.getPlanNodeId()).region;
+      TRegionReplicaSet region = 
context.getNodeDistribution(child.getPlanNodeId()).getRegion();
       regionTopKNodeMap
           .computeIfAbsent(
               region,
@@ -483,11 +487,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
       TopKNode topKNode = entry.getValue();
 
       if (!dataRegion.equals(topKNodeLocatedRegion)) {
-        ExchangeNode exchangeNode =
-            new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
-        exchangeNode.setChild(topKNode);
-        exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
-        context.hasExchangeNode = true;
+        ExchangeNode exchangeNode = genExchangeNode(context, topKNode);
         newNode.addChild(exchangeNode);
       } else {
         newNode.addChild(topKNode);
@@ -496,6 +496,14 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return newNode;
   }
 
+  private ExchangeNode genExchangeNode(NodeGroupContext context, PlanNode 
child) {
+    ExchangeNode exchangeNode = new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+    exchangeNode.setChild(child);
+    exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+    context.hasExchangeNode = true;
+    return exchangeNode;
+  }
+
   @Override
   public PlanNode visitSlidingWindowAggregation(
       SlidingWindowAggregationNode node, NodeGroupContext context) {
@@ -506,7 +514,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     PlanNode newNode = node.clone();
     PlanNode child = visit(node.getChildren().get(0), context);
     newNode.addChild(child);
-    TRegionReplicaSet dataRegion = 
context.getNodeDistribution(child.getPlanNodeId()).region;
+    TRegionReplicaSet dataRegion = 
context.getNodeDistribution(child.getPlanNodeId()).getRegion();
     context.putNodeDistribution(
         newNode.getPlanNodeId(),
         new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, 
dataRegion));
@@ -523,9 +531,9 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
                 Collectors.groupingBy(
                     child -> {
                       TRegionReplicaSet region =
-                          
context.getNodeDistribution(child.getPlanNodeId()).region;
+                          
context.getNodeDistribution(child.getPlanNodeId()).getRegion();
                       if (region == null
-                          && 
context.getNodeDistribution(child.getPlanNodeId()).type
+                          && 
context.getNodeDistribution(child.getPlanNodeId()).getType()
                               == NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
                         return 
calculateSchemaRegionByChildren(child.getChildren(), context);
                       }
@@ -571,7 +579,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
   private TRegionReplicaSet calculateSchemaRegionByChildren(
       List<PlanNode> children, NodeGroupContext context) {
     // We always make the schemaRegion of MetaMergeNode to be the same as its 
first child.
-    return context.getNodeDistribution(children.get(0).getPlanNodeId()).region;
+    return 
context.getNodeDistribution(children.get(0).getPlanNodeId()).getRegion();
   }
 
   private boolean nodeDistributionIsSame(List<PlanNode> children, 
NodeGroupContext context) {
@@ -579,7 +587,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     NodeDistribution first = 
context.getNodeDistribution(children.get(0).getPlanNodeId());
     for (int i = 1; i < children.size(); i++) {
       NodeDistribution next = 
context.getNodeDistribution(children.get(i).getPlanNodeId());
-      if (first.region == null || !first.region.equals(next.region)) {
+      if (first.getRegion() == null || 
!first.getRegion().equals(next.getRegion())) {
         return false;
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeDistribution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeDistribution.java
index 4b2ca213c6d..2c793c2c39e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeDistribution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/NodeDistribution.java
@@ -22,15 +22,31 @@ package 
org.apache.iotdb.db.queryengine.plan.planner.distribution;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
 public class NodeDistribution {
-  protected NodeDistributionType type;
-  protected TRegionReplicaSet region;
+  private NodeDistributionType type;
+  private TRegionReplicaSet region;
 
-  protected NodeDistribution(NodeDistributionType type, TRegionReplicaSet 
region) {
+  public NodeDistribution(NodeDistributionType type, TRegionReplicaSet region) 
{
     this.type = type;
     this.region = region;
   }
 
-  protected NodeDistribution(NodeDistributionType type) {
+  public NodeDistribution(NodeDistributionType type) {
     this.type = type;
   }
+
+  public NodeDistributionType getType() {
+    return this.type;
+  }
+
+  public void setType(NodeDistributionType type) {
+    this.type = type;
+  }
+
+  public TRegionReplicaSet getRegion() {
+    return this.region;
+  }
+
+  public void setRegion(TRegionReplicaSet region) {
+    this.region = region;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 7b5a82062f9..53814483e4c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -30,14 +30,15 @@ import 
org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.SimplePlanNodeRewriter;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.CountSchemaMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode;
@@ -49,6 +50,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChild
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.last.LastQueryCollectNode;
@@ -85,7 +87,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
 
-public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanContext> {
+public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext> {
 
   private final Analysis analysis;
 
@@ -170,37 +172,66 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
           "size of devices and its children in DeviceViewNode should be same");
     }
 
-    // If the DeviceView is mixed with Function that need to merge data from 
different Data Region,
-    // it should be processed by a special logic.
-    // Now the Functions are : all Aggregation Functions and DIFF
-    if (analysis.isDeviceViewSpecialProcess()) {
-      return processSpecialDeviceView(node, context);
-    }
-
+    // Step 1: constructs DeviceViewSplits
     Set<TRegionReplicaSet> relatedDataRegions = new HashSet<>();
-
     List<DeviceViewSplit> deviceViewSplits = new ArrayList<>();
 
-    // Step 1: constructs DeviceViewSplit
-    Map<String, String> outputDeviceToQueriedDevicesMap =
-        analysis.getOutputDeviceToQueriedDevicesMap();
     for (int i = 0; i < node.getDevices().size(); i++) {
       String outputDevice = node.getDevices().get(i);
       PlanNode child = node.getChildren().get(i);
       List<TRegionReplicaSet> regionReplicaSets =
-          !analysis.useLogicalView()
+          analysis.useLogicalView()
               ? new ArrayList<>(
-                  analysis.getPartitionInfo(outputDevice, 
context.getPartitionTimeFilter()))
-              : new ArrayList<>(
                   analysis.getPartitionInfo(
-                      outputDeviceToQueriedDevicesMap.get(outputDevice),
-                      context.getPartitionTimeFilter()));
+                      
analysis.getOutputDeviceToQueriedDevicesMap().get(outputDevice),
+                      context.getPartitionTimeFilter()))
+              : new ArrayList<>(
+                  analysis.getPartitionInfo(outputDevice, 
context.getPartitionTimeFilter()));
+      if (regionReplicaSets.size() > 1) {
+        // specialProcess and existDeviceCrossRegion, use the old aggregation 
logic
+        analysis.setExistDeviceCrossRegion();
+        if (analysis.isDeviceViewSpecialProcess()) {
+          return processSpecialDeviceView(node, context);
+        }
+      }
       deviceViewSplits.add(new DeviceViewSplit(outputDevice, child, 
regionReplicaSets));
       relatedDataRegions.addAll(regionReplicaSets);
     }
 
     // Step 2: Iterate all partition and create DeviceViewNode for each region
     List<PlanNode> deviceViewNodeList = new ArrayList<>();
+    if (analysis.isExistDeviceCrossRegion()) {
+      constructDeviceViewNodeListWithCrossRegion(
+          deviceViewNodeList, relatedDataRegions, deviceViewSplits, node, 
context);
+    } else {
+      constructDeviceViewNodeListWithoutCrossRegion(
+          deviceViewNodeList, deviceViewSplits, node, context, analysis);
+    }
+
+    // 1. Only one DeviceViewNode, the is no need to use MergeSortNode.
+    // 2. for DeviceView+SortNode case, the parent of DeviceViewNode will be 
SortNode, MergeSortNode
+    // will be generated in {@link #visitSort}.
+    // 3. for DeviceView+TopKNode case, there is no need MergeSortNode, 
TopKNode can output the
+    // sorted result.
+    if (deviceViewNodeList.size() == 1 || analysis.isHasSortNode() || 
analysis.isUseTopKNode()) {
+      return deviceViewNodeList;
+    }
+
+    MergeSortNode mergeSortNode =
+        new MergeSortNode(
+            context.queryContext.getQueryId().genPlanNodeId(),
+            node.getMergeOrderParameter(),
+            node.getOutputColumnNames());
+    deviceViewNodeList.forEach(mergeSortNode::addChild);
+    return Collections.singletonList(mergeSortNode);
+  }
+
+  private void constructDeviceViewNodeListWithCrossRegion(
+      List<PlanNode> deviceViewNodeList,
+      Set<TRegionReplicaSet> relatedDataRegions,
+      List<DeviceViewSplit> deviceViewSplits,
+      DeviceViewNode node,
+      DistributionPlanContext context) {
     for (TRegionReplicaSet regionReplicaSet : relatedDataRegions) {
       List<String> devices = new ArrayList<>();
       List<PlanNode> children = new ArrayList<>();
@@ -216,20 +247,49 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
       }
       deviceViewNodeList.add(regionDeviceViewNode);
     }
+  }
 
-    if (deviceViewNodeList.size() == 1 || analysis.isHasSortNode() || 
analysis.isUseTopKNode()) {
-      return deviceViewNodeList;
-    }
+  private void constructDeviceViewNodeListWithoutCrossRegion(
+      List<PlanNode> deviceViewNodeList,
+      List<DeviceViewSplit> deviceViewSplits,
+      DeviceViewNode node,
+      DistributionPlanContext context,
+      Analysis analysis) {
 
-    MergeSortNode mergeSortNode =
-        new MergeSortNode(
-            context.queryContext.getQueryId().genPlanNodeId(),
-            node.getMergeOrderParameter(),
-            node.getOutputColumnNames());
-    for (PlanNode deviceViewNode : deviceViewNodeList) {
-      mergeSortNode.addChild(deviceViewNode);
+    Map<TRegionReplicaSet, DeviceViewNode> regionDeviceViewMap = new 
HashMap<>();
+    for (DeviceViewSplit split : deviceViewSplits) {
+      if (split.dataPartitions.size() != 1) {
+        throw new IllegalStateException(
+            "In non-cross data region device-view situation, "
+                + "each device should only have on data partition.");
+      }
+      TRegionReplicaSet region = split.dataPartitions.iterator().next();
+      DeviceViewNode regionDeviceViewNode =
+          regionDeviceViewMap.computeIfAbsent(
+              region,
+              k -> {
+                DeviceViewNode deviceViewNode = 
cloneDeviceViewNodeWithoutChild(node, context);
+                deviceViewNodeList.add(deviceViewNode);
+                return deviceViewNode;
+              });
+      PlanNode childNode = split.buildPlanNodeInRegion(region, 
context.queryContext);
+      if (analysis.isDeviceViewSpecialProcess()) {
+        List<PlanNode> rewriteResult = rewrite(childNode, context);
+        if (rewriteResult.size() != 1) {
+          throw new IllegalStateException(
+              "In non-cross data region aggregation device-view situation, "
+                  + "each rewrite child node of DeviceView should only be 
one.");
+        }
+        childNode = rewriteResult.get(0);
+      }
+      regionDeviceViewNode.addChildDeviceNode(split.device, childNode);
     }
-    return Collections.singletonList(mergeSortNode);
+  }
+
+  @Override
+  public List<PlanNode> visitAggregationMergeSort(
+      AggregationMergeSortNode node, DistributionPlanContext context) {
+    return null;
   }
 
   private List<PlanNode> processSpecialDeviceView(
@@ -253,41 +313,35 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
         node.getDeviceToMeasurementIndexesMap());
   }
 
-  private static class DeviceViewSplit {
-    protected String device;
-    protected PlanNode root;
-    protected Set<TRegionReplicaSet> dataPartitions;
-
-    protected DeviceViewSplit(
-        String device, PlanNode root, List<TRegionReplicaSet> dataPartitions) {
-      this.device = device;
-      this.root = root;
-      this.dataPartitions = new HashSet<>();
-      this.dataPartitions.addAll(dataPartitions);
-    }
-
-    protected PlanNode buildPlanNodeInRegion(
-        TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
-      return buildPlanNodeInRegion(this.root, regionReplicaSet, context);
+  @Override
+  public List<PlanNode> visitTransform(TransformNode node, 
DistributionPlanContext context) {
+    List<PlanNode> children = rewrite(node.getChild(), context);
+    if (children.size() == 1) {
+      node.setChild(children.get(0));
+      return Collections.singletonList(node);
     }
 
-    protected boolean needDistributeTo(TRegionReplicaSet regionReplicaSet) {
-      return this.dataPartitions.contains(regionReplicaSet);
-    }
+    // for some query such as `select avg(s1)+avg(s2) from root.** where 
count(s3)>1 align by
+    // device`,
+    // the children of TransformNode may be N(N>1) DeviceViewNodes, so need 
return N TransformNodes
+    return children.stream()
+        .map(
+            child -> {
+              TransformNode transformNode = 
cloneTransformNodeWithOutChild(node, context);
+              transformNode.addChild(child);
+              return transformNode;
+            })
+        .collect(Collectors.toList());
+  }
 
-    private PlanNode buildPlanNodeInRegion(
-        PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext 
context) {
-      List<PlanNode> children =
-          root.getChildren().stream()
-              .map(child -> buildPlanNodeInRegion(child, regionReplicaSet, 
context))
-              .collect(Collectors.toList());
-      PlanNode newRoot = root.cloneWithChildren(children);
-      newRoot.setPlanNodeId(context.getQueryId().genPlanNodeId());
-      if (newRoot instanceof SourceNode) {
-        ((SourceNode) newRoot).setRegionReplicaSet(regionReplicaSet);
-      }
-      return newRoot;
-    }
+  private TransformNode cloneTransformNodeWithOutChild(
+      TransformNode node, DistributionPlanContext context) {
+    return new TransformNode(
+        context.queryContext.getQueryId().genPlanNodeId(),
+        node.getOutputExpressions(),
+        node.isKeepNull(),
+        node.getZoneId(),
+        node.getScanOrder());
   }
 
   @Override
@@ -347,23 +401,21 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
                 }
               });
       schemaRegions.forEach(
-          region -> {
-            addSchemaSourceNode(
-                root,
-                seed.getPath(),
-                region,
-                context.queryContext.getQueryId().genPlanNodeId(),
-                seed);
-          });
+          region ->
+              addSchemaSourceNode(
+                  root,
+                  seed.getPath(),
+                  region,
+                  context.queryContext.getQueryId().genPlanNodeId(),
+                  seed));
       regionsOfSystemDatabase.forEach(
-          region -> {
-            addSchemaSourceNode(
-                root,
-                seed.getPath(),
-                region,
-                context.queryContext.getQueryId().genPlanNodeId(),
-                seed);
-          });
+          region ->
+              addSchemaSourceNode(
+                  root,
+                  seed.getPath(),
+                  region,
+                  context.queryContext.getQueryId().genPlanNodeId(),
+                  seed));
     } else {
       // the path pattern may only overlap with part of storageGroup or 
storageGroup.**, need filter
       PathPatternTree patternTree = new PathPatternTree();
@@ -394,31 +446,29 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
             List<PartialPath> filteredPathPatternList =
                 filterPathPattern(patternTree, storageGroup);
             schemaRegionSet.forEach(
-                region -> {
-                  addSchemaSourceNode(
-                      root,
-                      filteredPathPatternList.size() == 1
-                          ? filteredPathPatternList.get(0)
-                          : seed.getPath(),
-                      region,
-                      context.queryContext.getQueryId().genPlanNodeId(),
-                      seed);
-                });
+                region ->
+                    addSchemaSourceNode(
+                        root,
+                        filteredPathPatternList.size() == 1
+                            ? filteredPathPatternList.get(0)
+                            : seed.getPath(),
+                        region,
+                        context.queryContext.getQueryId().genPlanNodeId(),
+                        seed));
           });
       if (!regionsOfSystemDatabase.isEmpty()) {
         List<PartialPath> filteredPathPatternList =
             filterPathPattern(patternTree, SchemaConstant.SYSTEM_DATABASE);
         regionsOfSystemDatabase.forEach(
-            region -> {
-              addSchemaSourceNode(
-                  root,
-                  filteredPathPatternList.size() == 1
-                      ? filteredPathPatternList.get(0)
-                      : seed.getPath(),
-                  region,
-                  context.queryContext.getQueryId().genPlanNodeId(),
-                  seed);
-            });
+            region ->
+                addSchemaSourceNode(
+                    root,
+                    filteredPathPatternList.size() == 1
+                        ? filteredPathPatternList.get(0)
+                        : seed.getPath(),
+                    region,
+                    context.queryContext.getQueryId().genPlanNodeId(),
+                    seed));
       }
     }
     return Collections.singletonList(root);
@@ -462,11 +512,10 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
         .getSchemaPartitionInfo()
         .getSchemaPartitionMap()
         .forEach(
-            (storageGroup, deviceGroup) -> {
-              deviceGroup.forEach(
-                  (deviceGroupId, schemaRegionReplicaSet) ->
-                      schemaRegions.add(schemaRegionReplicaSet));
-            });
+            (storageGroup, deviceGroup) ->
+                deviceGroup.forEach(
+                    (deviceGroupId, schemaRegionReplicaSet) ->
+                        schemaRegions.add(schemaRegionReplicaSet)));
     schemaRegions.forEach(
         region -> {
           SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode) 
seed.clone();
@@ -564,14 +613,13 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
-            descriptor -> {
-              leafAggDescriptorList.add(
-                  new AggregationDescriptor(
-                      descriptor.getAggregationFuncName(),
-                      AggregationStep.PARTIAL,
-                      descriptor.getInputExpressions(),
-                      descriptor.getInputAttributes()));
-            });
+            descriptor ->
+                leafAggDescriptorList.add(
+                    new AggregationDescriptor(
+                        descriptor.getAggregationFuncName(),
+                        AggregationStep.PARTIAL,
+                        descriptor.getInputExpressions(),
+                        descriptor.getInputAttributes())));
     leafAggDescriptorList.forEach(
         d ->
             LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
@@ -579,14 +627,13 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
-            descriptor -> {
-              rootAggDescriptorList.add(
-                  new AggregationDescriptor(
-                      descriptor.getAggregationFuncName(),
-                      context.isRoot ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE,
-                      descriptor.getInputExpressions(),
-                      descriptor.getInputAttributes()));
-            });
+            descriptor ->
+                rootAggDescriptorList.add(
+                    new AggregationDescriptor(
+                        descriptor.getAggregationFuncName(),
+                        context.isRoot ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE,
+                        descriptor.getInputExpressions(),
+                        descriptor.getInputAttributes())));
 
     AggregationNode aggregationNode =
         new AggregationNode(
@@ -1436,14 +1483,14 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
       source
           .getAggregationDescriptorList()
           .forEach(
-              d -> {
+              descriptor -> {
                 if (isSingle) {
-                  d.setStep(AggregationStep.SINGLE);
+                  descriptor.setStep(AggregationStep.SINGLE);
                 } else {
                   eachSeriesOneRegion[0] = false;
-                  d.setStep(AggregationStep.PARTIAL);
+                  descriptor.setStep(AggregationStep.PARTIAL);
                   LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
-                      d, context.queryContext.getTypeProvider());
+                      descriptor, context.queryContext.getTypeProvider());
                 }
               });
     }
@@ -1475,4 +1522,41 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
   public List<PlanNode> visit(PlanNode node, DistributionPlanContext context) {
     return node.accept(this, context);
   }
+
+  private static class DeviceViewSplit {
+    protected String device;
+    protected PlanNode root;
+    protected Set<TRegionReplicaSet> dataPartitions;
+
+    protected DeviceViewSplit(
+        String device, PlanNode root, List<TRegionReplicaSet> dataPartitions) {
+      this.device = device;
+      this.root = root;
+      this.dataPartitions = new HashSet<>();
+      this.dataPartitions.addAll(dataPartitions);
+    }
+
+    protected PlanNode buildPlanNodeInRegion(
+        TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
+      return buildPlanNodeInRegion(this.root, regionReplicaSet, context);
+    }
+
+    protected boolean needDistributeTo(TRegionReplicaSet regionReplicaSet) {
+      return this.dataPartitions.contains(regionReplicaSet);
+    }
+
+    private PlanNode buildPlanNodeInRegion(
+        PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext 
context) {
+      List<PlanNode> children =
+          root.getChildren().stream()
+              .map(child -> buildPlanNodeInRegion(child, regionReplicaSet, 
context))
+              .collect(Collectors.toList());
+      PlanNode newRoot = root.cloneWithChildren(children);
+      newRoot.setPlanNodeId(context.getQueryId().genPlanNodeId());
+      if (newRoot instanceof SourceNode) {
+        ((SourceNode) newRoot).setRegionReplicaSet(regionReplicaSet);
+      }
+      return newRoot;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/SimplePlanNodeRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/BaseSourceRewriter.java
similarity index 95%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/SimplePlanNodeRewriter.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/BaseSourceRewriter.java
index 42329085405..11f8afd6448 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/SimplePlanNodeRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/BaseSourceRewriter.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 
-public class SimplePlanNodeRewriter<C> extends PlanVisitor<List<PlanNode>, C> {
+public class BaseSourceRewriter<C> extends PlanVisitor<List<PlanNode>, C> {
   @Override
   public List<PlanNode> visitPlan(PlanNode node, C context) {
     // TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index fcebd9b0c66..7bfd680c40e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -188,6 +189,14 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitAggregationMergeSort(
+      AggregationMergeSortNode node, GraphContext context) {
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(String.format("AggregationMergeSort-%s", 
node.getPlanNodeId().getId()));
+    return render(node, boxValue, context);
+  }
+
   @Override
   public List<String> visitMergeSort(MergeSortNode node, GraphContext context) 
{
     List<String> boxValue = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
index 6d13dde35ff..d147d316318 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java
@@ -87,7 +87,7 @@ public abstract class PlanNode implements IConsensusRequest {
    */
   public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
     throw new UnsupportedOperationException(
-        String.format("Can't create subNode for %s", 
this.getClass().toString()));
+        String.format("Can't create subNode for %s", this.getClass()));
   }
 
   public PlanNode cloneWithChildren(List<PlanNode> children) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 5eb722c7422..3b40eb38a1d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -60,6 +60,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedC
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -200,7 +201,9 @@ public enum PlanNodeType {
   PIPE_ENRICHED_DELETE_SCHEMA((short) 86),
 
   INNER_TIME_JOIN((short) 87),
-  LEFT_OUTER_TIME_JOIN((short) 88);
+  LEFT_OUTER_TIME_JOIN((short) 88),
+
+  AGG_MERGE_SORT((short) 89);
 
   public static final int BYTES = Short.BYTES;
 
@@ -425,6 +428,8 @@ public enum PlanNodeType {
         return InnerTimeJoinNode.deserialize(buffer);
       case 88:
         return LeftOuterTimeJoinNode.deserialize(buffer);
+      case 89:
+        return AggregationMergeSortNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index ed7072a76eb..1091d9a352a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -57,6 +57,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedC
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -231,6 +232,10 @@ public abstract class PlanVisitor<R, C> {
     return visitMultiChildProcess(node, context);
   }
 
+  public R visitAggregationMergeSort(AggregationMergeSortNode node, C context) 
{
+    return visitMultiChildProcess(node, context);
+  }
+
   public R visitDeviceMerge(DeviceMergeNode node, C context) {
     return visitMultiChildProcess(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
new file mode 100644
index 00000000000..41c4cfb2b55
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class AggregationMergeSortNode extends MultiChildProcessNode {
+
+  private final OrderByParameter mergeOrderParameter;
+
+  private final List<String> outputColumns;
+
+  public AggregationMergeSortNode(
+      PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> 
outputColumns) {
+    super(id);
+    this.mergeOrderParameter = mergeOrderParameter;
+    this.outputColumns = outputColumns;
+  }
+
+  public AggregationMergeSortNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      OrderByParameter mergeOrderParameter,
+      List<String> outputColumns) {
+    super(id, children);
+    this.mergeOrderParameter = mergeOrderParameter;
+    this.outputColumns = outputColumns;
+  }
+
+  public OrderByParameter getMergeOrderParameter() {
+    return mergeOrderParameter;
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new AggregationMergeSortNode(getPlanNodeId(), 
getMergeOrderParameter(), outputColumns);
+  }
+
+  @Override
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new AggregationMergeSortNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        new ArrayList<>(children.subList(startIndex, endIndex)),
+        getMergeOrderParameter(),
+        outputColumns);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return outputColumns;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitAggregationMergeSort(this, context);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.AGG_MERGE_SORT.serialize(byteBuffer);
+    mergeOrderParameter.serializeAttributes(byteBuffer);
+    ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
+    for (String column : outputColumns) {
+      ReadWriteIOUtils.write(column, byteBuffer);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.AGG_MERGE_SORT.serialize(stream);
+    mergeOrderParameter.serializeAttributes(stream);
+    ReadWriteIOUtils.write(outputColumns.size(), stream);
+    for (String column : outputColumns) {
+      ReadWriteIOUtils.write(column, stream);
+    }
+  }
+
+  public static AggregationMergeSortNode deserialize(ByteBuffer byteBuffer) {
+    OrderByParameter orderByParameter = 
OrderByParameter.deserialize(byteBuffer);
+    int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> outputColumns = new ArrayList<>();
+    while (columnSize > 0) {
+      outputColumns.add(ReadWriteIOUtils.readString(byteBuffer));
+      columnSize--;
+    }
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new AggregationMergeSortNode(planNodeId, orderByParameter, 
outputColumns);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    AggregationMergeSortNode that = (AggregationMergeSortNode) o;
+    return Objects.equals(mergeOrderParameter, that.getMergeOrderParameter());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), mergeOrderParameter);
+  }
+
+  @Override
+  public String toString() {
+    return "AggregationMergeSort-" + this.getPlanNodeId();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
index ee98da4047d..c7644217bd3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
@@ -125,9 +125,8 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = null;
-      resultTsBlock = aggregationOperator.next();
 
+      TsBlock resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
@@ -169,8 +168,7 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = null;
-      resultTsBlock = aggregationOperator.next();
+      TsBlock resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
@@ -221,8 +219,7 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = null;
-      resultTsBlock = aggregationOperator.next();
+      TsBlock resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
@@ -275,8 +272,7 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = null;
-      resultTsBlock = aggregationOperator.next();
+      TsBlock resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
@@ -321,10 +317,7 @@ public class AggregationOperatorTest {
     driverContext.addOperatorContext(3, planNodeId3, 
AggregationOperator.class.getSimpleName());
     driverContext
         .getOperatorContexts()
-        .forEach(
-            operatorContext -> {
-              operatorContext.setMaxRunTime(TEST_TIME_SLICE);
-            });
+        .forEach(operatorContext -> 
OperatorContext.setMaxRunTime(TEST_TIME_SLICE));
 
     MeasurementPath measurementPath1 =
         new MeasurementPath(AGGREGATION_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
new file mode 100644
index 00000000000..0ed46adfedb
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AggregationAlignByDeviceTest {
+  private static final long LIMIT_VALUE = 10;
+
+  QueryId queryId = new QueryId("test");
+  MPPQueryContext context =
+      new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+  String sql;
+  Analysis analysis;
+  PlanNode logicalPlanNode;
+  DistributionPlanner planner;
+  DistributedQueryPlan plan;
+  PlanNode firstFiRoot;
+  PlanNode secondFiRoot;
+  PlanNode firstFiTopNode;
+
+  // ================= no scaling situation, i.e., each device only in one 
data region ===========
+
+  /*
+   * IdentitySinkNode-10
+   *   └──MergeSort-7
+   *       ├──DeviceView-5
+   *       │   └──SeriesAggregationScanNode-1:[SeriesPath: root.sg.d22.s1,
+   *              Descriptor: [AggregationDescriptor(first_value, SINGLE)],
+   *              DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *       └──ExchangeNode-8: [SourceAddress:192.0.4.1/test.2.0/9]
+   *
+   *  IdentitySinkNode-9
+   *   └──DeviceView-6
+   *       └──SeriesAggregationScanNode-2:[SeriesPath: root.sg.d55555.s1,
+   *          Descriptor: [AggregationDescriptor(first_value, SINGLE)],
+   *          DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   */
+  @Test
+  public void orderByDeviceTest1() {
+    // one aggregation measurement, two devices
+    sql = "select first_value(s1) from root.sg.d22, root.sg.d55555 align by 
device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof MergeSortNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(
+        firstFiTopNode.getChildren().get(0).getChildren().get(0)
+            instanceof SeriesAggregationScanNode);
+
+    secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+    assertTrue(secondFiRoot instanceof IdentitySinkNode);
+    assertTrue(secondFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(
+        secondFiRoot.getChildren().get(0).getChildren().get(0)
+            instanceof SeriesAggregationScanNode);
+
+    // one aggregation measurement, one device
+    sql = "select first_value(s1) from root.sg.d22 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(1, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof DeviceViewNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof 
SeriesAggregationScanNode);
+
+    // two aggregation measurement, two devices
+    sql = "select first_value(s1), count(s2) from root.sg.d22, root.sg.d55555 
align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof MergeSortNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(
+        firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof 
HorizontallyConcatNode);
+
+    secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+    assertTrue(secondFiRoot instanceof IdentitySinkNode);
+    assertTrue(secondFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(
+        secondFiRoot.getChildren().get(0).getChildren().get(0) instanceof 
HorizontallyConcatNode);
+
+    // two aggregation measurement, one device
+    sql = "select first_value(s1), count(s2) from root.sg.d22 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(1, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof DeviceViewNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof 
HorizontallyConcatNode);
+  }
+
+  /*
+   * IdentitySinkNode-31
+   *   └──MergeSort-28
+   *       ├──DeviceView-12
+   *       │   └──AggregationNode-17
+   *       │       └──FilterNode-16
+   *       │           └──FullOuterTimeJoinNode-15
+   *       │               ├──SeriesScanNode-18:[SeriesPath: root.sg.d22.s1,
+   *                          DataRegion: TConsensusGroupId(type:DataRegion, 
id:3)]
+   *       │               └──SeriesScanNode-19:[SeriesPath: root.sg.d22.s2,
+   *                          DataRegion: TConsensusGroupId(type:DataRegion, 
id:3)]
+   *       └──ExchangeNode-29: [SourceAddress:192.0.4.1/test.2.0/30]
+   *
+   * IdentitySinkNode-30
+   *   └──DeviceView-20
+   *       └──AggregationNode-25
+   *           └──FilterNode-24
+   *               └──FullOuterTimeJoinNode-23
+   *                   ├──SeriesScanNode-26:[SeriesPath: root.sg.d55555.s1,
+   *                      DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   *                   └──SeriesScanNode-27:[SeriesPath: root.sg.d55555.s2,
+   *                      DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   */
+  @Test
+  public void orderByDeviceTest2() {
+    // one aggregation measurement, two devices, with filter
+    sql = "select first_value(s1) from root.sg.d22, root.sg.d55555 where s2>1 
align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof MergeSortNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) 
instanceof AggregationNode);
+    assertTrue(
+        
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof FilterNode);
+    assertTrue(
+        firstFiTopNode
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+            instanceof FullOuterTimeJoinNode);
+
+    secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+    assertTrue(secondFiRoot instanceof IdentitySinkNode);
+    assertTrue(secondFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(secondFiRoot.getChildren().get(0).getChildren().get(0) 
instanceof AggregationNode);
+    assertTrue(
+        
secondFiRoot.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof FilterNode);
+    assertTrue(
+        secondFiRoot
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+            instanceof FullOuterTimeJoinNode);
+
+    // two aggregation measurement, two devices, with filter
+    sql =
+        "select first_value(s1), count(s2) from root.sg.d22, root.sg.d55555 
where s2>1 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof MergeSortNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) 
instanceof AggregationNode);
+    assertTrue(
+        
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof FilterNode);
+    assertTrue(
+        firstFiTopNode
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+            instanceof FullOuterTimeJoinNode);
+
+    secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+    assertTrue(secondFiRoot instanceof IdentitySinkNode);
+    assertTrue(secondFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(secondFiRoot.getChildren().get(0).getChildren().get(0) 
instanceof AggregationNode);
+    assertTrue(
+        
secondFiRoot.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof FilterNode);
+    assertTrue(
+        secondFiRoot
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+            instanceof FullOuterTimeJoinNode);
+  }
+
+  /*
+   * IdentitySinkNode-32
+   *   └──TransformNode-10
+   *       └──MergeSort-27
+   *           ├──SortNode-28
+   *           │   └──TransformNode-25
+   *           │       └──DeviceView-11
+   *           │           └──HorizontallyConcatNode-17
+   *           │               ├──SeriesAggregationScanNode-15:[SeriesPath: 
root.sg.d22.s1,
+   *                              Descriptor: [AggregationDescriptor(avg, 
SINGLE)],
+   *                              DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *           │               └──SeriesAggregationScanNode-16:[SeriesPath: 
root.sg.d22.s2,
+   *                              Descriptor: [AggregationDescriptor(avg, 
SINGLE),
+   *                              AggregationDescriptor(max_value, SINGLE)],
+   *                              DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *           └──ExchangeNode-30: [SourceAddress:192.0.4.1/test.2.0/31]
+   *
+   * IdentitySinkNode-31
+   *   └──SortNode-29
+   *       └──TransformNode-26
+   *           └──DeviceView-18
+   *               └──HorizontallyConcatNode-24
+   *                   ├──SeriesAggregationScanNode-22:[SeriesPath: 
root.sg.d55555.s1,
+   *                      Descriptor: [AggregationDescriptor(avg, SINGLE)],
+   *                      DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   *                   └──SeriesAggregationScanNode-23:[SeriesPath: 
root.sg.d55555.s2,
+   *                      Descriptor: [AggregationDescriptor(avg, SINGLE), 
AggregationDescriptor(max_value, SINGLE)],
+   *                      DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   */
+  @Test
+  public void orderByExpressionTest() {
+    sql =
+        "select avg(s1)+avg(s2) from root.sg.d22, root.sg.d55555 order by 
max_value(s2) align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof TransformNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof MergeSortNode);
+    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) 
instanceof SortNode);
+    assertTrue(
+        
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof TransformNode);
+    assertTrue(
+        firstFiTopNode
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+            instanceof DeviceViewNode);
+    assertTrue(
+        firstFiTopNode
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+            instanceof HorizontallyConcatNode);
+
+    secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+    assertTrue(secondFiRoot instanceof IdentitySinkNode);
+    assertTrue(secondFiRoot.getChildren().get(0) instanceof SortNode);
+    assertTrue(secondFiRoot.getChildren().get(0).getChildren().get(0) 
instanceof TransformNode);
+    assertTrue(
+        
secondFiRoot.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof DeviceViewNode);
+    assertTrue(
+        secondFiRoot
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+                .getChildren()
+                .get(0)
+            instanceof HorizontallyConcatNode);
+  }
+
+  @Test
+  public void orderByTimeTest1() {
+    // one aggregation measurement, two devices
+    sql =
+        "select first_value(s1) from root.sg.d22, root.sg.d55555 order by time 
desc align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof MergeSortNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof 
SingleDeviceViewNode);
+    assertTrue(
+        firstFiTopNode.getChildren().get(0).getChildren().get(0)
+            instanceof SeriesAggregationScanNode);
+
+    secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+    assertTrue(secondFiRoot instanceof ShuffleSinkNode);
+    assertTrue(secondFiRoot.getChildren().get(0) instanceof 
SingleDeviceViewNode);
+    assertTrue(
+        secondFiRoot.getChildren().get(0).getChildren().get(0)
+            instanceof SeriesAggregationScanNode);
+
+    // one aggregation measurement, one device
+    sql = "select first_value(s1) from root.sg.d22 order by time desc align by 
device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(1, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof DeviceViewNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof 
SeriesAggregationScanNode);
+
+    // two aggregation measurement, two devices
+    sql =
+        "select first_value(s1), count(s2) from root.sg.d22, root.sg.d55555 "
+            + "order by time desc align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(2, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof MergeSortNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof 
SingleDeviceViewNode);
+    assertTrue(
+        firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof 
HorizontallyConcatNode);
+
+    secondFiRoot = plan.getInstances().get(1).getFragment().getPlanNodeTree();
+    assertTrue(secondFiRoot instanceof ShuffleSinkNode);
+    assertTrue(secondFiRoot.getChildren().get(0) instanceof 
SingleDeviceViewNode);
+    assertTrue(
+        secondFiRoot.getChildren().get(0).getChildren().get(0) instanceof 
HorizontallyConcatNode);
+
+    // two aggregation measurement, one device
+    sql = "select first_value(s1), count(s2) from root.sg.d22 order by time 
desc align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(1, plan.getInstances().size());
+
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    firstFiTopNode = firstFiRoot.getChildren().get(0);
+    assertTrue(firstFiTopNode instanceof DeviceViewNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof 
HorizontallyConcatNode);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
index 812d5319e19..b04c6c4b847 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
@@ -68,7 +68,7 @@ import static org.junit.Assert.assertTrue;
 public class AggregationDistributionTest {
 
   @Test
-  public void testAggregation1Series2Regions() throws IllegalPathException {
+  public void testAggregation1Series2Regions() {
     QueryId queryId = new QueryId("test_1_series_2_regions");
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
@@ -95,7 +95,7 @@ public class AggregationDistributionTest {
   }
 
   @Test
-  public void testAggregation1Series2RegionsWithSlidingWindow() throws 
IllegalPathException {
+  public void testAggregation1Series2RegionsWithSlidingWindow() {
     QueryId queryId = new QueryId("test_1_series_2_regions_sliding_window");
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
@@ -130,7 +130,7 @@ public class AggregationDistributionTest {
   }
 
   @Test
-  public void testTimeJoinAggregationSinglePerRegion() throws 
IllegalPathException {
+  public void testTimeJoinAggregationSinglePerRegion() {
     QueryId queryId = new QueryId("test_query_time_join_aggregation");
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
@@ -166,9 +166,7 @@ public class AggregationDistributionTest {
       SeriesAggregationSourceNode handle = (SeriesAggregationSourceNode) root;
       List<AggregationDescriptor> descriptorList = 
handle.getAggregationDescriptorList();
       descriptorList.forEach(
-          d -> {
-            
assertEquals(expected.get(handle.getPartitionPath().getFullPath()), 
d.getStep());
-          });
+          d -> 
assertEquals(expected.get(handle.getPartitionPath().getFullPath()), 
d.getStep()));
     }
     root.getChildren().forEach(child -> verifyAggregationStep(expected, 
child));
   }
@@ -190,7 +188,7 @@ public class AggregationDistributionTest {
   }
 
   @Test
-  public void testTimeJoinAggregationWithSlidingWindow() throws 
IllegalPathException {
+  public void testTimeJoinAggregationWithSlidingWindow() {
     QueryId queryId = new QueryId("test_query_time_join_agg_with_sliding");
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
@@ -229,7 +227,7 @@ public class AggregationDistributionTest {
   }
 
   @Test
-  public void testTimeJoinAggregationMultiPerRegion() throws 
IllegalPathException {
+  public void testTimeJoinAggregationMultiPerRegion() {
     QueryId queryId = new QueryId("test_query_time_join_aggregation");
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
@@ -254,7 +252,7 @@ public class AggregationDistributionTest {
   }
 
   @Test
-  public void testTimeJoinAggregationMultiPerRegion2() throws 
IllegalPathException {
+  public void testTimeJoinAggregationMultiPerRegion2() {
     QueryId queryId = new QueryId("test_query_time_join_aggregation");
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
@@ -775,7 +773,7 @@ public class AggregationDistributionTest {
   }
 
   @Test
-  public void testAlignByDevice1Device2Region() throws IllegalPathException {
+  public void testAlignByDevice1Device2Region() {
     QueryId queryId = new QueryId("test_align_by_device_1_device_2_region");
     MPPQueryContext context =
         new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());

Reply via email to