This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/agg_mergesort
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bcb42793358d879c736bc4a656612df82edb3b27
Author: Beyyes <[email protected]>
AuthorDate: Sat Feb 3 16:24:54 2024 +0800

    add AggMergeSort impl
---
 .../operator/process/AggMergeSortOperator.java     | 193 +++++++++++++++++++++
 .../plan/planner/LogicalPlanBuilder.java           |  32 +++-
 .../plan/planner/OperatorTreeGenerator.java        |  22 +++
 .../planner/distribution/ExchangeNodeAdder.java    |  62 ++++++-
 .../plan/planner/distribution/SourceRewriter.java  | 132 ++++++++------
 .../plan/planner/plan/node/PlanGraphPrinter.java   |   9 +
 .../plan/planner/plan/node/PlanNodeType.java       |   7 +-
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../{DeviceViewNode.java => AggMergeSortNode.java} |  73 ++------
 .../planner/plan/node/process/DeviceViewNode.java  |   8 +-
 .../AlignByDeviceOrderByLimitOffsetTest.java       |   3 +-
 11 files changed, 412 insertions(+), 134 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
new file mode 100644
index 00000000000..8616079e069
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.NullColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+
+/**
+ * Since devices have been sorted by the merge order as expected, what {@link 
AggMergeSortOperator}
+ * need to do is traversing the device child operators, get all tsBlocks of 
one device and transform
+ * it to the form we need, adding the device column and allocating value 
column to its expected
+ * location, then get the next device operator until no next device.
+ *
+ * <p>The deviceOperators can be timeJoinOperator or seriesScanOperator that 
have not transformed
+ * the result form.
+ *
+ * <p>Attention! If some columns are not existing in one device, those columns 
will be null. e.g.
+ * [s1,s2,s3] is query, but only [s1, s3] exists in device1, then the column 
of s2 will be filled
+ * with NullColumn.
+ */
+public class AggMergeSortOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  // The size devices and deviceOperators should be the same.
+  private final List<String> devices;
+  private final List<Operator> deviceOperators;
+  // Used to fill columns and leave null columns which doesn't exist in some 
devices.
+  // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> 
[1, 3], s1 is 1 but
+  // not 0 because device is the first column
+  private final List<List<Integer>> deviceColumnIndex;
+  // Column dataTypes that includes device column
+  private final List<TSDataType> dataTypes;
+
+  private int deviceIndex;
+
+  public AggMergeSortOperator(
+      OperatorContext operatorContext,
+      List<String> devices,
+      List<Operator> deviceOperators,
+      List<List<Integer>> deviceColumnIndex,
+      List<TSDataType> dataTypes) {
+    this.operatorContext = operatorContext;
+    this.devices = devices;
+    this.deviceOperators = deviceOperators;
+    this.deviceColumnIndex = deviceColumnIndex;
+    this.dataTypes = dataTypes;
+
+    this.deviceIndex = 0;
+  }
+
+  private String getCurDeviceName() {
+    return devices.get(deviceIndex);
+  }
+
+  private Operator getCurDeviceOperator() {
+    return deviceOperators.get(deviceIndex);
+  }
+
+  private List<Integer> getCurDeviceIndexes() {
+    return deviceColumnIndex.get(deviceIndex);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    if (deviceIndex >= deviceOperators.size()) {
+      return NOT_BLOCKED;
+    }
+    ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked();
+    if (!blocked.isDone()) {
+      return blocked;
+    }
+    return NOT_BLOCKED;
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (!getCurDeviceOperator().hasNextWithTimer()) {
+      // close finished child
+      getCurDeviceOperator().close();
+      deviceOperators.set(deviceIndex, null);
+      // increment index, move to next child
+      deviceIndex++;
+      return null;
+    }
+
+    TsBlock tsBlock = getCurDeviceOperator().nextWithTimer();
+    if (tsBlock == null) {
+      return null;
+    }
+    List<Integer> indexes = getCurDeviceIndexes();
+
+    // fill existing columns
+    Column[] newValueColumns = new Column[dataTypes.size()];
+    for (int i = 0; i < indexes.size(); i++) {
+      newValueColumns[indexes.get(i)] = tsBlock.getColumn(i);
+    }
+    // construct device column
+    ColumnBuilder deviceColumnBuilder = new BinaryColumnBuilder(null, 1);
+    deviceColumnBuilder.writeBinary(new Binary(getCurDeviceName(), 
TSFileConfig.STRING_CHARSET));
+    newValueColumns[0] =
+        new RunLengthEncodedColumn(deviceColumnBuilder.build(), 
tsBlock.getPositionCount());
+    // construct other null columns
+    for (int i = 0; i < dataTypes.size(); i++) {
+      if (newValueColumns[i] == null) {
+        newValueColumns[i] = NullColumn.create(dataTypes.get(i), 
tsBlock.getPositionCount());
+      }
+    }
+    return new TsBlock(tsBlock.getPositionCount(), tsBlock.getTimeColumn(), 
newValueColumns);
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return deviceIndex < devices.size();
+  }
+
+  @Override
+  public void close() throws Exception {
+    for (int i = deviceIndex, n = deviceOperators.size(); i < n; i++) {
+      Operator currentChild = deviceOperators.get(i);
+      if (currentChild != null) {
+        deviceOperators.get(i).close();
+      }
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !this.hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    long maxPeekMemory = calculateMaxReturnSize() + 
calculateRetainedSizeAfterCallingNext();
+    for (Operator child : deviceOperators) {
+      maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+    }
+    return maxPeekMemory;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // null columns would be filled, so return size equals to
+    // (numberOfValueColumns(dataTypes.size() - 1) + 1(timeColumn)) * 
columnSize + deviceColumnSize
+    // size of device name column is ignored
+    return (long) (dataTypes.size())
+        * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    long max = 0;
+    for (Operator operator : deviceOperators) {
+      max = Math.max(max, operator.calculateRetainedSizeAfterCallingNext());
+    }
+    return max;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index f4e95050210..4871df6af9d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -55,6 +55,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode;
@@ -811,7 +812,8 @@ public class LogicalPlanBuilder {
                 outputColumnNames,
                 deviceToMeasurementIndexesMap,
                 deviceNameToSourceNodesMap,
-                valueFilterLimit));
+                valueFilterLimit,
+                queryStatement.isAggregationQuery()));
       }
 
       analysis.setUseTopKNode();
@@ -836,7 +838,8 @@ public class LogicalPlanBuilder {
               outputColumnNames,
               deviceToMeasurementIndexesMap,
               deviceNameToSourceNodesMap,
-              -1);
+              -1,
+              queryStatement.isAggregationQuery());
     }
 
     context.getTypeProvider().setType(DEVICE, TSDataType.TEXT);
@@ -916,13 +919,24 @@ public class LogicalPlanBuilder {
       List<String> outputColumnNames,
       Map<String, List<Integer>> deviceToMeasurementIndexesMap,
       Map<String, PlanNode> deviceNameToSourceNodesMap,
-      long valueFilterLimit) {
-    DeviceViewNode deviceViewNode =
-        new DeviceViewNode(
-            context.getQueryId().genPlanNodeId(),
-            orderByParameter,
-            outputColumnNames,
-            deviceToMeasurementIndexesMap);
+      long valueFilterLimit,
+      boolean isAggregation) {
+    DeviceViewNode deviceViewNode;
+    if (isAggregation) {
+      deviceViewNode =
+          new AggMergeSortNode(
+              context.getQueryId().genPlanNodeId(),
+              orderByParameter,
+              outputColumnNames,
+              deviceToMeasurementIndexesMap);
+    } else {
+      deviceViewNode =
+          new DeviceViewNode(
+              context.getQueryId().genPlanNodeId(),
+              orderByParameter,
+              outputColumnNames,
+              deviceToMeasurementIndexesMap);
+    }
 
     for (Map.Entry<String, PlanNode> entry : 
deviceNameToSourceNodesMap.entrySet()) {
       String deviceName = entry.getKey();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index 7d2b88bb9cc..8661406f2a6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -45,6 +45,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage
 import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.AggMergeSortOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.ColumnInjectOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator;
@@ -166,6 +167,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode;
@@ -822,6 +824,26 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
         operatorContext, node.getDevices(), children, deviceColumnIndex, 
outputColumnTypes);
   }
 
+  @Override
+  public Operator visitAggMergeSort(AggMergeSortNode node, 
LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                DeviceViewOperator.class.getSimpleName());
+    List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, 
context);
+    List<List<Integer>> deviceColumnIndex =
+        node.getDevices().stream()
+            .map(deviceName -> 
node.getDeviceToMeasurementIndexesMap().get(deviceName))
+            .collect(Collectors.toList());
+    List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+
+    return new AggMergeSortOperator(
+        operatorContext, node.getDevices(), children, deviceColumnIndex, 
outputColumnTypes);
+  }
+
   @Override
   public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext 
context) {
     OperatorContext operatorContext =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 06043bd2e41..8e6fed5a02a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -32,6 +32,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
@@ -200,6 +201,11 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return processMultiChildNode(node, context);
   }
 
+  @Override
+  public PlanNode visitAggMergeSort(AggMergeSortNode node, NodeGroupContext 
context) {
+    return processMultiChildNode(node, context);
+  }
+
   @Override
   public PlanNode visitDeviceMerge(DeviceMergeNode node, NodeGroupContext 
context) {
     return processMultiChildNode(node, context);
@@ -377,7 +383,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
             .map(child -> visit(child, context))
             .collect(Collectors.toList());
 
-    // DataRegion which node locates
+    // DataRegion in which node locates
     TRegionReplicaSet dataRegion;
     boolean isChildrenDistributionSame = 
nodeDistributionIsSame(visitedChildren, context);
     NodeDistributionType distributionType =
@@ -406,10 +412,14 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
       return newNode;
     }
 
+    if (node instanceof AggMergeSortNode) {
+      return processAggMergeSortNode(node, visitedChildren, context, newNode, 
dataRegion);
+    }
+
     // optimize `order by time|expression limit N align by device` query,
     // to ensure that the number of ExchangeNode equals to DataRegionNum but 
not equals to DeviceNum
     if (node instanceof TopKNode) {
-      return processTopNode(node, visitedChildren, context, newNode, 
dataRegion);
+      return processTopKNode(node, visitedChildren, context, newNode, 
dataRegion);
     }
 
     // Otherwise, we need to add ExchangeNode for the child whose DataRegion 
is different from the
@@ -450,7 +460,7 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return newNode;
   }
 
-  private PlanNode processTopNode(
+  private PlanNode processTopKNode(
       MultiChildProcessNode node,
       List<PlanNode> visitedChildren,
       NodeGroupContext context,
@@ -496,6 +506,52 @@ public class ExchangeNodeAdder extends 
PlanVisitor<PlanNode, NodeGroupContext> {
     return newNode;
   }
 
+  private PlanNode processAggMergeSortNode(
+      MultiChildProcessNode node,
+      List<PlanNode> visitedChildren,
+      NodeGroupContext context,
+      MultiChildProcessNode newNode,
+      TRegionReplicaSet dataRegion) {
+    AggMergeSortNode aggMergeSortNode = (AggMergeSortNode) node;
+    Map<TRegionReplicaSet, DeviceViewNode> regionTopKNodeMap = new HashMap<>();
+    for (PlanNode child : visitedChildren) {
+      TRegionReplicaSet region = 
context.getNodeDistribution(child.getPlanNodeId()).region;
+      regionTopKNodeMap
+          .computeIfAbsent(
+              region,
+              k -> {
+                DeviceViewNode childDeviceViewNode =
+                    new DeviceViewNode(
+                        context.queryContext.getQueryId().genPlanNodeId(),
+                        aggMergeSortNode.getMergeOrderParameter(),
+                        aggMergeSortNode.getOutputColumnNames(),
+                        aggMergeSortNode.getDeviceToMeasurementIndexesMap());
+                context.putNodeDistribution(
+                    childDeviceViewNode.getPlanNodeId(),
+                    new 
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
+                return childDeviceViewNode;
+              })
+          .addChild(child);
+    }
+
+    for (Map.Entry<TRegionReplicaSet, DeviceViewNode> entry : 
regionTopKNodeMap.entrySet()) {
+      TRegionReplicaSet deviceViewNodeLocatedRegion = entry.getKey();
+      DeviceViewNode deviceViewNode = entry.getValue();
+
+      if (!dataRegion.equals(deviceViewNodeLocatedRegion)) {
+        ExchangeNode exchangeNode =
+            new 
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+        exchangeNode.setChild(deviceViewNode);
+        
exchangeNode.setOutputColumnNames(deviceViewNode.getOutputColumnNames());
+        context.hasExchangeNode = true;
+        newNode.addChild(exchangeNode);
+      } else {
+        newNode.addChild(deviceViewNode);
+      }
+    }
+    return newNode;
+  }
+
   @Override
   public PlanNode visitSlidingWindowAggregation(
       SlidingWindowAggregationNode node, NodeGroupContext context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index 7b5a82062f9..3661d93b252 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode;
@@ -232,6 +233,30 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     return Collections.singletonList(mergeSortNode);
   }
 
+  @Override
+  public List<PlanNode> visitAggMergeSort(AggMergeSortNode node, 
DistributionPlanContext context) {
+    AggMergeSortNode newRoot =
+        new AggMergeSortNode(
+            context.queryContext.getQueryId().genPlanNodeId(),
+            node.getMergeOrderParameter(),
+            node.getOutputColumnNames(),
+            node.getDeviceToMeasurementIndexesMap());
+    for (int i = 0; i < node.getDevices().size(); i++) {
+      List<PlanNode> rewroteNode = rewrite(node.getChildren().get(i), context);
+      for (PlanNode planNode : rewroteNode) {
+        if (planNode instanceof AggregationNode) {
+          // if children of AggregationNode can only be AggScanNode?
+          for (PlanNode aggScanNode : planNode.getChildren()) {
+            newRoot.addChildDeviceNode(node.getDevices().get(i), aggScanNode);
+          }
+        } else {
+          newRoot.addChildDeviceNode(node.getDevices().get(i), planNode);
+        }
+      }
+    }
+    return Collections.singletonList(newRoot);
+  }
+
   private List<PlanNode> processSpecialDeviceView(
       DeviceViewNode node, DistributionPlanContext context) {
     DeviceViewNode newRoot = cloneDeviceViewNodeWithoutChild(node, context);
@@ -347,23 +372,21 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
                 }
               });
       schemaRegions.forEach(
-          region -> {
-            addSchemaSourceNode(
-                root,
-                seed.getPath(),
-                region,
-                context.queryContext.getQueryId().genPlanNodeId(),
-                seed);
-          });
+          region ->
+              addSchemaSourceNode(
+                  root,
+                  seed.getPath(),
+                  region,
+                  context.queryContext.getQueryId().genPlanNodeId(),
+                  seed));
       regionsOfSystemDatabase.forEach(
-          region -> {
-            addSchemaSourceNode(
-                root,
-                seed.getPath(),
-                region,
-                context.queryContext.getQueryId().genPlanNodeId(),
-                seed);
-          });
+          region ->
+              addSchemaSourceNode(
+                  root,
+                  seed.getPath(),
+                  region,
+                  context.queryContext.getQueryId().genPlanNodeId(),
+                  seed));
     } else {
       // the path pattern may only overlap with part of storageGroup or 
storageGroup.**, need filter
       PathPatternTree patternTree = new PathPatternTree();
@@ -394,31 +417,29 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
             List<PartialPath> filteredPathPatternList =
                 filterPathPattern(patternTree, storageGroup);
             schemaRegionSet.forEach(
-                region -> {
-                  addSchemaSourceNode(
-                      root,
-                      filteredPathPatternList.size() == 1
-                          ? filteredPathPatternList.get(0)
-                          : seed.getPath(),
-                      region,
-                      context.queryContext.getQueryId().genPlanNodeId(),
-                      seed);
-                });
+                region ->
+                    addSchemaSourceNode(
+                        root,
+                        filteredPathPatternList.size() == 1
+                            ? filteredPathPatternList.get(0)
+                            : seed.getPath(),
+                        region,
+                        context.queryContext.getQueryId().genPlanNodeId(),
+                        seed));
           });
       if (!regionsOfSystemDatabase.isEmpty()) {
         List<PartialPath> filteredPathPatternList =
             filterPathPattern(patternTree, SchemaConstant.SYSTEM_DATABASE);
         regionsOfSystemDatabase.forEach(
-            region -> {
-              addSchemaSourceNode(
-                  root,
-                  filteredPathPatternList.size() == 1
-                      ? filteredPathPatternList.get(0)
-                      : seed.getPath(),
-                  region,
-                  context.queryContext.getQueryId().genPlanNodeId(),
-                  seed);
-            });
+            region ->
+                addSchemaSourceNode(
+                    root,
+                    filteredPathPatternList.size() == 1
+                        ? filteredPathPatternList.get(0)
+                        : seed.getPath(),
+                    region,
+                    context.queryContext.getQueryId().genPlanNodeId(),
+                    seed));
       }
     }
     return Collections.singletonList(root);
@@ -462,11 +483,10 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
         .getSchemaPartitionInfo()
         .getSchemaPartitionMap()
         .forEach(
-            (storageGroup, deviceGroup) -> {
-              deviceGroup.forEach(
-                  (deviceGroupId, schemaRegionReplicaSet) ->
-                      schemaRegions.add(schemaRegionReplicaSet));
-            });
+            (storageGroup, deviceGroup) ->
+                deviceGroup.forEach(
+                    (deviceGroupId, schemaRegionReplicaSet) ->
+                        schemaRegions.add(schemaRegionReplicaSet)));
     schemaRegions.forEach(
         region -> {
           SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode) 
seed.clone();
@@ -564,14 +584,13 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
-            descriptor -> {
-              leafAggDescriptorList.add(
-                  new AggregationDescriptor(
-                      descriptor.getAggregationFuncName(),
-                      AggregationStep.PARTIAL,
-                      descriptor.getInputExpressions(),
-                      descriptor.getInputAttributes()));
-            });
+            descriptor ->
+                leafAggDescriptorList.add(
+                    new AggregationDescriptor(
+                        descriptor.getAggregationFuncName(),
+                        AggregationStep.PARTIAL,
+                        descriptor.getInputExpressions(),
+                        descriptor.getInputAttributes())));
     leafAggDescriptorList.forEach(
         d ->
             LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
@@ -579,14 +598,13 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
-            descriptor -> {
-              rootAggDescriptorList.add(
-                  new AggregationDescriptor(
-                      descriptor.getAggregationFuncName(),
-                      context.isRoot ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE,
-                      descriptor.getInputExpressions(),
-                      descriptor.getInputAttributes()));
-            });
+            descriptor ->
+                rootAggDescriptorList.add(
+                    new AggregationDescriptor(
+                        descriptor.getAggregationFuncName(),
+                        context.isRoot ? AggregationStep.FINAL : 
AggregationStep.INTERMEDIATE,
+                        descriptor.getInputExpressions(),
+                        descriptor.getInputAttributes())));
 
     AggregationNode aggregationNode =
         new AggregationNode(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index fcebd9b0c66..72ac2a5a581 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -188,6 +189,14 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitAggMergeSort(AggMergeSortNode node, GraphContext 
context) {
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(String.format("AggMergeSort-%s", 
node.getPlanNodeId().getId()));
+    boxValue.add(String.format("AggDeviceCount: %d", 
node.getDevices().size()));
+    return render(node, boxValue, context);
+  }
+
   @Override
   public List<String> visitMergeSort(MergeSortNode node, GraphContext context) 
{
     List<String> boxValue = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 5eb722c7422..1cf585eeb7d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -60,6 +60,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedC
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -200,7 +201,9 @@ public enum PlanNodeType {
   PIPE_ENRICHED_DELETE_SCHEMA((short) 86),
 
   INNER_TIME_JOIN((short) 87),
-  LEFT_OUTER_TIME_JOIN((short) 88);
+  LEFT_OUTER_TIME_JOIN((short) 88),
+
+  AGG_MERGE_SORT((short) 89);
 
   public static final int BYTES = Short.BYTES;
 
@@ -425,6 +428,8 @@ public enum PlanNodeType {
         return InnerTimeJoinNode.deserialize(buffer);
       case 88:
         return LeftOuterTimeJoinNode.deserialize(buffer);
+      case 89:
+        return AggMergeSortNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index ed7072a76eb..8b67a9a98bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -57,6 +57,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedC
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
@@ -231,6 +232,10 @@ public abstract class PlanVisitor<R, C> {
     return visitMultiChildProcess(node, context);
   }
 
+  public R visitAggMergeSort(AggMergeSortNode node, C context) {
+    return visitMultiChildProcess(node, context);
+  }
+
   public R visitDeviceMerge(DeviceMergeNode node, C context) {
     return visitMultiChildProcess(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
similarity index 72%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
index 4d4b9b67049..ad1f2797db7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -34,70 +35,33 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
-/**
- * DeviceViewNode is responsible for constructing a device-based view of a set 
of series. And output
- * the result with specific order. The order could be 'order by device' or 
'order by timestamp'
- *
- * <p>Each output from its children should have the same schema. That means, 
the columns should be
- * same between these TsBlocks. If the input TsBlock contains n columns, the 
device-based view will
- * contain n+1 columns where the new column is Device column.
- */
-public class DeviceViewNode extends MultiChildProcessNode {
-
-  // The result output order, which could sort by device and time.
-  // The size of this list is 2 and the first SortItem in this list has higher 
priority.
-  private final OrderByParameter mergeOrderParameter;
-
-  // The size devices and children should be the same.
-  private final List<String> devices = new ArrayList<>();
+public class AggMergeSortNode extends DeviceViewNode {
 
-  // Device column and measurement columns in result output
-  private final List<String> outputColumnNames;
-
-  // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> 
[1, 3], s1 is 1 but
-  // not 0 because device is the first column
-  private final Map<String, List<Integer>> deviceToMeasurementIndexesMap;
-
-  public DeviceViewNode(
+  public AggMergeSortNode(
       PlanNodeId id,
       OrderByParameter mergeOrderParameter,
       List<String> outputColumnNames,
       Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
-    super(id);
-    this.mergeOrderParameter = mergeOrderParameter;
-    this.outputColumnNames = outputColumnNames;
-    this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
+    super(id, mergeOrderParameter, outputColumnNames, 
deviceToMeasurementIndexesMap);
   }
 
-  public DeviceViewNode(
+  public AggMergeSortNode(
       PlanNodeId id,
       OrderByParameter mergeOrderParameter,
       List<String> outputColumnNames,
       List<String> devices,
       Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
-    super(id);
-    this.mergeOrderParameter = mergeOrderParameter;
-    this.outputColumnNames = outputColumnNames;
-    this.devices.addAll(devices);
-    this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
-  }
-
-  public void addChildDeviceNode(String deviceName, PlanNode childNode) {
-    this.devices.add(deviceName);
-    this.children.add(childNode);
-  }
-
-  public List<String> getDevices() {
-    return devices;
+    super(id, mergeOrderParameter, outputColumnNames, devices, 
deviceToMeasurementIndexesMap);
   }
 
-  public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
-    return deviceToMeasurementIndexesMap;
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitAggMergeSort(this, context);
   }
 
   @Override
   public PlanNode clone() {
-    return new DeviceViewNode(
+    return new AggMergeSortNode(
         getPlanNodeId(),
         mergeOrderParameter,
         outputColumnNames,
@@ -105,23 +69,14 @@ public class DeviceViewNode extends MultiChildProcessNode {
         deviceToMeasurementIndexesMap);
   }
 
-  public OrderByParameter getMergeOrderParameter() {
-    return mergeOrderParameter;
-  }
-
   @Override
   public List<String> getOutputColumnNames() {
     return outputColumnNames;
   }
 
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitDeviceView(this, context);
-  }
-
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
-    PlanNodeType.DEVICE_VIEW.serialize(byteBuffer);
+    PlanNodeType.AGG_MERGE_SORT.serialize(byteBuffer);
     mergeOrderParameter.serializeAttributes(byteBuffer);
     ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
     for (String column : outputColumnNames) {
@@ -143,7 +98,7 @@ public class DeviceViewNode extends MultiChildProcessNode {
 
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
-    PlanNodeType.DEVICE_VIEW.serialize(stream);
+    PlanNodeType.AGG_MERGE_SORT.serialize(stream);
     mergeOrderParameter.serializeAttributes(stream);
     ReadWriteIOUtils.write(outputColumnNames.size(), stream);
     for (String column : outputColumnNames) {
@@ -225,6 +180,6 @@ public class DeviceViewNode extends MultiChildProcessNode {
 
   @Override
   public String toString() {
-    return "DeviceView-" + this.getPlanNodeId();
+    return "AggMergeSort-" + this.getPlanNodeId();
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
index 4d4b9b67049..72fddc56896 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java
@@ -46,17 +46,17 @@ public class DeviceViewNode extends MultiChildProcessNode {
 
   // The result output order, which could sort by device and time.
   // The size of this list is 2 and the first SortItem in this list has higher 
priority.
-  private final OrderByParameter mergeOrderParameter;
+  protected final OrderByParameter mergeOrderParameter;
 
   // The size devices and children should be the same.
-  private final List<String> devices = new ArrayList<>();
+  protected final List<String> devices = new ArrayList<>();
 
   // Device column and measurement columns in result output
-  private final List<String> outputColumnNames;
+  protected final List<String> outputColumnNames;
 
   // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> 
[1, 3], s1 is 1 but
   // not 0 because device is the first column
-  private final Map<String, List<Integer>> deviceToMeasurementIndexesMap;
+  protected final Map<String, List<Integer>> deviceToMeasurementIndexesMap;
 
   public DeviceViewNode(
       PlanNodeId id,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
index 750a86c4b3a..bf9ac3e9861 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java
@@ -90,7 +90,8 @@ public class AlignByDeviceOrderByLimitOffsetTest {
   @Test
   public void orderByDeviceTest1() {
     // no order by
-    sql = "select * from root.sg.d1, root.sg.d22 LIMIT 10 align by device";
+    // sql = "select * from root.sg.d1, root.sg.d22 LIMIT 10 align by device";
+    sql = "select first_value(s1) from root.sg.d1, root.sg.d22 align by 
device";
     analysis = Util.analyze(sql, context);
     logicalPlanNode = Util.genLogicalPlan(analysis, context);
     planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));


Reply via email to