This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch rc/1.3.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.1 by this push:
     new 9e8688183ea [IOTDB-6297] Optimize the distribute plan in aggregation 
align by device when some device cross data regions
9e8688183ea is described below

commit 9e8688183ea28ecf50f1e42a87476733403ac5b3
Author: Beyyes <[email protected]>
AuthorDate: Fri Mar 8 11:21:34 2024 +0800

    [IOTDB-6297] Optimize the distribute plan in aggregation align by device 
when some device cross data regions
---
 .../db/it/alignbydevice/IoTDBAlignByDevice3IT.java |  43 ++
 .../IoTDBAlignByDeviceWithTemplate2IT.java         |  43 ++
 .../IoTDBAlignByDeviceWithTemplateIT.java          |   2 +-
 .../IoTDBOrderByLimitOffsetAlignByDevice2IT.java   |  46 +++
 .../IoTDBOrderByWithAlignByDevice3IT.java          |  40 ++
 .../db/it/alignbydevice/IoTDBShuffleSink1IT.java   |   1 +
 .../db/it/alignbydevice/IoTDBShuffleSink2IT.java   |   1 +
 .../execution/aggregation/Accumulator.java         |   9 +
 .../execution/aggregation/AvgAccumulator.java      |   5 +
 .../aggregation/FirstValueAccumulator.java         |   5 +
 .../aggregation/LastValueAccumulator.java          |   5 +
 .../aggregation/TimeDurationAccumulator.java       |   5 +
 .../process/AggregationMergeSortOperator.java      | 286 +++++++++++++
 .../db/queryengine/plan/analyze/Analysis.java      |  10 -
 .../plan/planner/OperatorTreeGenerator.java        |  74 +++-
 .../plan/planner/distribution/SourceRewriter.java  | 196 +++++++--
 .../node/process/AggregationMergeSortNode.java     |  56 ++-
 .../planner/plan/node/process/DeviceViewNode.java  |   6 +-
 .../plan/parameter/AggregationDescriptor.java      |   8 +-
 .../distribution/AggregationDistributionTest.java  |  26 +-
 .../AlignByDeviceOrderByLimitOffsetTest.java       | 123 ++++--
 .../planner/distribution/AlignedByDeviceTest.java  | 446 ++++-----------------
 .../datanode1conf/iotdb-common.properties          |   1 -
 .../datanode3conf/iotdb-common.properties          |   1 -
 24 files changed, 983 insertions(+), 455 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice3IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice3IT.java
new file mode 100644
index 00000000000..68b2c40cc6b
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDevice3IT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Change series_slot_num to 1, to generate more devices which are cross data 
regions as possible.
+ */
+public class IoTDBAlignByDevice3IT extends IoTDBAlignByDeviceIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
+    EnvFactory.getEnv().initClusterEnvironment();
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplate2IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplate2IT.java
new file mode 100644
index 00000000000..a4da0898f75
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplate2IT.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
+ * Change series_slot_num to 1, to generate more devices which are cross data 
regions as possible.
+ */
+public class IoTDBAlignByDeviceWithTemplate2IT extends 
IoTDBAlignByDeviceWithTemplateIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
+    EnvFactory.getEnv().initClusterEnvironment();
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
index b87f5df60f3..05e2104b904 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java
@@ -957,7 +957,7 @@ public class IoTDBAlignByDeviceWithTemplateIT {
         retArray);
   }
 
-  private static void insertData() {
+  protected static void insertData() {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDevice2IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDevice2IT.java
new file mode 100644
index 00000000000..85713e87e5d
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDevice2IT.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static 
org.apache.iotdb.db.it.alignbydevice.IoTDBOrderByWithAlignByDeviceIT.insertData;
+import static 
org.apache.iotdb.db.it.alignbydevice.IoTDBOrderByWithAlignByDeviceIT.insertData2;
+
+public class IoTDBOrderByLimitOffsetAlignByDevice2IT
+    extends IoTDBOrderByLimitOffsetAlignByDeviceIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
+    EnvFactory.getEnv().initClusterEnvironment();
+    insertData();
+    insertData2();
+    insertData3();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice3IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice3IT.java
new file mode 100644
index 00000000000..011972ea471
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDevice3IT.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+public class IoTDBOrderByWithAlignByDevice3IT extends 
IoTDBOrderByWithAlignByDeviceIT {
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
+    EnvFactory.getEnv().initClusterEnvironment();
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink1IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink1IT.java
index 73cc3c71dee..484fd5d9d94 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink1IT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink1IT.java
@@ -56,6 +56,7 @@ public class IoTDBShuffleSink1IT {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
     
EnvFactory.getEnv().getConfig().getCommonConfig().setDataRegionGroupExtensionPolicy("CUSTOM");
     
EnvFactory.getEnv().getConfig().getCommonConfig().setDefaultDataRegionGroupNumPerDatabase(2);
     EnvFactory.getEnv().initClusterEnvironment();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink2IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink2IT.java
index 8558e1f6442..cd29526e956 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink2IT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBShuffleSink2IT.java
@@ -72,6 +72,7 @@ public class IoTDBShuffleSink2IT {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSeriesSlotNum(1);
     
EnvFactory.getEnv().getConfig().getCommonConfig().setDataRegionGroupExtensionPolicy("CUSTOM");
     
EnvFactory.getEnv().getConfig().getCommonConfig().setDefaultDataRegionGroupNumPerDatabase(3);
     EnvFactory.getEnv().initClusterEnvironment();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
index 75091443fd0..2065f899d9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/Accumulator.java
@@ -88,4 +88,13 @@ public interface Accumulator {
   TSDataType[] getIntermediateType();
 
   TSDataType getFinalType();
+
+  /**
+   * The return value equals to the length of tsBlockBuilder in {@link
+   * #outputIntermediate(ColumnBuilder[])}}. Currently only aggregation `Avg, 
FirstValue, LastValue,
+   * TimeDuration` will return 2.
+   */
+  default int getPartialResultSize() {
+    return 1;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
index 53f6d50ed3d..7d8604da5b0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AvgAccumulator.java
@@ -163,6 +163,11 @@ public class AvgAccumulator implements Accumulator {
     return TSDataType.DOUBLE;
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 2;
+  }
+
   private void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
     for (int i = 0; i <= lastIndex; i++) {
       if (bitMap != null && !bitMap.isMarked(i)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
index 2cfffa26d08..04f186b96fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/FirstValueAccumulator.java
@@ -252,6 +252,11 @@ public class FirstValueAccumulator implements Accumulator {
     return firstValue.getDataType();
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 2;
+  }
+
   protected void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
     for (int i = 0; i <= lastIndex; i++) {
       if (bitMap != null && !bitMap.isMarked(i)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
index b863812c444..c1ffa9fa6b8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/LastValueAccumulator.java
@@ -252,6 +252,11 @@ public class LastValueAccumulator implements Accumulator {
     return lastValue.getDataType();
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 2;
+  }
+
   protected void addIntInput(Column[] column, BitMap bitMap, int lastIndex) {
     for (int i = 0; i <= lastIndex; i++) {
       if (bitMap != null && !bitMap.isMarked(i)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
index 84cd5a464ae..6a0045b3f8d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/TimeDurationAccumulator.java
@@ -112,6 +112,11 @@ public class TimeDurationAccumulator implements 
Accumulator {
     return TSDataType.INT64;
   }
 
+  @Override
+  public int getPartialResultSize() {
+    return 2;
+  }
+
   protected void updateMaxTime(long curTime) {
     initResult = true;
     maxTime = Math.max(maxTime, curTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
new file mode 100644
index 00000000000..52d4a6d93d2
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
+
+  private final List<Accumulator> accumulators;
+
+  private final List<TSDataType> dataTypes;
+
+  private final TsBlockBuilder tsBlockBuilder;
+
+  private final boolean[] noMoreTsBlocks;
+
+  private final MergeSortHeap mergeSortHeap;
+
+  private final boolean hasGroupBy;
+
+  private boolean finished;
+
+  private Binary lastDevice;
+
+  private long lastTime;
+
+  public AggregationMergeSortOperator(
+      OperatorContext operatorContext,
+      List<Operator> children,
+      List<TSDataType> dataTypes,
+      List<Accumulator> accumulators,
+      boolean hasGroupBy,
+      Comparator<SortKey> comparator) {
+    super(operatorContext, children);
+    this.dataTypes = dataTypes;
+    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+    this.noMoreTsBlocks = new boolean[this.inputOperatorsCount];
+    this.accumulators = accumulators;
+    this.hasGroupBy = hasGroupBy;
+    this.mergeSortHeap = new MergeSortHeap(inputOperatorsCount, comparator);
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    long startTime = System.nanoTime();
+    long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
+
+    // init all element in inputTsBlocks
+    if (!prepareInput()) {
+      return null;
+    }
+
+    tsBlockBuilder.reset();
+    while (!mergeSortHeap.isEmpty()) {
+      MergeSortKey mergeSortKey = mergeSortHeap.poll();
+      TsBlock targetBlock = mergeSortKey.tsBlock;
+      int rowIndex = mergeSortKey.rowIndex;
+      Binary currentDevice = targetBlock.getColumn(0).getBinary(rowIndex);
+      long currentTime = targetBlock.getTimeByIndex(rowIndex);
+      if (lastDevice != null && (!currentDevice.equals(lastDevice) || 
currentTime != lastTime)) {
+        outputResultToTsBlock();
+      }
+
+      lastDevice = currentDevice;
+      lastTime = currentTime;
+
+      int cnt = 1;
+      for (Accumulator accumulator : accumulators) {
+        if (accumulator.getPartialResultSize() == 2) {
+          Column first =
+              hasGroupBy
+                  ? targetBlock.getColumn(cnt++).subColumn(rowIndex)
+                  : targetBlock.getColumn(cnt++);
+          Column second =
+              hasGroupBy
+                  ? targetBlock.getColumn(cnt++).subColumn(rowIndex)
+                  : targetBlock.getColumn(cnt++);
+          accumulator.addIntermediate(new Column[] {first, second});
+        } else {
+          Column column =
+              hasGroupBy
+                  ? targetBlock.getColumn(cnt++).subColumn(rowIndex)
+                  : targetBlock.getColumn(cnt++);
+          accumulator.addIntermediate(new Column[] {column});
+        }
+      }
+
+      if (mergeSortKey.rowIndex == mergeSortKey.tsBlock.getPositionCount() - 
1) {
+        inputTsBlocks[mergeSortKey.inputChannelIndex] = null;
+        break;
+      } else {
+        mergeSortKey.rowIndex++;
+        mergeSortHeap.push(mergeSortKey);
+      }
+
+      // break if time is out or tsBlockBuilder is full
+      if (System.nanoTime() - startTime > maxRuntime || 
tsBlockBuilder.isFull()) {
+        break;
+      }
+    }
+
+    if (mergeSortHeap.isEmpty()) {
+      outputResultToTsBlock();
+    }
+
+    return tsBlockBuilder.build();
+  }
+
+  private void outputResultToTsBlock() {
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
+    timeBuilder.writeLong(lastTime);
+    valueColumnBuilders[0].writeBinary(lastDevice);
+    for (int i = 1; i < dataTypes.size(); i++) {
+      accumulators.get(i - 1).outputFinal(valueColumnBuilders[i]);
+    }
+    tsBlockBuilder.declarePosition();
+    accumulators.forEach(Accumulator::reset);
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (finished) {
+      return false;
+    }
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (isInputNotEmpty(i)) {
+        return true;
+      } else if (!noMoreTsBlocks[i]) {
+        if (!canCallNext[i] || children.get(i).hasNextWithTimer()) {
+          return true;
+        } else {
+          handleFinishedChild(i);
+        }
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    boolean hasReadyChild = false;
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (noMoreTsBlocks[i] || isInputNotEmpty(i) || children.get(i) == null) {
+        continue;
+      }
+      ListenableFuture<?> blocked = children.get(i).isBlocked();
+      if (blocked.isDone()) {
+        hasReadyChild = true;
+        // only when not blocked, canCallNext[i] equals true
+        canCallNext[i] = true;
+      } else {
+        listenableFutures.add(blocked);
+      }
+    }
+
+    return (hasReadyChild || listenableFutures.isEmpty())
+        ? NOT_BLOCKED
+        : successfulAsList(listenableFutures);
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    if (finished) {
+      return true;
+    }
+
+    finished = true;
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] || isInputNotEmpty(i)) {
+        finished = false;
+        break;
+      }
+    }
+    return finished;
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      final Operator operator = children.get(i);
+      if (operator != null) {
+        operator.close();
+      }
+    }
+  }
+
+  @Override
+  protected void handleFinishedChild(int currentChildIndex) throws Exception {
+    // invoking this method when children.get(currentChildIndex).hasNext 
return false
+    noMoreTsBlocks[currentChildIndex] = true;
+    inputTsBlocks[currentChildIndex] = null;
+    children.get(currentChildIndex).close();
+    children.set(currentChildIndex, null);
+  }
+
+  @Override
+  protected boolean canSkipCurrentChild(int currentChildIndex) {
+    return noMoreTsBlocks[currentChildIndex]
+        || !isEmpty(currentChildIndex)
+        || children.get(currentChildIndex) == null;
+  }
+
+  @Override
+  protected void processCurrentInputTsBlock(int currentInputIndex) {
+    mergeSortHeap.push(new MergeSortKey(inputTsBlocks[currentInputIndex], 0, 
currentInputIndex));
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    // inputTsBlocks will cache all the tsBlocks returned by inputOperators
+    for (Operator operator : children) {
+      maxPeekMemory += operator.calculateMaxReturnSize();
+      maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
+    }
+    for (Operator operator : children) {
+      maxPeekMemory = Math.max(maxPeekMemory, 
operator.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, calculateMaxReturnSize());
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long currentRetainedSize = 0;
+    long minChildReturnSize = Long.MAX_VALUE;
+    for (Operator child : children) {
+      long maxReturnSize = child.calculateMaxReturnSize();
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+      currentRetainedSize += (maxReturnSize + 
child.calculateRetainedSizeAfterCallingNext());
+    }
+    return currentRetainedSize - minChildReturnSize;
+  }
+
+  private boolean isInputNotEmpty(int index) {
+    return inputTsBlocks[index] != null && !inputTsBlocks[index].isEmpty();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
index 3014cdec431..81c27d885f4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analysis.java
@@ -174,8 +174,6 @@ public class Analysis {
   // deviceViewSpecialProcess equals true when all Aggregation Functions and 
DIFF
   private boolean deviceViewSpecialProcess;
 
-  private boolean existDeviceCrossRegion;
-
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // Query Common Analysis (above DeviceView)
   
/////////////////////////////////////////////////////////////////////////////////////////////////
@@ -658,14 +656,6 @@ public class Analysis {
     this.deviceViewSpecialProcess = deviceViewSpecialProcess;
   }
 
-  public boolean isExistDeviceCrossRegion() {
-    return existDeviceCrossRegion;
-  }
-
-  public void setExistDeviceCrossRegion() {
-    this.existDeviceCrossRegion = true;
-  }
-
   public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() {
     return deviceViewIntoPathDescriptor;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index cbf8db730de..4ce99aa3737 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner;
 
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.MeasurementPath;
@@ -27,6 +28,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.NodeRef;
+import org.apache.iotdb.db.queryengine.execution.aggregation.Accumulator;
 import 
org.apache.iotdb.db.queryengine.execution.aggregation.AccumulatorFactory;
 import org.apache.iotdb.db.queryengine.execution.aggregation.Aggregator;
 import 
org.apache.iotdb.db.queryengine.execution.aggregation.slidingwindow.SlidingWindowAggregatorFactory;
@@ -45,6 +47,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage
 import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationMergeSortOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.ColumnInjectOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator;
@@ -96,7 +99,6 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuter
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.DescTimeComparator;
-import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MultiColumnMerger;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
@@ -146,6 +148,7 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaC
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
+import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.ColumnTransformerVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -166,6 +169,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode;
@@ -273,6 +277,7 @@ import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.calculateMaxAggregationResultSizeForLastQuery;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.getOutputColumnSizePerLine;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparator;
 import static 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
 import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.updateFilterUsingTTL;
 import static 
org.apache.iotdb.db.utils.TimestampPrecisionUtils.TIMESTAMP_PRECISION;
@@ -837,7 +842,68 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         operatorContext,
         children,
         dataTypes,
-        MergeSortComparator.getComparator(sortItemList, sortItemIndexList, 
sortItemDataTypeList));
+        getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
+  }
+
+  @Override
+  public Operator visitAggregationMergeSort(
+      AggregationMergeSortNode node, LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                MergeSortOperator.class.getSimpleName());
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+    List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, 
context);
+
+    List<SortItem> sortItemList = 
node.getMergeOrderParameter().getSortItemList();
+    if (!sortItemList.get(0).getSortKey().equalsIgnoreCase("Device")) {
+      throw new IllegalArgumentException(
+          "Only order by device align by device support 
AggregationMergeSortNode.");
+    }
+
+    boolean timeAscending = true;
+    for (SortItem sortItem : sortItemList) {
+      if (TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(sortItem.getSortKey())
+          && (sortItem.getOrdering() == Ordering.DESC)) {
+        timeAscending = false;
+        break;
+      }
+    }
+
+    List<Accumulator> accumulators = new ArrayList<>();
+    for (Expression expression : node.getSelectExpressions()) {
+      if (expression instanceof FunctionExpression) {
+        FunctionExpression functionExpression = (FunctionExpression) 
expression;
+        String aggregationName = functionExpression.getFunctionName();
+        Accumulator accumulator =
+            AccumulatorFactory.createAccumulator(
+                TAggregationType.valueOf(aggregationName.toUpperCase()),
+                
context.getTypeProvider().getType(functionExpression.getOutputSymbol()),
+                functionExpression.getExpressions(),
+                functionExpression.getFunctionAttributes(),
+                timeAscending);
+        accumulators.add(accumulator);
+      }
+    }
+
+    List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
+    List<TSDataType> sortItemDataTypeList = new 
ArrayList<>(sortItemList.size());
+    genSortInformation(
+        node.getOutputColumnNames(),
+        dataTypes,
+        sortItemList,
+        sortItemIndexList,
+        sortItemDataTypeList);
+    return new AggregationMergeSortOperator(
+        operatorContext,
+        children,
+        dataTypes,
+        accumulators,
+        node.isHasGroupBy(),
+        getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
   }
 
   @Override
@@ -866,7 +932,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         operatorContext,
         children,
         dataTypes,
-        MergeSortComparator.getComparator(sortItemList, sortItemIndexList, 
sortItemDataTypeList),
+        getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList),
         node.getTopValue(),
         !sortItemList.isEmpty()
             && 
sortItemList.get(0).getSortKey().equalsIgnoreCase(OrderByKey.TIME)
@@ -1872,7 +1938,7 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         child,
         dataTypes,
         filePrefix,
-        MergeSortComparator.getComparator(sortItemList, sortItemIndexList, 
sortItemDataTypeList));
+        getComparator(sortItemList, sortItemIndexList, sortItemDataTypeList));
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 450d37809c3..69066ec51ae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
-import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder;
+import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.BaseSourceRewriter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -72,8 +72,11 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParame
 import org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -84,8 +87,16 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.LAST_VALUE;
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionTypeAnalyzer.analyzeExpression;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanBuilder.updateTypeProviderByPartialAggregation;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.AVG;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT_IF;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.DIFF;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.TIME_DURATION;
 
 public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext> {
 
@@ -123,7 +134,6 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
   public List<PlanNode> visitSingleDeviceView(
       SingleDeviceViewNode node, DistributionPlanContext context) {
 
-    // Same process logic as visitDeviceView
     if (analysis.isDeviceViewSpecialProcess()) {
       List<PlanNode> rewroteChildren = rewrite(node.getChild(), context);
       if (rewroteChildren.size() != 1) {
@@ -175,6 +185,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
     // Step 1: constructs DeviceViewSplits
     Set<TRegionReplicaSet> relatedDataRegions = new HashSet<>();
     List<DeviceViewSplit> deviceViewSplits = new ArrayList<>();
+    boolean existDeviceCrossRegion = false;
 
     for (int i = 0; i < node.getDevices().size(); i++) {
       String outputDevice = node.getDevices().get(i);
@@ -187,10 +198,9 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
                       context.getPartitionTimeFilter()))
               : new ArrayList<>(
                   analysis.getPartitionInfo(outputDevice, 
context.getPartitionTimeFilter()));
-      if (regionReplicaSets.size() > 1) {
-        // specialProcess and existDeviceCrossRegion, use the old aggregation 
logic
-        analysis.setExistDeviceCrossRegion();
-        if (analysis.isDeviceViewSpecialProcess()) {
+      if (regionReplicaSets.size() > 1 && !existDeviceCrossRegion) {
+        existDeviceCrossRegion = true;
+        if (analysis.isDeviceViewSpecialProcess() && 
aggregationCannotUseMergeSort()) {
           return processSpecialDeviceView(node, context);
         }
       }
@@ -200,7 +210,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
 
     // Step 2: Iterate all partition and create DeviceViewNode for each region
     List<PlanNode> deviceViewNodeList = new ArrayList<>();
-    if (analysis.isExistDeviceCrossRegion()) {
+    if (existDeviceCrossRegion) {
       constructDeviceViewNodeListWithCrossRegion(
           deviceViewNodeList, relatedDataRegions, deviceViewSplits, node, 
context);
     } else {
@@ -217,13 +227,103 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
       return deviceViewNodeList;
     }
 
-    MergeSortNode mergeSortNode =
-        new MergeSortNode(
-            context.queryContext.getQueryId().genPlanNodeId(),
-            node.getMergeOrderParameter(),
-            node.getOutputColumnNames());
-    deviceViewNodeList.forEach(mergeSortNode::addChild);
-    return Collections.singletonList(mergeSortNode);
+    // aggregation and some device cross region, user AggregationMergeSortNode
+    // 1. generate old and new measurement idx relationship
+    // 2. generate new outputColumns for each subDeviceView
+    if (existDeviceCrossRegion && analysis.isDeviceViewSpecialProcess()) {
+      Map<Integer, List<Integer>> newMeasurementIdxMap = new HashMap<>();
+      List<String> newPartialOutputColumns = new ArrayList<>();
+      Set<Expression> deviceViewOutputExpressions = 
analysis.getDeviceViewOutputExpressions();
+
+      int i = 0, newIdxSum = 0;
+      for (Expression expression : deviceViewOutputExpressions) {
+        if (i == 0) {
+          newPartialOutputColumns.add(expression.getOutputSymbol());
+          i++;
+          newIdxSum++;
+          continue;
+        }
+        FunctionExpression aggExpression = (FunctionExpression) expression;
+        // used for AVG, FIRST_VALUE, LAST_VALUE, TIME_DURATION agg function
+        List<String> actualPartialAggregationNames =
+            getActualPartialAggregationNames(aggExpression.getFunctionName());
+        for (String actualAggName : actualPartialAggregationNames) {
+          FunctionExpression partialFunctionExpression =
+              new FunctionExpression(
+                  actualAggName,
+                  aggExpression.getFunctionAttributes(),
+                  aggExpression.getExpressions());
+          if (actualPartialAggregationNames.size() > 1) {
+            TSDataType dataType = analyzeExpression(analysis, 
partialFunctionExpression);
+            context
+                .queryContext
+                .getTypeProvider()
+                .setType(partialFunctionExpression.getOutputSymbol(), 
dataType);
+          }
+          
newPartialOutputColumns.add(partialFunctionExpression.getOutputSymbol());
+        }
+        newMeasurementIdxMap.put(
+            i++,
+            actualPartialAggregationNames.size() > 1
+                ? Arrays.asList(newIdxSum++, newIdxSum++)
+                : Collections.singletonList(newIdxSum++));
+      }
+
+      for (String device : node.getDevices()) {
+        List<Integer> oldMeasurementIdxList = 
node.getDeviceToMeasurementIndexesMap().get(device);
+        List<Integer> newMeasurementIdxList = new ArrayList<>();
+        oldMeasurementIdxList.forEach(
+            idx -> 
newMeasurementIdxList.addAll(newMeasurementIdxMap.get(idx)));
+        node.getDeviceToMeasurementIndexesMap().put(device, 
newMeasurementIdxList);
+      }
+
+      for (PlanNode planNode : deviceViewNodeList) {
+        DeviceViewNode deviceViewNode = (DeviceViewNode) planNode;
+        deviceViewNode.setOutputColumnNames(newPartialOutputColumns);
+        transferAggregatorsRecursively(planNode, context);
+      }
+
+      boolean hasGroupBy =
+          analysis.getGroupByTimeParameter() != null || 
analysis.hasGroupByParameter();
+      AggregationMergeSortNode mergeSortNode =
+          new AggregationMergeSortNode(
+              context.queryContext.getQueryId().genPlanNodeId(),
+              node.getMergeOrderParameter(),
+              node.getOutputColumnNames(),
+              deviceViewOutputExpressions,
+              hasGroupBy);
+      deviceViewNodeList.forEach(mergeSortNode::addChild);
+      return Collections.singletonList(mergeSortNode);
+    } else {
+      MergeSortNode mergeSortNode =
+          new MergeSortNode(
+              context.queryContext.getQueryId().genPlanNodeId(),
+              node.getMergeOrderParameter(),
+              node.getOutputColumnNames());
+      deviceViewNodeList.forEach(mergeSortNode::addChild);
+      return Collections.singletonList(mergeSortNode);
+    }
+  }
+
+  /**
+   * aggregation align by device, and aggregation is `count_if` or `diff`, or 
aggregation used with
+   * group by parameter (session, variation, count), use the old aggregation 
logic
+   */
+  private boolean aggregationCannotUseMergeSort() {
+    if (analysis.hasGroupByParameter()) {
+      return true;
+    }
+
+    for (Expression expression : analysis.getDeviceViewOutputExpressions()) {
+      if (expression instanceof FunctionExpression) {
+        String functionName = ((FunctionExpression) 
expression).getFunctionName();
+        if (COUNT_IF.equalsIgnoreCase(functionName) || 
DIFF.equalsIgnoreCase(functionName)) {
+          return true;
+        }
+      }
+    }
+
+    return false;
   }
 
   private void constructDeviceViewNodeListWithCrossRegion(
@@ -286,10 +386,56 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
     }
   }
 
-  @Override
-  public List<PlanNode> visitAggregationMergeSort(
-      AggregationMergeSortNode node, DistributionPlanContext context) {
-    return null;
+  public List<String> getActualPartialAggregationNames(String aggregationType) 
{
+    List<String> outputAggregationNames = new ArrayList<>();
+    switch (aggregationType) {
+      case AVG:
+        outputAggregationNames.add(SqlConstant.COUNT);
+        outputAggregationNames.add(SqlConstant.SUM);
+        break;
+      case FIRST_VALUE:
+        outputAggregationNames.add(FIRST_VALUE);
+        outputAggregationNames.add(SqlConstant.MIN_TIME);
+        break;
+      case LAST_VALUE:
+        outputAggregationNames.add(SqlConstant.LAST_VALUE);
+        outputAggregationNames.add(SqlConstant.MAX_TIME);
+        break;
+      case TIME_DURATION:
+        outputAggregationNames.add(SqlConstant.MAX_TIME);
+        outputAggregationNames.add(SqlConstant.MIN_TIME);
+        break;
+      default:
+        // TODO how about UDAF?
+        outputAggregationNames.add(aggregationType);
+    }
+    return outputAggregationNames;
+  }
+
+  private void transferAggregatorsRecursively(PlanNode planNode, 
DistributionPlanContext context) {
+    List<AggregationDescriptor> descriptorList = 
getAggregationDescriptors(planNode);
+    if (descriptorList != null) {
+      for (AggregationDescriptor descriptor : descriptorList) {
+        descriptor.setStep(
+            planNode instanceof SlidingWindowAggregationNode
+                ? AggregationStep.INTERMEDIATE
+                : AggregationStep.PARTIAL);
+        updateTypeProviderByPartialAggregation(descriptor, 
context.queryContext.getTypeProvider());
+      }
+    }
+    planNode.getChildren().forEach(child -> 
transferAggregatorsRecursively(child, context));
+  }
+
+  private List<AggregationDescriptor> getAggregationDescriptors(PlanNode 
planNode) {
+    List<AggregationDescriptor> descriptorList = null;
+    if (planNode instanceof SeriesAggregationSourceNode) {
+      descriptorList = ((SeriesAggregationSourceNode) 
planNode).getAggregationDescriptorList();
+    } else if (planNode instanceof AggregationNode) {
+      descriptorList = ((AggregationNode) 
planNode).getAggregationDescriptorList();
+    } else if (planNode instanceof SlidingWindowAggregationNode) {
+      descriptorList = ((SlidingWindowAggregationNode) 
planNode).getAggregationDescriptorList();
+    }
+    return descriptorList;
   }
 
   private List<PlanNode> processSpecialDeviceView(
@@ -621,9 +767,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
                         descriptor.getInputExpressions(),
                         descriptor.getInputAttributes())));
     leafAggDescriptorList.forEach(
-        d ->
-            LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
-                d, context.queryContext.getTypeProvider()));
+        d -> updateTypeProviderByPartialAggregation(d, 
context.queryContext.getTypeProvider()));
     List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
@@ -1307,7 +1451,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
         }
         if (keep) {
           descriptorList.add(originalDescriptor);
-          LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+          updateTypeProviderByPartialAggregation(
               originalDescriptor, context.queryContext.getTypeProvider());
         }
       }
@@ -1349,8 +1493,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
         descriptor.setStep(level == 0 ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE);
         descriptor.setInputExpressions(new ArrayList<>(descriptorExpressions));
         descriptorList.add(descriptor);
-        LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
-            descriptor, context.queryContext.getTypeProvider());
+        updateTypeProviderByPartialAggregation(descriptor, 
context.queryContext.getTypeProvider());
       }
       handle.setGroupByLevelDescriptors(descriptorList);
     }
@@ -1447,8 +1590,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
           .forEach(
               d -> {
                 d.setStep(AggregationStep.PARTIAL);
-                LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
-                    d, context.queryContext.getTypeProvider());
+                updateTypeProviderByPartialAggregation(d, 
context.queryContext.getTypeProvider());
               });
     }
 
@@ -1486,7 +1628,7 @@ public class SourceRewriter extends 
BaseSourceRewriter<DistributionPlanContext>
                 } else {
                   eachSeriesOneRegion[0] = false;
                   descriptor.setStep(AggregationStep.PARTIAL);
-                  LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
+                  updateTypeProviderByPartialAggregation(
                       descriptor, context.queryContext.getTypeProvider());
                 }
               });
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
index 41c4cfb2b55..f4bced5a216 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
 
+import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
@@ -30,8 +31,10 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 public class AggregationMergeSortNode extends MultiChildProcessNode {
 
@@ -39,30 +42,53 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
 
   private final List<String> outputColumns;
 
+  private final Set<Expression> selectExpressions;
+
+  private boolean hasGroupBy;
+
   public AggregationMergeSortNode(
-      PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> 
outputColumns) {
+      PlanNodeId id,
+      OrderByParameter mergeOrderParameter,
+      List<String> outputColumns,
+      Set<Expression> selectExpressions,
+      boolean hasGroupBy) {
     super(id);
     this.mergeOrderParameter = mergeOrderParameter;
     this.outputColumns = outputColumns;
+    this.selectExpressions = selectExpressions;
+    this.hasGroupBy = hasGroupBy;
   }
 
   public AggregationMergeSortNode(
       PlanNodeId id,
       List<PlanNode> children,
       OrderByParameter mergeOrderParameter,
-      List<String> outputColumns) {
+      List<String> outputColumns,
+      Set<Expression> selectExpressions,
+      boolean hasGroupBy) {
     super(id, children);
     this.mergeOrderParameter = mergeOrderParameter;
     this.outputColumns = outputColumns;
+    this.selectExpressions = selectExpressions;
+    this.hasGroupBy = hasGroupBy;
   }
 
   public OrderByParameter getMergeOrderParameter() {
     return mergeOrderParameter;
   }
 
+  public Set<Expression> getSelectExpressions() {
+    return this.selectExpressions;
+  }
+
+  public boolean isHasGroupBy() {
+    return this.hasGroupBy;
+  }
+
   @Override
   public PlanNode clone() {
-    return new AggregationMergeSortNode(getPlanNodeId(), 
getMergeOrderParameter(), outputColumns);
+    return new AggregationMergeSortNode(
+        getPlanNodeId(), getMergeOrderParameter(), outputColumns, 
selectExpressions, hasGroupBy);
   }
 
   @Override
@@ -71,7 +97,9 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
         new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
         new ArrayList<>(children.subList(startIndex, endIndex)),
         getMergeOrderParameter(),
-        outputColumns);
+        outputColumns,
+        selectExpressions,
+        hasGroupBy);
   }
 
   @Override
@@ -92,6 +120,11 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
     for (String column : outputColumns) {
       ReadWriteIOUtils.write(column, byteBuffer);
     }
+    ReadWriteIOUtils.write(selectExpressions.size(), byteBuffer);
+    for (Expression expression : selectExpressions) {
+      Expression.serialize(expression, byteBuffer);
+    }
+    ReadWriteIOUtils.write(hasGroupBy, byteBuffer);
   }
 
   @Override
@@ -102,6 +135,11 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
     for (String column : outputColumns) {
       ReadWriteIOUtils.write(column, stream);
     }
+    ReadWriteIOUtils.write(selectExpressions.size(), stream);
+    for (Expression expression : selectExpressions) {
+      Expression.serialize(expression, stream);
+    }
+    ReadWriteIOUtils.write(hasGroupBy, stream);
   }
 
   public static AggregationMergeSortNode deserialize(ByteBuffer byteBuffer) {
@@ -112,8 +150,16 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
       outputColumns.add(ReadWriteIOUtils.readString(byteBuffer));
       columnSize--;
     }
+    Set<Expression> expressions = new LinkedHashSet<>();
+    int expressionSize = ReadWriteIOUtils.readInt(byteBuffer);
+    while (expressionSize > 0) {
+      expressions.add(Expression.deserialize(byteBuffer));
+      expressionSize--;
+    }
+    boolean hasGroupBy = ReadWriteIOUtils.readBool(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new AggregationMergeSortNode(planNodeId, orderByParameter, 
outputColumns);
+    return new AggregationMergeSortNode(
+        planNodeId, orderByParameter, outputColumns, expressions, hasGroupBy);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
index 4d4b9b67049..bc7204a0d40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
@@ -52,7 +52,7 @@ public class DeviceViewNode extends MultiChildProcessNode {
   private final List<String> devices = new ArrayList<>();
 
   // Device column and measurement columns in result output
-  private final List<String> outputColumnNames;
+  private List<String> outputColumnNames;
 
   // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> 
[1, 3], s1 is 1 but
   // not 0 because device is the first column
@@ -114,6 +114,10 @@ public class DeviceViewNode extends MultiChildProcessNode {
     return outputColumnNames;
   }
 
+  public void setOutputColumnNames(List<String> outputColumnNames) {
+    this.outputColumnNames = outputColumnNames;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitDeviceView(this, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index 3b32baef3fa..b662b6f59da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -36,6 +36,8 @@ import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.addPartialSuffix;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_VALUE;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.STDDEV;
 
 public class AggregationDescriptor {
 
@@ -127,7 +129,7 @@ public class AggregationDescriptor {
   }
 
   /** Keep the lower case of function name for partial result, and origin 
value for others. */
-  protected List<String> getActualAggregationNames(boolean isPartial) {
+  public List<String> getActualAggregationNames(boolean isPartial) {
     List<String> outputAggregationNames = new ArrayList<>();
     if (isPartial) {
       switch (aggregationType) {
@@ -136,7 +138,7 @@ public class AggregationDescriptor {
           outputAggregationNames.add(SqlConstant.SUM);
           break;
         case FIRST_VALUE:
-          outputAggregationNames.add(SqlConstant.FIRST_VALUE);
+          outputAggregationNames.add(FIRST_VALUE);
           outputAggregationNames.add(SqlConstant.MIN_TIME);
           break;
         case LAST_VALUE:
@@ -148,7 +150,7 @@ public class AggregationDescriptor {
           outputAggregationNames.add(SqlConstant.MIN_TIME);
           break;
         case STDDEV:
-          outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV));
+          outputAggregationNames.add(addPartialSuffix(STDDEV));
           break;
         case STDDEV_POP:
           outputAggregationNames.add(addPartialSuffix(SqlConstant.STDDEV_POP));
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
index b4b9d91e632..baf96bfcac3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
@@ -43,7 +44,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWin
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationSourceNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
@@ -779,10 +779,10 @@ public class AggregationDistributionTest {
         
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
     PlanNode f2Root =
         
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
-    assertTrue(f1Root instanceof DeviceViewNode);
-    assertTrue(f2Root instanceof HorizontallyConcatNode);
-    assertTrue(f1Root.getChildren().get(0) instanceof AggregationNode);
-    assertEquals(3, f1Root.getChildren().get(0).getChildren().size());
+    assertTrue(f1Root instanceof AggregationMergeSortNode);
+    assertTrue(f2Root instanceof DeviceViewNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
+    assertEquals(1, f1Root.getChildren().get(0).getChildren().size());
   }
 
   @Test
@@ -803,13 +803,13 @@ public class AggregationDistributionTest {
         
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
     PlanNode f3Root =
         
plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0);
-    assertTrue(f1Root instanceof DeviceViewNode);
-    assertTrue(f2Root instanceof HorizontallyConcatNode);
-    assertTrue(f3Root instanceof HorizontallyConcatNode);
-    assertTrue(f3Root.getChildren().get(0) instanceof SeriesSourceNode);
-    assertTrue(f1Root.getChildren().get(0) instanceof AggregationNode);
+    assertTrue(f1Root instanceof AggregationMergeSortNode);
+    assertTrue(f2Root instanceof DeviceViewNode);
+    assertTrue(f3Root instanceof DeviceViewNode);
+    assertTrue(f3Root.getChildren().get(0) instanceof FullOuterTimeJoinNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
     assertTrue(f1Root.getChildren().get(1) instanceof ExchangeNode);
-    assertEquals(3, f1Root.getChildren().get(0).getChildren().size());
+    assertEquals(1, f1Root.getChildren().get(0).getChildren().size());
   }
 
   @Test
@@ -828,8 +828,8 @@ public class AggregationDistributionTest {
         
plan.getInstances().get(0).getFragment().getPlanNodeTree().getChildren().get(0);
     PlanNode f2Root =
         
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
-    assertTrue(f1Root instanceof DeviceViewNode);
-    assertTrue(f2Root instanceof HorizontallyConcatNode);
+    assertTrue(f1Root instanceof AggregationMergeSortNode);
+    assertTrue(f2Root instanceof DeviceViewNode);
     assertEquals(2, f1Root.getChildren().size());
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
index 750a86c4b3a..b32b231e71e 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
@@ -26,11 +26,11 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
@@ -272,15 +272,15 @@ public class AlignByDeviceOrderByLimitOffsetTest {
     assertTrue(firstFiRoot.getChildren().get(0) instanceof LimitNode);
     PlanNode filterNode = ((LimitNode) 
firstFiRoot.getChildren().get(0)).getChild();
     assertTrue(filterNode instanceof FilterNode);
-    assertTrue(filterNode.getChildren().get(0) instanceof DeviceViewNode);
-    assertTrue(filterNode.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(filterNode.getChildren().get(0) instanceof 
AggregationMergeSortNode);
+    assertTrue(filterNode.getChildren().get(0).getChildren().get(0) instanceof 
DeviceViewNode);
     assertTrue(
         
filterNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof FilterNode);
+            instanceof AggregationNode);
     PlanNode thirdFiRoot = 
plan.getInstances().get(2).getFragment().getPlanNodeTree();
     assertTrue(thirdFiRoot instanceof IdentitySinkNode);
-    assertTrue(thirdFiRoot.getChildren().get(0) instanceof AggregationNode);
-    assertTrue(thirdFiRoot.getChildren().get(0).getChildren().get(0) 
instanceof FilterNode);
+    assertTrue(thirdFiRoot.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(thirdFiRoot.getChildren().get(0).getChildren().get(0) 
instanceof AggregationNode);
   }
 
   /*
@@ -911,7 +911,7 @@ public class AlignByDeviceOrderByLimitOffsetTest {
     }
   }
 
-  /*
+  /* BEFORE:
    * IdentitySinkNode-35
    *   └──TransformNode-12
    *       └──SortNode-11
@@ -940,6 +940,36 @@ public class AlignByDeviceOrderByLimitOffsetTest {
    *   └──HorizontallyConcatNode-28
    *       ├──SeriesAggregationScanNode-24:[SeriesPath: root.sg.d333.s2, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:4)]
    *       └──SeriesAggregationScanNode-26:[SeriesPath: root.sg.d333.s1, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:4)]
+   *
+   * AFTER:
+   * IdentitySinkNode-43
+   *   └──TransformNode-12
+   *       └──MergeSort-32
+   *           ├──SortNode-33
+   *           │   └──DeviceView-19
+   *           │       ├──FullOuterTimeJoinNode-15
+   *           │       │   ├──SeriesAggregationScanNode-13:[SeriesPath: 
root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:1)]
+   *           │       │   └──SeriesAggregationScanNode-14:[SeriesPath: 
root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:1)]
+   *           │       └──FullOuterTimeJoinNode-18
+   *           │           ├──SeriesAggregationScanNode-16:[SeriesPath: 
root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *           │           └──SeriesAggregationScanNode-17:[SeriesPath: 
root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *           ├──ExchangeNode-37: [SourceAddress:192.0.3.1/test.2.0/40]
+   *           ├──ExchangeNode-38: [SourceAddress:192.0.2.1/test.3.0/41]
+   *           └──ExchangeNode-39: [SourceAddress:192.0.4.1/test.4.0/42]
+   *
+   *  IdentitySinkNode-40
+   *   └──SortNode-34
+   *       └──DeviceView-23
+   *           └──FullOuterTimeJoinNode-22
+   *               ├──SeriesAggregationScanNode-20:[SeriesPath: 
root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *               └──SeriesAggregationScanNode-21:[SeriesPath: 
root.sg.d22.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   *  IdentitySinkNode-41
+   *   └──SortNode-35
+   *       └──DeviceView-27
+   *           └──FullOuterTimeJoinNode-26
+   *               ├──SeriesAggregationScanNode-24:[SeriesPath: root.sg.d1.s2, 
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *               └──SeriesAggregationScanNode-25:[SeriesPath: root.sg.d1.s1, 
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
    */
   @Test
   public void orderByExpressionTest3() {
@@ -955,29 +985,25 @@ public class AlignByDeviceOrderByLimitOffsetTest {
     firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
     firstFiTopNode = firstFiRoot.getChildren().get(0);
     assertTrue(firstFiTopNode instanceof TransformNode);
-    assertTrue(firstFiTopNode.getChildren().get(0) instanceof SortNode);
-    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) 
instanceof DeviceViewNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof MergeSortNode);
+    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) 
instanceof SortNode);
     assertTrue(
         
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof AggregationNode);
+            instanceof DeviceViewNode);
     assertTrue(
-        
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(1)
-            instanceof ExchangeNode);
+        
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0)
+            instanceof SortNode);
     assertTrue(
-        
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(2)
-            instanceof AggregationNode);
-    for (int i = 1; i < 4; i++) {
-      assertTrue(
-          
plan.getInstances().get(i).getFragment().getPlanNodeTree().getChildren().get(0)
-              instanceof HorizontallyConcatNode);
-    }
+        
plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0)
+            instanceof SortNode);
     for (int i = 0; i < 4; i++) {
       
assertScanNodeLimitValue(plan.getInstances().get(i).getFragment().getPlanNodeTree(),
 0);
     }
   }
 
   /*
-   * IdentitySinkNode-34
+   *  BEFORE:
+   *  IdentitySinkNode-34
    *   └──TopK-10
    *       └──DeviceView-12
    *           ├──AggregationNode-17
@@ -1005,6 +1031,43 @@ public class AlignByDeviceOrderByLimitOffsetTest {
    *       ├──SeriesAggregationScanNode-23:[SeriesPath: root.sg.d333.s2, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:4)]
    *       └──SeriesAggregationScanNode-25:[SeriesPath: root.sg.d333.s1, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:4)]
    */
+
+  /* AFTER:
+   * IdentitySinkNode-41
+   *   └──TopK-10
+   *       ├──TopK-31
+   *       │   └──DeviceView-18
+   *       │       ├──FullOuterTimeJoinNode-14
+   *       │       │   ├──SeriesAggregationScanNode-12:[SeriesPath: 
root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:1)]
+   *       │       │   └──SeriesAggregationScanNode-13:[SeriesPath: 
root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:1)]
+   *       │       └──FullOuterTimeJoinNode-17
+   *       │           ├──SeriesAggregationScanNode-15:[SeriesPath: 
root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │           └──SeriesAggregationScanNode-16:[SeriesPath: 
root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       ├──ExchangeNode-35: [SourceAddress:192.0.3.1/test.2.0/38]
+   *       ├──ExchangeNode-36: [SourceAddress:192.0.2.1/test.3.0/39]
+   *       └──ExchangeNode-37: [SourceAddress:192.0.4.1/test.4.0/40]
+   *
+   *  IdentitySinkNode-38
+   *   └──TopK-32
+   *       └──DeviceView-22
+   *           └──FullOuterTimeJoinNode-21
+   *               ├──SeriesAggregationScanNode-19:[SeriesPath: 
root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *               └──SeriesAggregationScanNode-20:[SeriesPath: 
root.sg.d22.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   *  IdentitySinkNode-39
+   *   └──TopK-33
+   *       └──DeviceView-26
+   *           └──FullOuterTimeJoinNode-25
+   *               ├──SeriesAggregationScanNode-23:[SeriesPath: root.sg.d1.s2, 
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *               └──SeriesAggregationScanNode-24:[SeriesPath: root.sg.d1.s1, 
Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   *  IdentitySinkNode-40
+   *   └──TopK-34
+   *       └──DeviceView-30
+   *           └──FullOuterTimeJoinNode-29
+   *               ├──SeriesAggregationScanNode-27:[SeriesPath: 
root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   *               └──SeriesAggregationScanNode-28:[SeriesPath: 
root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   */
   @Test
   public void orderByExpressionTest4() {
     // aggregation, order by expression, has LIMIT
@@ -1019,14 +1082,22 @@ public class AlignByDeviceOrderByLimitOffsetTest {
     firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
     firstFiTopNode = firstFiRoot.getChildren().get(0);
     assertTrue(firstFiTopNode instanceof TopKNode);
-    assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
-    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) 
instanceof AggregationNode);
-    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(1) 
instanceof ExchangeNode);
-    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(2) 
instanceof AggregationNode);
+    assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode);
+    assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) 
instanceof DeviceViewNode);
+    assertTrue(
+        
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof FullOuterTimeJoinNode);
     for (int i = 1; i < 4; i++) {
       assertTrue(
-          
plan.getInstances().get(i).getFragment().getPlanNodeTree().getChildren().get(0)
-              instanceof HorizontallyConcatNode);
+          plan.getInstances()
+                  .get(i)
+                  .getFragment()
+                  .getPlanNodeTree()
+                  .getChildren()
+                  .get(0)
+                  .getChildren()
+                  .get(0)
+              instanceof DeviceViewNode);
     }
     for (int i = 0; i < 4; i++) {
       
assertScanNodeLimitValue(plan.getInstances().get(i).getFragment().getPlanNodeTree(),
 0);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignedByDeviceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignedByDeviceTest.java
index f56a685a048..0e33d31c14d 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignedByDeviceTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignedByDeviceTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
@@ -37,6 +38,8 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformN
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.ShuffleSinkNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesSourceNode;
 
 import org.junit.Test;
@@ -61,24 +64,19 @@ public class AlignedByDeviceTest {
     PlanNode f1Root = 
plan.getInstances().get(0).getFragment().getPlanNodeTree();
     PlanNode f2Root = 
plan.getInstances().get(1).getFragment().getPlanNodeTree();
     assertTrue(f1Root instanceof IdentitySinkNode);
-    assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof 
AggregationMergeSortNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
DeviceViewNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
             instanceof SeriesSourceNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(1)
-            instanceof ExchangeNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
AggregationNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
             instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(1)
-            instanceof ExchangeNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
ExchangeNode);
     assertTrue(f2Root instanceof IdentitySinkNode);
-    assertTrue(f2Root.getChildren().get(0) instanceof SeriesSourceNode);
-    assertTrue(f2Root.getChildren().get(1) instanceof SeriesSourceNode);
+    assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(
+        f2Root.getChildren().get(0).getChildren().get(0) instanceof 
SeriesAggregationScanNode);
 
     // test of MULTI_SERIES
     sql = "select count(s1),count(s2) from root.sg.d333,root.sg.d4444 align by 
device";
@@ -90,30 +88,19 @@ public class AlignedByDeviceTest {
     f1Root = plan.getInstances().get(0).getFragment().getPlanNodeTree();
     f2Root = plan.getInstances().get(1).getFragment().getPlanNodeTree();
     assertTrue(f1Root instanceof IdentitySinkNode);
-    assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof 
AggregationMergeSortNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
DeviceViewNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof SeriesSourceNode);
+            instanceof FullOuterTimeJoinNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(1)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(2)
-            instanceof ExchangeNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
AggregationNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(1)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(2)
-            instanceof ExchangeNode);
+            instanceof FullOuterTimeJoinNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
ExchangeNode);
     assertTrue(f2Root instanceof IdentitySinkNode);
-    assertTrue(f2Root.getChildren().get(0) instanceof HorizontallyConcatNode);
-    assertTrue(f2Root.getChildren().get(1) instanceof HorizontallyConcatNode);
+    assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(f2Root.getChildren().get(0).getChildren().get(0) instanceof 
FullOuterTimeJoinNode);
+    assertTrue(f2Root.getChildren().get(0).getChildren().get(1) instanceof 
FullOuterTimeJoinNode);
   }
 
   @Test
@@ -132,76 +119,15 @@ public class AlignedByDeviceTest {
     PlanNode f1Root = 
plan.getInstances().get(0).getFragment().getPlanNodeTree();
     PlanNode f2Root = 
plan.getInstances().get(1).getFragment().getPlanNodeTree();
     assertTrue(f1Root instanceof IdentitySinkNode);
-    assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof 
AggregationMergeSortNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
DeviceViewNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof FilterNode);
-    assertTrue(
-        
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof FullOuterTimeJoinNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-            instanceof ExchangeNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
AggregationNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
-            instanceof FilterNode);
-    assertTrue(
-        
f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0)
-            instanceof FullOuterTimeJoinNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-            instanceof ExchangeNode);
+            instanceof AggregationNode);
     assertTrue(f2Root instanceof IdentitySinkNode);
-    assertTrue(f2Root.getChildren().get(0) instanceof SeriesSourceNode);
-    assertTrue(f2Root.getChildren().get(1) instanceof SeriesSourceNode);
+    assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(f2Root.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(f2Root.getChildren().get(0).getChildren().get(1) instanceof 
AggregationNode);
 
     // test of MULTI_SERIES
     sql =
@@ -214,99 +140,19 @@ public class AlignedByDeviceTest {
     f1Root = plan.getInstances().get(0).getFragment().getPlanNodeTree();
     f2Root = plan.getInstances().get(1).getFragment().getPlanNodeTree();
     assertTrue(f1Root instanceof IdentitySinkNode);
-    assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof 
AggregationMergeSortNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
DeviceViewNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof FilterNode);
-    assertTrue(
-        
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof FullOuterTimeJoinNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(2)
-            instanceof ExchangeNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
AggregationNode);
-    assertTrue(
-        
f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0)
-            instanceof FullOuterTimeJoinNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-            instanceof SeriesSourceNode);
+            instanceof AggregationNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
ExchangeNode);
     assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(2)
-            instanceof ExchangeNode);
+        f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(1)
+            instanceof AggregationNode);
     assertTrue(f2Root instanceof IdentitySinkNode);
-    assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode);
-    assertTrue(f2Root.getChildren().get(1) instanceof FullOuterTimeJoinNode);
+    assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(f2Root.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(f2Root.getChildren().get(0).getChildren().get(1) instanceof 
AggregationNode);
   }
 
   @Test
@@ -679,25 +525,19 @@ public class AlignedByDeviceTest {
     PlanNode f2Root = 
plan.getInstances().get(1).getFragment().getPlanNodeTree();
     PlanNode f3Root = 
plan.getInstances().get(2).getFragment().getPlanNodeTree();
     assertTrue(f1Root instanceof IdentitySinkNode);
-    assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof 
AggregationMergeSortNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
DeviceViewNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof SeriesSourceNode);
+            instanceof SeriesAggregationScanNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(1)
-            instanceof ExchangeNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
AggregationNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(1)
-            instanceof ExchangeNode);
+            instanceof SeriesAggregationScanNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
ExchangeNode);
     assertTrue(f2Root instanceof IdentitySinkNode);
-    assertTrue(f2Root.getChildren().get(0) instanceof SeriesSourceNode);
+    assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
     assertTrue(f3Root instanceof IdentitySinkNode);
-    assertTrue(f3Root.getChildren().get(0) instanceof SeriesSourceNode);
+    assertTrue(f3Root.getChildren().get(0) instanceof DeviceViewNode);
 
     // test of MULTI_SERIES
     sql = "select count(s1),count(s2) from root.sg.d1,root.sg.d333 align by 
device";
@@ -710,33 +550,47 @@ public class AlignedByDeviceTest {
     f2Root = plan.getInstances().get(1).getFragment().getPlanNodeTree();
     f3Root = plan.getInstances().get(2).getFragment().getPlanNodeTree();
     assertTrue(f1Root instanceof IdentitySinkNode);
-    assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof 
AggregationMergeSortNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
DeviceViewNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof SeriesSourceNode);
+            instanceof FullOuterTimeJoinNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(1)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(2)
-            instanceof ExchangeNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
AggregationNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(1)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(2)
-            instanceof ExchangeNode);
+            instanceof FullOuterTimeJoinNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
ExchangeNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
ExchangeNode);
     assertTrue(f2Root instanceof IdentitySinkNode);
-    assertTrue(f2Root.getChildren().get(0) instanceof HorizontallyConcatNode);
+    assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
     assertTrue(f3Root instanceof IdentitySinkNode);
-    assertTrue(f3Root.getChildren().get(0) instanceof HorizontallyConcatNode);
+    assertTrue(f3Root.getChildren().get(0) instanceof DeviceViewNode);
   }
 
+  /*
+   * IdentitySinkNode-28
+   *   └──AggregationMergeSort-23
+   *       ├──DeviceView-14
+   *       │   ├──AggregationNode-10
+   *       │   │   └──FilterNode-9
+   *       │   │       └──SeriesScanNode-8:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       │   └──AggregationNode-13
+   *       │       └──FilterNode-12
+   *       │           └──SeriesScanNode-11:[SeriesPath: root.sg.d333.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *       ├──ExchangeNode-24: [SourceAddress:192.0.2.1/test.2.0/26]
+   *       └──ExchangeNode-25: [SourceAddress:192.0.4.1/test.3.0/27]
+   *
+   * IdentitySinkNode-26
+   *   └──DeviceView-18
+   *       └──AggregationNode-17
+   *           └──FilterNode-16
+   *               └──SeriesScanNode-15:[SeriesPath: root.sg.d1.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   * IdentitySinkNode-27
+   *   └──DeviceView-22
+   *       └──AggregationNode-21
+   *           └──FilterNode-20
+   *               └──SeriesScanNode-19:[SeriesPath: root.sg.d333.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
+   */
   @Test
   public void testAggregation2Device3RegionWithValueFilter() {
     QueryId queryId = new QueryId("test");
@@ -754,77 +608,28 @@ public class AlignedByDeviceTest {
     PlanNode f2Root = 
plan.getInstances().get(1).getFragment().getPlanNodeTree();
     PlanNode f3Root = 
plan.getInstances().get(1).getFragment().getPlanNodeTree();
     assertTrue(f1Root instanceof IdentitySinkNode);
-    assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof 
AggregationMergeSortNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
DeviceViewNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof FilterNode);
-    assertTrue(
-        
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof FullOuterTimeJoinNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-            instanceof ExchangeNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
AggregationNode);
-    assertTrue(
-        f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0)
-            instanceof FilterNode);
-    assertTrue(
-        
f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0)
-            instanceof FullOuterTimeJoinNode);
+            instanceof AggregationNode);
     assertTrue(
         f1Root
                 .getChildren()
                 .get(0)
                 .getChildren()
-                .get(1)
-                .getChildren()
                 .get(0)
                 .getChildren()
                 .get(0)
                 .getChildren()
                 .get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-                .getChildren()
-                .get(0)
                 .getChildren()
                 .get(0)
-                .getChildren()
-                .get(1)
-            instanceof ExchangeNode);
+            instanceof SeriesScanNode);
     assertTrue(f2Root instanceof IdentitySinkNode);
-    assertTrue(f2Root.getChildren().get(0) instanceof SeriesSourceNode);
+    assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
     assertTrue(f3Root instanceof IdentitySinkNode);
-    assertTrue(f3Root.getChildren().get(0) instanceof SeriesSourceNode);
+    assertTrue(f3Root.getChildren().get(0) instanceof DeviceViewNode);
 
     // test of MULTI_SERIES
     sql = "select count(s1),count(s2) from root.sg.d1,root.sg.d333 where s1 <= 
4 align by device";
@@ -837,100 +642,15 @@ public class AlignedByDeviceTest {
     f2Root = plan.getInstances().get(1).getFragment().getPlanNodeTree();
     f3Root = plan.getInstances().get(2).getFragment().getPlanNodeTree();
     assertTrue(f1Root instanceof IdentitySinkNode);
-    assertTrue(f1Root.getChildren().get(0) instanceof DeviceViewNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(f1Root.getChildren().get(0) instanceof 
AggregationMergeSortNode);
+    assertTrue(f1Root.getChildren().get(0).getChildren().get(0) instanceof 
DeviceViewNode);
     assertTrue(
         f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof FilterNode);
-    assertTrue(
-        
f1Root.getChildren().get(0).getChildren().get(0).getChildren().get(0).getChildren().get(0)
-            instanceof FullOuterTimeJoinNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(2)
-            instanceof ExchangeNode);
-    assertTrue(f1Root.getChildren().get(0).getChildren().get(1) instanceof 
AggregationNode);
-    assertTrue(
-        
f1Root.getChildren().get(0).getChildren().get(1).getChildren().get(0).getChildren().get(0)
-            instanceof FullOuterTimeJoinNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-            instanceof SeriesSourceNode);
-    assertTrue(
-        f1Root
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(1)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(0)
-                .getChildren()
-                .get(2)
-            instanceof ExchangeNode);
+            instanceof AggregationNode);
     assertTrue(f2Root instanceof IdentitySinkNode);
-    assertTrue(f2Root.getChildren().get(0) instanceof FullOuterTimeJoinNode);
+    assertTrue(f2Root.getChildren().get(0) instanceof DeviceViewNode);
     assertTrue(f3Root instanceof IdentitySinkNode);
-    assertTrue(f3Root.getChildren().get(0) instanceof FullOuterTimeJoinNode);
+    assertTrue(f3Root.getChildren().get(0) instanceof DeviceViewNode);
   }
 
   @Test
diff --git 
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties 
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
index b89ef13778b..143c0e0b7f0 100644
--- 
a/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
+++ 
b/iotdb-core/datanode/src/test/resources/datanode1conf/iotdb-common.properties
@@ -16,7 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
 timestamp_precision=ms
 udf_lib_dir=target/datanode1/ext/udf
 trigger_lib_dir=target/datanode1/ext/trigger
diff --git 
a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties 
b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
index 83dbc1b051e..1b48e21f303 100644
--- 
a/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
+++ 
b/iotdb-core/datanode/src/test/resources/datanode3conf/iotdb-common.properties
@@ -16,7 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
 timestamp_precision=ms
 udf_lib_dir=target/datanode3/ext/udf
 trigger_lib_dir=target/datanode3/ext/trigger

Reply via email to