This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/agg_mergesort
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/agg_mergesort by this 
push:
     new 1351d02a509 perfect code
1351d02a509 is described below

commit 1351d02a50900b04410240fb802a91ab119c949f
Author: Beyyes <[email protected]>
AuthorDate: Sun Feb 18 20:05:32 2024 +0800

    perfect code
---
 .../process/AggregationMergeSortOperator.java      | 283 ++++++++++++---------
 .../plan/planner/LogicalPlanBuilder.java           |  11 +-
 .../plan/planner/OperatorTreeGenerator.java        |  25 +-
 .../planner/distribution/ExchangeNodeAdder.java    |   8 +-
 .../plan/planner/distribution/SourceRewriter.java  |  12 +-
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   1 -
 .../node/process/AggregationMergeSortNode.java     | 141 ++++------
 .../operator/AggregationOperatorTest.java          |  17 +-
 .../distribution/AggregationAlignByDeviceTest.java | 117 +++++++++
 9 files changed, 377 insertions(+), 238 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
index 3efb8111624..c7f4b19717d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -21,173 +21,228 @@ package 
org.apache.iotdb.db.queryengine.execution.operator.process;
 
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.NullColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
 
-/**
- * Since devices have been sorted by the merge order as expected, what {@link
- * AggregationMergeSortOperator} need to do is traversing the device child 
operators, get all
- * tsBlocks of one device and transform it to the form we need, adding the 
device column and
- * allocating value column to its expected location, then get the next device 
operator until no next
- * device.
- *
- * <p>The deviceOperators can be aggregationSeriesScanOperator, 
imeJoinOperator or
- * seriesScanOperator that have not transformed the result form.
- *
- * <p>Attention! If some columns are not existing in one device, those columns 
will be null. e.g.
- * [s1,s2,s3] is query, but only [s1, s3] exists in device1, then the column 
of s2 will be filled
- * with NullColumn.
- */
-public class AggregationMergeSortOperator implements ProcessOperator {
-
-  private final OperatorContext operatorContext;
-  // The size devices and deviceOperators should be the same.
-  private final List<String> devices;
-  private final List<Operator> deviceOperators;
-  // Used to fill columns and leave null columns which doesn't exist in some 
devices.
-  // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> 
[1, 3], s1 is 1 but
-  // not 0 because device is the first column
-  private final List<List<Integer>> deviceColumnIndex;
-  // Column dataTypes that includes device column
   private final List<TSDataType> dataTypes;
+  private final TsBlockBuilder tsBlockBuilder;
+  private final boolean[] noMoreTsBlocks;
+  private final MergeSortHeap mergeSortHeap;
+  private final Comparator<SortKey> comparator;
 
-  private int deviceIndex;
+  private boolean finished;
 
   public AggregationMergeSortOperator(
       OperatorContext operatorContext,
-      List<String> devices,
-      List<Operator> deviceOperators,
-      List<List<Integer>> deviceColumnIndex,
-      List<TSDataType> dataTypes) {
-    this.operatorContext = operatorContext;
-    this.devices = devices;
-    this.deviceOperators = deviceOperators;
-    this.deviceColumnIndex = deviceColumnIndex;
+      List<Operator> inputOperators,
+      List<TSDataType> dataTypes,
+      Comparator<SortKey> comparator) {
+    super(operatorContext, inputOperators);
     this.dataTypes = dataTypes;
-
-    this.deviceIndex = 0;
-  }
-
-  private Operator getCurDeviceOperator() {
-    return deviceOperators.get(deviceIndex);
-  }
-
-  private List<Integer> getCurDeviceIndexes() {
-    return deviceColumnIndex.get(deviceIndex);
-  }
-
-  @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
+    this.mergeSortHeap = new MergeSortHeap(inputOperatorsCount, comparator);
+    this.comparator = comparator;
+    this.noMoreTsBlocks = new boolean[inputOperatorsCount];
+    this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
   }
 
   @Override
   public ListenableFuture<?> isBlocked() {
-    if (deviceIndex >= deviceOperators.size()) {
-      return NOT_BLOCKED;
-    }
-    ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked();
-    if (!blocked.isDone()) {
-      return blocked;
+    boolean hasReadyChild = false;
+    List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
+        continue;
+      }
+      ListenableFuture<?> blocked = children.get(i).isBlocked();
+      if (blocked.isDone()) {
+        hasReadyChild = true;
+        canCallNext[i] = true;
+      } else {
+        listenableFutures.add(blocked);
+      }
     }
-    return NOT_BLOCKED;
+    return (hasReadyChild || listenableFutures.isEmpty())
+        ? NOT_BLOCKED
+        : successfulAsList(listenableFutures);
   }
 
+  @SuppressWarnings({"squid:S3776", "squid:S135"})
   @Override
   public TsBlock next() throws Exception {
-    if (!getCurDeviceOperator().hasNextWithTimer()) {
-      // close finished child
-      getCurDeviceOperator().close();
-      deviceOperators.set(deviceIndex, null);
-      // increment index, move to next child
-      deviceIndex++;
-      return null;
-    }
-
-    boolean deviceView =
-        getCurDeviceOperator() instanceof DeviceViewOperator
-            || getCurDeviceOperator() instanceof ExchangeOperator;
+    // start stopwatch
+    long startTime = System.nanoTime();
+    long maxRuntime = 
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
 
-    TsBlock tsBlock = getCurDeviceOperator().nextWithTimer();
-    if (tsBlock == null) {
+    // 1. refill the input TsBlocks that have been consumed up
+    if (!prepareInput()) {
       return null;
     }
-    List<Integer> indexes = getCurDeviceIndexes();
 
-    // fill existing columns
-    Column[] newValueColumns = new Column[dataTypes.size()];
-    for (int i = 0; i < indexes.size(); i++) {
-      newValueColumns[indexes.get(i)] = tsBlock.getColumn(deviceView ? i + 1 : 
i);
+    // 2. check whether the original TsBlock can be returned without merging
+    MergeSortKey minMergeSortKey = mergeSortHeap.poll();
+    if (mergeSortHeap.isEmpty()
+        || comparator.compare(
+                new MergeSortKey(
+                    minMergeSortKey.tsBlock, 
minMergeSortKey.tsBlock.getPositionCount() - 1),
+                mergeSortHeap.peek())
+            < 0) {
+      inputTsBlocks[minMergeSortKey.inputChannelIndex] = null;
+      return minMergeSortKey.rowIndex == 0
+          ? minMergeSortKey.tsBlock
+          : minMergeSortKey.tsBlock.subTsBlock(minMergeSortKey.rowIndex);
     }
-    // construct device column
-    ColumnBuilder deviceColumnBuilder = new BinaryColumnBuilder(null, 1);
-    deviceColumnBuilder.writeBinary(tsBlock.getColumn(0).getBinary(0));
-    newValueColumns[0] =
-        new RunLengthEncodedColumn(deviceColumnBuilder.build(), 
tsBlock.getPositionCount());
-    // construct other null columns
-    for (int i = 0; i < dataTypes.size(); i++) {
-      if (newValueColumns[i] == null) {
-        newValueColumns[i] = NullColumn.create(dataTypes.get(i), 
tsBlock.getPositionCount());
+    mergeSortHeap.push(minMergeSortKey);
+
+    // 3. do merge sort until one TsBlock is consumed up
+    tsBlockBuilder.reset();
+    TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] valueColumnBuilders = 
tsBlockBuilder.getValueColumnBuilders();
+    while (!mergeSortHeap.isEmpty()) {
+      MergeSortKey mergeSortKey = mergeSortHeap.poll();
+      TsBlock targetBlock = mergeSortKey.tsBlock;
+      int rowIndex = mergeSortKey.rowIndex;
+      timeBuilder.writeLong(targetBlock.getTimeByIndex(rowIndex));
+      for (int i = 0; i < valueColumnBuilders.length; i++) {
+        if (targetBlock.getColumn(i).isNull(rowIndex)) {
+          valueColumnBuilders[i].appendNull();
+          continue;
+        }
+        valueColumnBuilders[i].write(targetBlock.getColumn(i), rowIndex);
+      }
+      tsBlockBuilder.declarePosition();
+      if (mergeSortKey.rowIndex == mergeSortKey.tsBlock.getPositionCount() - 
1) {
+        inputTsBlocks[mergeSortKey.inputChannelIndex] = null;
+        break;
+      } else {
+        mergeSortKey.rowIndex++;
+        mergeSortHeap.push(mergeSortKey);
+      }
+      // break if time is out or tsBlockBuilder is full
+      if (System.nanoTime() - startTime > maxRuntime || 
tsBlockBuilder.isFull()) {
+        break;
       }
     }
-    return new TsBlock(tsBlock.getPositionCount(), tsBlock.getTimeColumn(), 
newValueColumns);
+    return tsBlockBuilder.build();
   }
 
   @Override
   public boolean hasNext() throws Exception {
-    return deviceIndex < deviceOperators.size();
-  }
-
-  @Override
-  public void close() throws Exception {
-    for (int i = deviceIndex, n = deviceOperators.size(); i < n; i++) {
-      Operator currentChild = deviceOperators.get(i);
-      if (currentChild != null) {
-        deviceOperators.get(i).close();
+    if (finished) {
+      return false;
+    }
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!isEmpty(i)) {
+        return true;
+      } else if (!noMoreTsBlocks[i]) {
+        if (!canCallNext[i] || children.get(i).hasNextWithTimer()) {
+          return true;
+        } else {
+          children.get(i).close();
+          children.set(i, null);
+          noMoreTsBlocks[i] = true;
+          inputTsBlocks[i] = null;
+        }
       }
     }
+    return false;
   }
 
   @Override
   public boolean isFinished() throws Exception {
-    return !this.hasNextWithTimer();
+    if (finished) {
+      return true;
+    }
+    finished = true;
+
+    for (int i = 0; i < inputOperatorsCount; i++) {
+      if (!noMoreTsBlocks[i] || !isEmpty(i)) {
+        finished = false;
+        break;
+      }
+    }
+    return finished;
   }
 
   @Override
   public long calculateMaxPeekMemory() {
-    long maxPeekMemory = calculateMaxReturnSize() + 
calculateRetainedSizeAfterCallingNext();
-    for (Operator child : deviceOperators) {
-      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    // MergeToolKit will cache startKey and endKey
+    long maxPeekMemory = 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    // inputTsBlocks will cache all the tsBlocks returned by inputOperators
+    for (Operator operator : children) {
+      maxPeekMemory += operator.calculateMaxReturnSize();
+      maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
     }
-    return maxPeekMemory;
+    for (Operator operator : children) {
+      maxPeekMemory = Math.max(maxPeekMemory, 
operator.calculateMaxPeekMemory());
+    }
+    return Math.max(maxPeekMemory, calculateMaxReturnSize());
   }
 
   @Override
   public long calculateMaxReturnSize() {
-    // null columns would be filled, so return size equals to
-    // (numberOfValueColumns(dataTypes.size() - 1) + 1(timeColumn)) * 
columnSize + deviceColumnSize
-    // size of device name column is ignored
-    return (long) (dataTypes.size())
-        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    return (1L + dataTypes.size()) * 
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
   }
 
   @Override
   public long calculateRetainedSizeAfterCallingNext() {
-    long max = 0;
-    for (Operator operator : deviceOperators) {
-      max = Math.max(max, operator.calculateRetainedSizeAfterCallingNext());
+    long currentRetainedSize = 0;
+    long minChildReturnSize = Long.MAX_VALUE;
+    for (Operator child : children) {
+      long maxReturnSize = child.calculateMaxReturnSize();
+      minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+      currentRetainedSize += (maxReturnSize + 
child.calculateRetainedSizeAfterCallingNext());
     }
-    return max;
+    return currentRetainedSize - minChildReturnSize;
+  }
+
+  // region helper function used in prepareInput
+
+  /**
+   * @param currentChildIndex the index of the child
+   * @return true if we can skip the currentChild in prepareInput
+   */
+  @Override
+  protected boolean canSkipCurrentChild(int currentChildIndex) {
+    return noMoreTsBlocks[currentChildIndex]
+        || !isEmpty(currentChildIndex)
+        || children.get(currentChildIndex) == null;
+  }
+
+  /** @param currentInputIndex index of the input TsBlock */
+  @Override
+  protected void processCurrentInputTsBlock(int currentInputIndex) {
+    mergeSortHeap.push(new MergeSortKey(inputTsBlocks[currentInputIndex], 0, 
currentInputIndex));
   }
+
+  /**
+   * @param currentChildIndex the index of the child
+   * @throws Exception Potential Exception thrown by Operator.close()
+   */
+  @Override
+  protected void handleFinishedChild(int currentChildIndex) throws Exception {
+    noMoreTsBlocks[currentChildIndex] = true;
+    inputTsBlocks[currentChildIndex] = null;
+    children.get(currentChildIndex).close();
+    children.set(currentChildIndex, null);
+  }
+
+  // endregion
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 3ecf47ad30a..baf0ff42407 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -806,7 +806,6 @@ public class LogicalPlanBuilder {
             valueFilterLimit);
       } else {
         // has order by expression, use TopKNode + DeviceViewNode
-
         topKNode.addChild(
             addDeviceViewNode(
                 orderByParameter,
@@ -837,8 +836,8 @@ public class LogicalPlanBuilder {
             addAggMergeSortNode(
                 orderByParameter,
                 outputColumnNames,
-                deviceToMeasurementIndexesMap,
                 deviceNameToSourceNodesMap,
+                deviceToMeasurementIndexesMap,
                 -1);
       } else {
         this.root =
@@ -953,8 +952,8 @@ public class LogicalPlanBuilder {
   private MultiChildProcessNode addAggMergeSortNode(
       OrderByParameter orderByParameter,
       List<String> outputColumnNames,
-      Map<String, List<Integer>> deviceToMeasurementIndexesMap,
       Map<String, PlanNode> deviceNameToSourceNodesMap,
+      Map<String, List<Integer>> deviceToMeasurementIndexesMap,
       long valueFilterLimit) {
     AggregationMergeSortNode aggMergeSortNode =
         new AggregationMergeSortNode(
@@ -964,14 +963,14 @@ public class LogicalPlanBuilder {
             deviceToMeasurementIndexesMap);
 
     for (Map.Entry<String, PlanNode> entry : 
deviceNameToSourceNodesMap.entrySet()) {
-      String deviceName = entry.getKey();
       PlanNode subPlan = entry.getValue();
       if (valueFilterLimit > 0) {
+        // TODO test this situation
         LimitNode limitNode =
             new LimitNode(context.getQueryId().genPlanNodeId(), subPlan, 
valueFilterLimit);
-        aggMergeSortNode.addChildDeviceNode(deviceName, limitNode);
+        aggMergeSortNode.addChild(limitNode);
       } else {
-        aggMergeSortNode.addChildDeviceNode(deviceName, subPlan);
+        aggMergeSortNode.addChild(subPlan);
       }
     }
     return aggMergeSortNode;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index efe5996fc56..08817fb5e52 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -833,16 +833,25 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
             .addOperatorContext(
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
-                DeviceViewOperator.class.getSimpleName());
-    List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, 
context);
-    List<List<Integer>> deviceColumnIndex =
-        node.getDevices().stream()
-            .map(deviceName -> 
node.getDeviceToMeasurementIndexesMap().get(deviceName))
-            .collect(Collectors.toList());
-    List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+                MergeSortOperator.class.getSimpleName());
+    List<TSDataType> dataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+    context.setCachedDataTypes(dataTypes);
+    List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node, 
context);
+    List<SortItem> sortItemList = 
node.getMergeOrderParameter().getSortItemList();
 
+    List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
+    List<TSDataType> sortItemDataTypeList = new 
ArrayList<>(sortItemList.size());
+    genSortInformation(
+        node.getOutputColumnNames(),
+        dataTypes,
+        sortItemList,
+        sortItemIndexList,
+        sortItemDataTypeList);
     return new AggregationMergeSortOperator(
-        operatorContext, node.getDevices(), children, deviceColumnIndex, 
outputColumnTypes);
+        operatorContext,
+        children,
+        dataTypes,
+        MergeSortComparator.getComparator(sortItemList, sortItemIndexList, 
sortItemDataTypeList));
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index ced6ab74a0b..05901a22f5a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -507,6 +507,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return newNode;
   }
 
+  // TODO the impl of this method is not clear
   private PlanNode processAggMergeSortNode(
       MultiChildProcessNode node,
       List<PlanNode> visitedChildren,
@@ -514,11 +515,11 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
       MultiChildProcessNode newNode,
       TRegionReplicaSet dataRegion) {
     AggregationMergeSortNode aggMergeSortNode = (AggregationMergeSortNode) 
node;
-    Map<TRegionReplicaSet, DeviceViewNode> regionTopKNodeMap = new HashMap<>();
+    Map<TRegionReplicaSet, DeviceViewNode> regionAggMergeNodeMap = new 
HashMap<>();
     for (PlanNode child : visitedChildren) {
       TRegionReplicaSet region = 
context.getNodeDistribution(child.getPlanNodeId()).region;
       DeviceViewNode deviceViewNode =
-          regionTopKNodeMap.computeIfAbsent(
+          regionAggMergeNodeMap.computeIfAbsent(
               region,
               k -> {
                 DeviceViewNode childDeviceViewNode =
@@ -539,7 +540,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
       deviceViewNode.addChildDeviceNode(device, child);
     }
 
-    for (Map.Entry<TRegionReplicaSet, DeviceViewNode> entry : 
regionTopKNodeMap.entrySet()) {
+    for (Map.Entry<TRegionReplicaSet, DeviceViewNode> entry : 
regionAggMergeNodeMap.entrySet()) {
       TRegionReplicaSet deviceViewNodeLocatedRegion = entry.getKey();
       DeviceViewNode deviceViewNode = entry.getValue();
 
@@ -564,6 +565,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     } else if (child instanceof AlignedSeriesAggregationScanNode) {
       device = ((AlignedSeriesAggregationScanNode) 
child).getAlignedPath().getDevice();
     } else {
+      // TODO: can the child be another node type?
       throw new UnsupportedOperationException(
           String.format("Unsupported child node of AggMergeSortNode, node: 
%s", child.getClass()));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index d152fcd4288..6d0cc848c24 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -236,23 +236,17 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
   @Override
   public List<PlanNode> visitAggregationMergeSort(
       AggregationMergeSortNode node, DistributionPlanContext context) {
+    // TODO what's the meaning of newRoot
     AggregationMergeSortNode newRoot =
         new AggregationMergeSortNode(
             context.queryContext.getQueryId().genPlanNodeId(),
             node.getMergeOrderParameter(),
             node.getOutputColumnNames(),
             node.getDeviceToMeasurementIndexesMap());
-    for (int i = 0; i < node.getDevices().size(); i++) {
+    for (int i = 0; i < node.getChildren().size(); i++) {
       List<PlanNode> rewroteNode = rewrite(node.getChildren().get(i), context);
       for (PlanNode planNode : rewroteNode) {
-        if (planNode instanceof AggregationNode) {
-          // if children of AggregationNode can only be AggScanNode?
-          for (PlanNode aggScanNode : planNode.getChildren()) {
-            newRoot.addChildDeviceNode(node.getDevices().get(i), aggScanNode);
-          }
-        } else {
-          newRoot.addChildDeviceNode(node.getDevices().get(i), planNode);
-        }
+        newRoot.addChild(planNode);
       }
     }
     return Collections.singletonList(newRoot);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 6db8bb75fbd..df3d42d4d44 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -194,7 +194,6 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
       AggregationMergeSortNode node, GraphContext context) {
     List<String> boxValue = new ArrayList<>();
     boxValue.add(String.format("AggMergeSort-%s", 
node.getPlanNodeId().getId()));
-    boxValue.add(String.format("AggDeviceCount: %d", 
node.getDevices().size()));
     return render(node, boxValue, context);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
index 58e1b9aed12..d18f408a599 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
@@ -37,109 +37,82 @@ import java.util.Objects;
 
 public class AggregationMergeSortNode extends MultiChildProcessNode {
 
-  // The result output order, which could sort by device and time.
-  // The size of this list is 2 and the first SortItem in this list has higher 
priority.
-  protected final OrderByParameter mergeOrderParameter;
+  private final OrderByParameter mergeOrderParameter;
 
-  // The size devices and children should be the same.
-  protected final List<String> devices = new ArrayList<>();
-
-  // Device column and measurement columns in result output
-  protected final List<String> outputColumnNames;
+  private final List<String> outputColumns;
 
   // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> 
[1, 3], s1 is 1 but
   // not 0 because device is the first column
-  final Map<String, List<Integer>> deviceToMeasurementIndexesMap;
+  private final Map<String, List<Integer>> deviceToMeasurementIndexesMap;
 
   public AggregationMergeSortNode(
       PlanNodeId id,
       OrderByParameter mergeOrderParameter,
-      List<String> outputColumnNames,
+      List<String> outputColumns,
       Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
     super(id);
     this.mergeOrderParameter = mergeOrderParameter;
-    this.outputColumnNames = outputColumnNames;
+    this.outputColumns = outputColumns;
     this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
   }
 
   public AggregationMergeSortNode(
       PlanNodeId id,
+      List<PlanNode> children,
       OrderByParameter mergeOrderParameter,
-      List<String> outputColumnNames,
-      List<String> devices,
+      List<String> outputColumns,
       Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
-    super(id);
+    super(id, children);
     this.mergeOrderParameter = mergeOrderParameter;
-    this.outputColumnNames = outputColumnNames;
-    this.devices.addAll(devices);
+    this.outputColumns = outputColumns;
     this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
   }
 
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitAggregationMergeSort(this, context);
+  public OrderByParameter getMergeOrderParameter() {
+    return mergeOrderParameter;
+  }
+
+  public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
+    return deviceToMeasurementIndexesMap;
   }
 
   @Override
   public PlanNode clone() {
     return new AggregationMergeSortNode(
         getPlanNodeId(),
-        mergeOrderParameter,
-        outputColumnNames,
-        devices,
-        deviceToMeasurementIndexesMap);
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return outputColumnNames;
+        getMergeOrderParameter(),
+        outputColumns,
+        getDeviceToMeasurementIndexesMap());
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-    AggregationMergeSortNode that = (AggregationMergeSortNode) o;
-    return mergeOrderParameter.equals(that.mergeOrderParameter)
-        && devices.equals(that.devices)
-        && outputColumnNames.equals(that.outputColumnNames)
-        && 
deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
+  public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+    return new AggregationMergeSortNode(
+        new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+        new ArrayList<>(children.subList(startIndex, endIndex)),
+        getMergeOrderParameter(),
+        outputColumns,
+        getDeviceToMeasurementIndexesMap());
   }
 
   @Override
-  public int hashCode() {
-    return Objects.hash(
-        super.hashCode(),
-        mergeOrderParameter,
-        devices,
-        outputColumnNames,
-        deviceToMeasurementIndexesMap);
+  public List<String> getOutputColumnNames() {
+    return outputColumns;
   }
 
   @Override
-  public String toString() {
-    return "AggMergeSort-" + this.getPlanNodeId();
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitAggregationMergeSort(this, context);
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.AGG_MERGE_SORT.serialize(byteBuffer);
     mergeOrderParameter.serializeAttributes(byteBuffer);
-    ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
-    for (String column : outputColumnNames) {
+    ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
+    for (String column : outputColumns) {
       ReadWriteIOUtils.write(column, byteBuffer);
     }
-    ReadWriteIOUtils.write(devices.size(), byteBuffer);
-    for (String deviceName : devices) {
-      ReadWriteIOUtils.write(deviceName, byteBuffer);
-    }
     ReadWriteIOUtils.write(deviceToMeasurementIndexesMap.size(), byteBuffer);
     for (Map.Entry<String, List<Integer>> entry : 
deviceToMeasurementIndexesMap.entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
@@ -154,14 +127,10 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
     PlanNodeType.AGG_MERGE_SORT.serialize(stream);
     mergeOrderParameter.serializeAttributes(stream);
-    ReadWriteIOUtils.write(outputColumnNames.size(), stream);
-    for (String column : outputColumnNames) {
+    ReadWriteIOUtils.write(outputColumns.size(), stream);
+    for (String column : outputColumns) {
       ReadWriteIOUtils.write(column, stream);
     }
-    ReadWriteIOUtils.write(devices.size(), stream);
-    for (String deviceName : devices) {
-      ReadWriteIOUtils.write(deviceName, stream);
-    }
     ReadWriteIOUtils.write(deviceToMeasurementIndexesMap.size(), stream);
     for (Map.Entry<String, List<Integer>> entry : 
deviceToMeasurementIndexesMap.entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), stream);
@@ -173,19 +142,13 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
   }
 
   public static AggregationMergeSortNode deserialize(ByteBuffer byteBuffer) {
-    OrderByParameter mergeOrderParameter = 
OrderByParameter.deserialize(byteBuffer);
+    OrderByParameter orderByParameter = 
OrderByParameter.deserialize(byteBuffer);
     int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
-    List<String> outputColumnNames = new ArrayList<>();
+    List<String> outputColumns = new ArrayList<>();
     while (columnSize > 0) {
-      outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
+      outputColumns.add(ReadWriteIOUtils.readString(byteBuffer));
       columnSize--;
     }
-    int devicesSize = ReadWriteIOUtils.readInt(byteBuffer);
-    List<String> devices = new ArrayList<>();
-    while (devicesSize > 0) {
-      devices.add(ReadWriteIOUtils.readString(byteBuffer));
-      devicesSize--;
-    }
     int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
     Map<String, List<Integer>> deviceToMeasurementIndexesMap = new 
HashMap<>(mapSize);
     while (mapSize > 0) {
@@ -201,23 +164,31 @@ public class AggregationMergeSortNode extends 
MultiChildProcessNode {
     }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
     return new AggregationMergeSortNode(
-        planNodeId, mergeOrderParameter, outputColumnNames, devices, 
deviceToMeasurementIndexesMap);
-  }
-
-  public void addChildDeviceNode(String deviceName, PlanNode childNode) {
-    this.devices.add(deviceName);
-    this.children.add(childNode);
+        planNodeId, orderByParameter, outputColumns, 
deviceToMeasurementIndexesMap);
   }
 
-  public List<String> getDevices() {
-    return devices;
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    AggregationMergeSortNode that = (AggregationMergeSortNode) o;
+    return Objects.equals(mergeOrderParameter, that.getMergeOrderParameter());
   }
 
-  public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
-    return deviceToMeasurementIndexesMap;
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), mergeOrderParameter);
   }
 
-  public OrderByParameter getMergeOrderParameter() {
-    return mergeOrderParameter;
+  @Override
+  public String toString() {
+    return "AggregationMergeSort-" + this.getPlanNodeId();
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
index 6880e08951f..17211459d65 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
@@ -124,9 +124,8 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = null;
-      resultTsBlock = aggregationOperator.next();
 
+      TsBlock resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
@@ -166,8 +165,7 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = null;
-      resultTsBlock = aggregationOperator.next();
+      TsBlock resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
@@ -215,8 +213,7 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = null;
-      resultTsBlock = aggregationOperator.next();
+      TsBlock resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
@@ -267,8 +264,7 @@ public class AggregationOperatorTest {
       if (!aggregationOperator.hasNext()) {
         break;
       }
-      TsBlock resultTsBlock = null;
-      resultTsBlock = aggregationOperator.next();
+      TsBlock resultTsBlock = aggregationOperator.next();
       if (resultTsBlock == null) {
         continue;
       }
@@ -313,10 +309,7 @@ public class AggregationOperatorTest {
     driverContext.addOperatorContext(3, planNodeId3, 
AggregationOperator.class.getSimpleName());
     driverContext
         .getOperatorContexts()
-        .forEach(
-            operatorContext -> {
-              operatorContext.setMaxRunTime(TEST_TIME_SLICE);
-            });
+        .forEach(operatorContext -> 
OperatorContext.setMaxRunTime(TEST_TIME_SLICE));
 
     MeasurementPath measurementPath1 =
         new MeasurementPath(AGGREGATION_OPERATOR_TEST_SG + ".device0.sensor0", 
TSDataType.INT32);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
new file mode 100644
index 00000000000..052032d2c28
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AggregationAlignByDeviceTest {
+  private static final long LIMIT_VALUE = 10;
+
+  QueryId queryId = new QueryId("test");
+  MPPQueryContext context =
+      new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+  String sql;
+  Analysis analysis;
+  PlanNode logicalPlanNode;
+  DistributionPlanner planner;
+  DistributedQueryPlan plan;
+  PlanNode firstFiRoot;
+  PlanNode firstFiTopNode;
+  PlanNode mergeSortNode;
+
+  /*
+   * IdentitySinkNode-27
+   *   └──LimitNode-22
+   *       └──FilterNode-12
+   *           └──DeviceView-14
+   *               ├──AggregationNode-5
+   *               │   └──FilterNode-4
+   *               │       └──FullOuterTimeJoinNode-3
+   *               │           ├──SeriesScanNode-15:[SeriesPath: 
root.sg.d1.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               │           ├──SeriesScanNode-17:[SeriesPath: 
root.sg.d1.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+   *               │           └──ExchangeNode-23: 
[SourceAddress:192.0.2.1/test.2.0/25]
+   *               └──ExchangeNode-24: [SourceAddress:192.0.3.1/test.3.0/26]
+   *
+   *  IdentitySinkNode-25
+   *   └──FullOuterTimeJoinNode-19
+   *       ├──SeriesScanNode-16:[SeriesPath: root.sg.d1.s1, DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *       └──SeriesScanNode-18:[SeriesPath: root.sg.d1.s2, DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *
+   *  IdentitySinkNode-26
+   *   └──AggregationNode-10
+   *       └──FilterNode-9
+   *           └──FullOuterTimeJoinNode-8
+   *               ├──SeriesScanNode-20:[SeriesPath: root.sg.d22.s1, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   *               └──SeriesScanNode-21:[SeriesPath: root.sg.d22.s2, 
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+   */
+  @Test
+  public void oneMeasurementOneRegionTest() {
+    // aggregation + align by device. NOTE(review): SQL has no filter/limit, yet both are asserted
+    sql = "select first_value(s1) from root.sg.d22 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+    assertTrue(firstFiRoot.getChildren().get(0) instanceof LimitNode);
+    PlanNode filterNode = ((LimitNode) 
firstFiRoot.getChildren().get(0)).getChild();
+    assertTrue(filterNode instanceof FilterNode);
+    assertTrue(filterNode.getChildren().get(0) instanceof DeviceViewNode);
+    assertTrue(filterNode.getChildren().get(0).getChildren().get(0) instanceof 
AggregationNode);
+    assertTrue(
+        
filterNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+            instanceof FilterNode);
+    PlanNode thirdFiRoot = 
plan.getInstances().get(2).getFragment().getPlanNodeTree();
+    assertTrue(thirdFiRoot instanceof IdentitySinkNode);
+    assertTrue(thirdFiRoot.getChildren().get(0) instanceof AggregationNode);
+    assertTrue(thirdFiRoot.getChildren().get(0).getChildren().get(0) 
instanceof FilterNode);
+  }
+
+  @Test
+  public void twoMeasurementMultiRegionTest() {
+    // two aggregations over two devices with align by device; checks fragment count and root sink
+    sql = "select count(s1), first_value(s2) from root.sg.d1,root.sg.d333 align by device";
+    analysis = Util.analyze(sql, context);
+    logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
+    plan = planner.planFragments();
+    assertEquals(3, plan.getInstances().size());
+    firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+    assertTrue(firstFiRoot instanceof IdentitySinkNode);
+  }
+}


Reply via email to