This is an automated email from the ASF dual-hosted git repository.

weihao pushed a commit to branch optimizeLast
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bbc91a12b7831c445687e6d91d738f6bd5aa8091
Author: Weihao Li <[email protected]>
AuthorDate: Wed Mar 4 14:35:24 2026 +0800

    FE & UT
    
    Signed-off-by: Weihao Li <[email protected]>
---
 .../distribute/TableDistributedPlanGenerator.java  | 299 +++++++++++++++++----
 .../AlignedAggregationTreeDeviceViewScanNode.java  | 113 ++++++++
 ...onAlignedAggregationTreeDeviceViewScanNode.java | 113 ++++++++
 .../PushAggregationIntoTableScan.java              |   4 -
 .../plan/relational/analyzer/TreeViewTest.java     |  82 +++++-
 .../planner/assertions/PlanMatchPattern.java       |  34 +++
 6 files changed, 579 insertions(+), 66 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 7072b5f519f..e6bbf92b4a9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -57,6 +57,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
@@ -72,6 +73,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -1188,70 +1190,19 @@ public class TableDistributedPlanGenerator
   @Override
   public List<PlanNode> visitAggregationTableScan(
       AggregationTableScanNode node, PlanContext context) {
-    String dbName =
-        node instanceof AggregationTreeDeviceViewScanNode
-            ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName()
-            : node.getQualifiedObjectName().getDatabaseName();
+    String dbName = node.getQualifiedObjectName().getDatabaseName();
     DataPartition dataPartition = analysis.getDataPartitionInfo();
     if (dbName == null || dataPartition == null) {
       node.setRegionReplicaSet(NOT_ASSIGNED);
       return Collections.singletonList(node);
     }
-    boolean needSplit = false;
-    List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
-    if (dataPartition != null) {
-      Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> seriesSlotMap =
-          dataPartition.getDataPartitionMap().get(dbName);
-      if (seriesSlotMap == null) {
-        throw new SemanticException(
-            String.format("Given queried database: %s is not exist!", dbName));
-      }
 
-      Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new 
HashMap<>();
-      for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
-        List<TRegionReplicaSet> regionReplicaSets =
-            getDeviceReplicaSets(
-                dataPartition,
-                seriesSlotMap,
-                deviceEntry.getDeviceID(),
-                node.getTimeFilter(),
-                cachedSeriesSlotWithRegions);
-        if (regionReplicaSets.size() > 1) {
-          needSplit = true;
-          context.deviceCrossRegion = true;
-          
queryContext.setNeedUpdateScanNumForLastQuery(node.mayUseLastCache());
-        }
-        regionReplicaSetsList.add(regionReplicaSets);
-      }
-    }
-
-    if (regionReplicaSetsList.isEmpty()) {
-      regionReplicaSetsList = 
Collections.singletonList(Collections.singletonList(NOT_ASSIGNED));
-    }
+    AggregationDistributionInfo distributionInfo =
+        prepareAggregationDistribution(node, dbName, dataPartition, context);
 
     Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap = new 
HashMap<>();
-    // Step is SINGLE and device data in more than one region, we need to 
final aggregate the result
-    // from different region here, so split
-    // this node into two-stage
-    needSplit = needSplit && node.getStep() == SINGLE;
-    AggregationNode finalAggregation = null;
-    if (needSplit) {
-      Pair<AggregationNode, AggregationTableScanNode> splitResult =
-          split(node, symbolAllocator, queryId);
-      finalAggregation = splitResult.left;
-      AggregationTableScanNode partialAggregation = splitResult.right;
-
-      // cover case: complete push-down + group by + streamable
-      if (!context.hasSortProperty && finalAggregation.isStreamable()) {
-        OrderingScheme expectedOrderingSchema =
-            constructOrderingSchema(node.getPreGroupedSymbols());
-        context.setExpectedOrderingScheme(expectedOrderingSchema);
-      }
-
-      buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, 
partialAggregation);
-    } else {
-      buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap, node);
-    }
+    buildRegionNodeMap(
+        node, distributionInfo.regionReplicaSetsList, regionNodeMap, 
distributionInfo.templateNode);
 
     List<PlanNode> resultTableScanNodeList = new ArrayList<>();
     TRegionReplicaSet mostUsedDataRegion = null;
@@ -1276,6 +1227,165 @@ public class TableDistributedPlanGenerator
       processSortProperty(node, resultTableScanNodeList, context);
     }
 
+    if (distributionInfo.needSplit) {
+      AggregationNode finalAggregation = distributionInfo.finalAggregation;
+      if (resultTableScanNodeList.size() == 1) {
+        finalAggregation.setChild(resultTableScanNodeList.get(0));
+      } else if (resultTableScanNodeList.size() > 1) {
+        OrderingScheme childOrdering =
+            
nodeOrderingMap.get(resultTableScanNodeList.get(0).getPlanNodeId());
+        finalAggregation.setChild(
+            mergeChildrenViaCollectOrMergeSort(childOrdering, 
resultTableScanNodeList));
+      } else {
+        throw new IllegalStateException("List<PlanNode>.size should >= 1, but 
now is 0");
+      }
+      resultTableScanNodeList = Collections.singletonList(finalAggregation);
+    }
+
+    return resultTableScanNodeList;
+  }
+
+  @Override
+  public List<PlanNode> visitAggregationTreeDeviceViewScan(
+      AggregationTreeDeviceViewScanNode node, PlanContext context) {
+    String dbName = node.getTreeDBName();
+    DataPartition dataPartition = analysis.getDataPartitionInfo();
+    if (dbName == null || dataPartition == null) {
+      node.setRegionReplicaSet(NOT_ASSIGNED);
+      return Collections.singletonList(node);
+    }
+
+    AggregationDistributionInfo distributionInfo =
+        prepareAggregationDistribution(node, dbName, dataPartition, context);
+
+    List<List<TRegionReplicaSet>> regionReplicaSetsList = 
distributionInfo.regionReplicaSetsList;
+    AggregationTableScanNode templateNode = distributionInfo.templateNode;
+    AggregationNode finalAggregation = distributionInfo.finalAggregation;
+    boolean needSplit = distributionInfo.needSplit;
+
+    Map<
+            TRegionReplicaSet,
+            Pair<
+                AlignedAggregationTreeDeviceViewScanNode,
+                NonAlignedAggregationTreeDeviceViewScanNode>>
+        tableScanNodeMap = new HashMap<>();
+
+    for (int i = 0; i < regionReplicaSetsList.size(); i++) {
+      DeviceEntry deviceEntry = node.getDeviceEntries().get(i);
+      List<TRegionReplicaSet> regionReplicaSets = regionReplicaSetsList.get(i);
+
+      for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
+        boolean aligned = deviceEntry instanceof AlignedDeviceEntry;
+        Pair<AlignedAggregationTreeDeviceViewScanNode, 
NonAlignedAggregationTreeDeviceViewScanNode>
+            pair = tableScanNodeMap.computeIfAbsent(regionReplicaSet, k -> new 
Pair<>(null, null));
+
+        if (aligned && pair.left == null) {
+          AlignedAggregationTreeDeviceViewScanNode scanNode =
+              new AlignedAggregationTreeDeviceViewScanNode(
+                  queryId.genPlanNodeId(),
+                  templateNode.getQualifiedObjectName(),
+                  templateNode.getOutputSymbols(),
+                  templateNode.getAssignments(),
+                  new ArrayList<>(),
+                  templateNode.getTagAndAttributeIndexMap(),
+                  templateNode.getScanOrder(),
+                  templateNode.getTimePredicate().orElse(null),
+                  templateNode.getPushDownPredicate(),
+                  templateNode.getPushDownLimit(),
+                  templateNode.getPushDownOffset(),
+                  templateNode.isPushLimitToEachDevice(),
+                  templateNode.containsNonAlignedDevice(),
+                  templateNode.getProjection(),
+                  templateNode.getAggregations(),
+                  templateNode.getGroupingSets(),
+                  templateNode.getPreGroupedSymbols(),
+                  templateNode.getStep(),
+                  templateNode.getGroupIdSymbol(),
+                  node.getTreeDBName(),
+                  node.getMeasurementColumnNameMap());
+          scanNode.setRegionReplicaSet(regionReplicaSet);
+          pair.left = scanNode;
+        }
+
+        if (!aligned && pair.right == null) {
+          NonAlignedAggregationTreeDeviceViewScanNode scanNode =
+              new NonAlignedAggregationTreeDeviceViewScanNode(
+                  queryId.genPlanNodeId(),
+                  templateNode.getQualifiedObjectName(),
+                  templateNode.getOutputSymbols(),
+                  templateNode.getAssignments(),
+                  new ArrayList<>(),
+                  templateNode.getTagAndAttributeIndexMap(),
+                  templateNode.getScanOrder(),
+                  templateNode.getTimePredicate().orElse(null),
+                  templateNode.getPushDownPredicate(),
+                  templateNode.getPushDownLimit(),
+                  templateNode.getPushDownOffset(),
+                  templateNode.isPushLimitToEachDevice(),
+                  templateNode.containsNonAlignedDevice(),
+                  templateNode.getProjection(),
+                  templateNode.getAggregations(),
+                  templateNode.getGroupingSets(),
+                  templateNode.getPreGroupedSymbols(),
+                  templateNode.getStep(),
+                  templateNode.getGroupIdSymbol(),
+                  node.getTreeDBName(),
+                  node.getMeasurementColumnNameMap());
+          scanNode.setRegionReplicaSet(regionReplicaSet);
+          pair.right = scanNode;
+        }
+
+        if (aligned) {
+          pair.left.appendDeviceEntry(deviceEntry);
+        } else {
+          pair.right.appendDeviceEntry(deviceEntry);
+        }
+      }
+    }
+
+    if (tableScanNodeMap.isEmpty()) {
+      node.setRegionReplicaSet(NOT_ASSIGNED);
+      return Collections.singletonList(node);
+    }
+
+    List<PlanNode> resultTableScanNodeList = new ArrayList<>();
+    TRegionReplicaSet mostUsedDataRegion = null;
+    int maxDeviceEntrySizeOfTableScan = 0;
+    for (Map.Entry<
+            TRegionReplicaSet,
+            Pair<
+                AlignedAggregationTreeDeviceViewScanNode,
+                NonAlignedAggregationTreeDeviceViewScanNode>>
+        entry : 
topology.filterReachableCandidates(tableScanNodeMap.entrySet())) {
+      TRegionReplicaSet regionReplicaSet = entry.getKey();
+      Pair<AlignedAggregationTreeDeviceViewScanNode, 
NonAlignedAggregationTreeDeviceViewScanNode>
+          pair = entry.getValue();
+      int currentDeviceEntrySize = 0;
+
+      if (pair.left != null) {
+        currentDeviceEntrySize += pair.left.getDeviceEntries().size();
+        resultTableScanNodeList.add(pair.left);
+      }
+
+      if (pair.right != null) {
+        currentDeviceEntrySize += pair.right.getDeviceEntries().size();
+        resultTableScanNodeList.add(pair.right);
+      }
+
+      if (mostUsedDataRegion == null || currentDeviceEntrySize > 
maxDeviceEntrySizeOfTableScan) {
+        mostUsedDataRegion = regionReplicaSet;
+        maxDeviceEntrySizeOfTableScan = currentDeviceEntrySize;
+      }
+    }
+    if (mostUsedDataRegion == null) {
+      throw new RootFIPlacementException(tableScanNodeMap.keySet());
+    }
+    context.mostUsedRegion = mostUsedDataRegion;
+
+    if (context.hasSortProperty) {
+      processSortProperty(node, resultTableScanNodeList, context);
+    }
+
     if (needSplit) {
       if (resultTableScanNodeList.size() == 1) {
         finalAggregation.setChild(resultTableScanNodeList.get(0));
@@ -1293,6 +1403,83 @@ public class TableDistributedPlanGenerator
     return resultTableScanNodeList;
   }
 
+  private static class AggregationDistributionInfo {
+    private final List<List<TRegionReplicaSet>> regionReplicaSetsList;
+    private final AggregationTableScanNode templateNode;
+    private final AggregationNode finalAggregation;
+    private final boolean needSplit;
+
+    AggregationDistributionInfo(
+        List<List<TRegionReplicaSet>> regionReplicaSetsList,
+        AggregationTableScanNode templateNode,
+        AggregationNode finalAggregation,
+        boolean needSplit) {
+      this.regionReplicaSetsList = regionReplicaSetsList;
+      this.templateNode = templateNode;
+      this.finalAggregation = finalAggregation;
+      this.needSplit = needSplit;
+    }
+  }
+
+  private AggregationDistributionInfo prepareAggregationDistribution(
+      AggregationTableScanNode node,
+      String dbName,
+      DataPartition dataPartition,
+      PlanContext context) {
+    boolean needSplit = false;
+    List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
+
+    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> seriesSlotMap =
+        dataPartition.getDataPartitionMap().get(dbName);
+    if (seriesSlotMap == null) {
+      throw new SemanticException(
+          String.format("Given queried database: %s is not exist!", dbName));
+    }
+
+    Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new 
HashMap<>();
+    for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
+      List<TRegionReplicaSet> regionReplicaSets =
+          getDeviceReplicaSets(
+              dataPartition,
+              seriesSlotMap,
+              deviceEntry.getDeviceID(),
+              node.getTimeFilter(),
+              cachedSeriesSlotWithRegions);
+      if (regionReplicaSets.size() > 1) {
+        needSplit = true;
+        context.deviceCrossRegion = true;
+        queryContext.setNeedUpdateScanNumForLastQuery(node.mayUseLastCache());
+      }
+      regionReplicaSetsList.add(regionReplicaSets);
+    }
+
+    if (regionReplicaSetsList.isEmpty()) {
+      regionReplicaSetsList = 
Collections.singletonList(Collections.singletonList(NOT_ASSIGNED));
+    }
+
+    AggregationTableScanNode templateNode = node;
+    AggregationNode finalAggregation = null;
+    // Step is SINGLE and device data spans more than one region, so the per-region partial
+    // results must be combined by a final aggregation here; split this node into two stages.
+    needSplit = needSplit && node.getStep() == SINGLE;
+    if (needSplit) {
+      Pair<AggregationNode, AggregationTableScanNode> splitResult =
+          split(node, symbolAllocator, queryId);
+      finalAggregation = splitResult.left;
+      templateNode = splitResult.right;
+
+      // cover case: complete push-down + group by + streamable
+      if (!context.hasSortProperty && finalAggregation.isStreamable()) {
+        OrderingScheme expectedOrderingSchema =
+            constructOrderingSchema(node.getPreGroupedSymbols());
+        context.setExpectedOrderingScheme(expectedOrderingSchema);
+      }
+    }
+
+    return new AggregationDistributionInfo(
+        regionReplicaSetsList, templateNode, finalAggregation, needSplit);
+  }
+
   private List<TRegionReplicaSet> getDeviceReplicaSets(
       DataPartition dataPartition,
       Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> seriesSlotMap,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java
new file mode 100644
index 00000000000..9c879717321
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AlignedAggregationTreeDeviceViewScanNode.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class AlignedAggregationTreeDeviceViewScanNode extends 
AggregationTreeDeviceViewScanNode {
+
+  public AlignedAggregationTreeDeviceViewScanNode(
+      PlanNodeId id,
+      QualifiedObjectName qualifiedObjectName,
+      List<Symbol> outputSymbols,
+      Map<Symbol, ColumnSchema> assignments,
+      List<DeviceEntry> deviceEntries,
+      Map<Symbol, Integer> tagAndAttributeIndexMap,
+      Ordering scanOrder,
+      Expression timePredicate,
+      Expression pushDownPredicate,
+      long pushDownLimit,
+      long pushDownOffset,
+      boolean pushLimitToEachDevice,
+      boolean containsNonAlignedDevice,
+      Assignments projection,
+      Map<Symbol, AggregationNode.Aggregation> aggregations,
+      AggregationNode.GroupingSetDescriptor groupingSets,
+      List<Symbol> preGroupedSymbols,
+      AggregationNode.Step step,
+      Optional<Symbol> groupIdSymbol,
+      String treeDBName,
+      Map<String, String> measurementColumnNameMap) {
+    super(
+        id,
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        tagAndAttributeIndexMap,
+        scanOrder,
+        timePredicate,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice,
+        containsNonAlignedDevice,
+        projection,
+        aggregations,
+        groupingSets,
+        preGroupedSymbols,
+        step,
+        groupIdSymbol,
+        treeDBName,
+        measurementColumnNameMap);
+  }
+
+  @Override
+  public AlignedAggregationTreeDeviceViewScanNode clone() {
+    return new AlignedAggregationTreeDeviceViewScanNode(
+        getPlanNodeId(),
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        tagAndAttributeIndexMap,
+        scanOrder,
+        timePredicate,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice,
+        containsNonAlignedDevice,
+        projection,
+        aggregations,
+        groupingSets,
+        preGroupedSymbols,
+        step,
+        groupIdSymbol,
+        getTreeDBName(),
+        getMeasurementColumnNameMap());
+  }
+
+  @Override
+  public String toString() {
+    return "AlignedAggregationTreeDeviceViewScanNode-" + this.getPlanNodeId();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java
new file mode 100644
index 00000000000..2649023b906
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/NonAlignedAggregationTreeDeviceViewScanNode.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class NonAlignedAggregationTreeDeviceViewScanNode extends 
AggregationTreeDeviceViewScanNode {
+
+  public NonAlignedAggregationTreeDeviceViewScanNode(
+      PlanNodeId id,
+      QualifiedObjectName qualifiedObjectName,
+      List<Symbol> outputSymbols,
+      Map<Symbol, ColumnSchema> assignments,
+      List<DeviceEntry> deviceEntries,
+      Map<Symbol, Integer> tagAndAttributeIndexMap,
+      Ordering scanOrder,
+      Expression timePredicate,
+      Expression pushDownPredicate,
+      long pushDownLimit,
+      long pushDownOffset,
+      boolean pushLimitToEachDevice,
+      boolean containsNonAlignedDevice,
+      Assignments projection,
+      Map<Symbol, AggregationNode.Aggregation> aggregations,
+      AggregationNode.GroupingSetDescriptor groupingSets,
+      List<Symbol> preGroupedSymbols,
+      AggregationNode.Step step,
+      Optional<Symbol> groupIdSymbol,
+      String treeDBName,
+      Map<String, String> measurementColumnNameMap) {
+    super(
+        id,
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        tagAndAttributeIndexMap,
+        scanOrder,
+        timePredicate,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice,
+        containsNonAlignedDevice,
+        projection,
+        aggregations,
+        groupingSets,
+        preGroupedSymbols,
+        step,
+        groupIdSymbol,
+        treeDBName,
+        measurementColumnNameMap);
+  }
+
+  @Override
+  public NonAlignedAggregationTreeDeviceViewScanNode clone() {
+    return new NonAlignedAggregationTreeDeviceViewScanNode(
+        getPlanNodeId(),
+        qualifiedObjectName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        tagAndAttributeIndexMap,
+        scanOrder,
+        timePredicate,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice,
+        containsNonAlignedDevice,
+        projection,
+        aggregations,
+        groupingSets,
+        preGroupedSymbols,
+        step,
+        groupIdSymbol,
+        getTreeDBName(),
+        getMeasurementColumnNameMap());
+  }
+
+  @Override
+  public String toString() {
+    return "NonAlignedAggregationTreeDeviceViewScanNode-" + 
this.getPlanNodeId();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
index a47794fd329..3bcb6f1935b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushAggregationIntoTableScan.java
@@ -105,10 +105,6 @@ public class PushAggregationIntoTableScan implements 
PlanOptimizer {
         return node;
       }
 
-      if (tableScanNode.containsNonAlignedDevice()) {
-        return node;
-      }
-
       PushDownLevel pushDownLevel =
           calculatePushDownLevel(
               node.getAggregations().values(),
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java
index 351bcb7f605..1dcd7d5487a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TreeViewTest.java
@@ -140,7 +140,7 @@ public class TreeViewTest {
   public void aggregationQueryTest() {
     PlanTester planTester = new PlanTester();
 
-    // has non-aligned DeviceEntry, no push-down
+    // has non-aligned DeviceEntry
     LogicalQueryPlan logicalQueryPlan =
         planTester.createPlan(
             "select tag1, count(s1) from "
@@ -149,14 +149,83 @@ public class TreeViewTest {
     PlanMatchPattern expectedPlanPattern =
         output(
             aggregation(
-                ImmutableMap.of("count", aggregationFunction("count", 
ImmutableList.of("s1"))),
-                treeDeviceViewTableScan(
+                ImmutableMap.of("count", aggregationFunction("count", 
ImmutableList.of("count_0"))),
+                aggregationTreeDeviceViewTableScan(
+                    singleGroupingSet("tag1"),
+                    ImmutableList.of("tag1"),
+                    Optional.empty(),
+                    PARTIAL,
                     DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
-                    ImmutableList.of("tag1", "s1"),
+                    ImmutableList.of("tag1", "count_0"),
                     ImmutableSet.of("tag1", "s1"))));
     assertPlan(logicalQueryPlan, expectedPlanPattern);
 
-    // only aligned DeviceEntry, do push-down
+    assertPlan(
+        planTester.getFragmentPlan(0),
+        output(
+            aggregation(
+                ImmutableMap.of("count", aggregationFunction("count", 
ImmutableList.of("count_1"))),
+                FINAL,
+                mergeSort(exchange(), exchange(), exchange(), exchange()))));
+
+    assertPlan(
+        planTester.getFragmentPlan(1),
+        aggregation(
+            ImmutableMap.of("count", aggregationFunction("count", 
ImmutableList.of("count_0"))),
+            INTERMEDIATE,
+            aggregationTreeDeviceViewTableScan(
+                singleGroupingSet("tag1"),
+                ImmutableList.of("tag1"),
+                Optional.empty(),
+                PARTIAL,
+                DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
+                ImmutableList.of("tag1", "count_0"),
+                ImmutableSet.of("tag1", "s1"),
+                true)));
+    assertPlan(
+        planTester.getFragmentPlan(2),
+        aggregation(
+            ImmutableMap.of("count", aggregationFunction("count", 
ImmutableList.of("count_0"))),
+            INTERMEDIATE,
+            aggregationTreeDeviceViewTableScan(
+                singleGroupingSet("tag1"),
+                ImmutableList.of("tag1"),
+                Optional.empty(),
+                PARTIAL,
+                DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
+                ImmutableList.of("tag1", "count_0"),
+                ImmutableSet.of("tag1", "s1"),
+                false)));
+    assertPlan(
+        planTester.getFragmentPlan(3),
+        aggregation(
+            ImmutableMap.of("count", aggregationFunction("count", 
ImmutableList.of("count_0"))),
+            INTERMEDIATE,
+            aggregationTreeDeviceViewTableScan(
+                singleGroupingSet("tag1"),
+                ImmutableList.of("tag1"),
+                Optional.empty(),
+                PARTIAL,
+                DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
+                ImmutableList.of("tag1", "count_0"),
+                ImmutableSet.of("tag1", "s1"),
+                true)));
+    assertPlan(
+        planTester.getFragmentPlan(4),
+        aggregation(
+            ImmutableMap.of("count", aggregationFunction("count", 
ImmutableList.of("count_0"))),
+            INTERMEDIATE,
+            aggregationTreeDeviceViewTableScan(
+                singleGroupingSet("tag1"),
+                ImmutableList.of("tag1"),
+                Optional.empty(),
+                PARTIAL,
+                DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
+                ImmutableList.of("tag1", "count_0"),
+                ImmutableSet.of("tag1", "s1"),
+                false)));
+
+    // only aligned DeviceEntry
     logicalQueryPlan =
         planTester.createPlan(
             "select tag1, count(s1) from "
@@ -199,7 +268,8 @@ public class TreeViewTest {
                   PARTIAL,
                   DEFAULT_TREE_DEVICE_VIEW_TABLE_FULL_NAME,
                   ImmutableList.of("tag1", "count_0"),
-                  ImmutableSet.of("tag1", "s1"))));
+                  ImmutableSet.of("tag1", "s1"),
+                  true)));
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
index 03f79fd2ec2..2ca57e5296a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/PlanMatchPattern.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.GroupRe
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AlignedAggregationTreeDeviceViewScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode;
@@ -43,6 +44,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.NonAlignedAggregationTreeDeviceViewScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
@@ -429,6 +431,38 @@ public final class PlanMatchPattern {
     return result;
   }
 
+  public static PlanMatchPattern aggregationTreeDeviceViewTableScan(
+      GroupingSetDescriptor groupingSets,
+      List<String> preGroupedSymbols,
+      Optional<Symbol> groupId,
+      AggregationNode.Step step,
+      String expectedTableName,
+      List<String> outputSymbols,
+      Set<String> assignmentsKeys,
+      boolean aligned) {
+    PlanMatchPattern result =
+        aligned
+            ? node(AlignedAggregationTreeDeviceViewScanNode.class)
+            : node(NonAlignedAggregationTreeDeviceViewScanNode.class);
+
+    result.with(
+        new AggregationDeviceTableScanMatcher(
+            groupingSets,
+            preGroupedSymbols,
+            ImmutableList.of(),
+            groupId,
+            step,
+            expectedTableName,
+            Optional.empty(),
+            outputSymbols,
+            assignmentsKeys));
+
+    outputSymbols.forEach(
+        outputSymbol ->
+            result.withAlias(outputSymbol, new 
ColumnReference(expectedTableName, outputSymbol)));
+    return result;
+  }
+
   // Attention: Now we only pass aliases according to outputSymbols, but we 
don't verify the output
   // column if exists in Table and their order because there maybe partial 
Agg-result.
   public static PlanMatchPattern aggregationTableScan(


Reply via email to