This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 50d01a78f56 Fix ColumnInjectionPushDown bug & add UTs
50d01a78f56 is described below

commit 50d01a78f566f9119875e51aef3f0d1fb8a03cd8
Author: liuminghui233 <[email protected]>
AuthorDate: Wed Dec 20 08:44:16 2023 +0800

    Fix ColumnInjectionPushDown bug & add UTs
---
 .../plan/optimization/ColumnInjectionPushDown.java |  57 +-
 .../plan/planner/plan/node/PlanVisitor.java        |   3 +-
 .../optimization/ColumnInjectionPushDownTest.java  | 657 +++++++++++++++++++++
 .../plan/optimization/LimitOffsetPushDownTest.java |  35 +-
 .../plan/optimization/OptimizationTestUtil.java    |  75 +++
 .../plan/optimization/TestPlanBuilder.java         | 149 ++++-
 6 files changed, 905 insertions(+), 71 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java
index 40a869d18a1..b8cd9f43453 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java
@@ -35,8 +35,6 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.iotdb.tsfile.utils.Preconditions.checkArgument;
-
 /**
  * <b>Optimization phase:</b> Distributed plan planning.
  *
@@ -67,34 +65,38 @@ public class ColumnInjectionPushDown implements 
PlanOptimizer {
       // SeriesAggregationNode,
       // If it is and has overlap in groupByParameter, there is 
SlidingWindowNode
       // There will be a ColumnInjectNode on them, so we need to check if it 
can be pushed down.
-      return plan.accept(new Rewriter(), new RewriterContext());
+      return plan.accept(new Rewriter(), null);
     }
     return plan;
   }
 
-  private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> 
{
+  private static class Rewriter extends PlanVisitor<PlanNode, Void> {
 
     @Override
-    public PlanNode visitPlan(PlanNode node, RewriterContext context) {
-      for (PlanNode child : node.getChildren()) {
-        context.setParent(node);
-        child.accept(this, context);
-      }
+    public PlanNode visitPlan(PlanNode node, Void context) {
+      // other source node, just return
       return node;
     }
 
     @Override
-    public PlanNode visitMultiChildProcess(MultiChildProcessNode node, 
RewriterContext context) {
-      List<PlanNode> children = new ArrayList<>();
+    public PlanNode visitSingleChildProcess(SingleChildProcessNode node, Void 
context) {
+      PlanNode rewrittenChild = node.getChild().accept(this, context);
+      node.setChild(rewrittenChild);
+      return node;
+    }
+
+    @Override
+    public PlanNode visitMultiChildProcess(MultiChildProcessNode node, Void 
context) {
+      List<PlanNode> rewrittenChildren = new ArrayList<>();
       for (PlanNode child : node.getChildren()) {
-        context.setParent(null);
-        children.add(child.accept(this, context));
+        rewrittenChildren.add(child.accept(this, context));
       }
-      return node.cloneWithChildren(children);
+      node.setChildren(rewrittenChildren);
+      return node;
     }
 
     @Override
-    public PlanNode visitColumnInject(ColumnInjectNode node, RewriterContext 
context) {
+    public PlanNode visitColumnInject(ColumnInjectNode node, Void context) {
       PlanNode child = node.getChild();
 
       boolean columnInjectPushDown = true;
@@ -109,32 +111,9 @@ public class ColumnInjectionPushDown implements 
PlanOptimizer {
       }
 
       if (columnInjectPushDown) {
-        return concatParentWithChild(context.getParent(), child);
-      }
-      return node;
-    }
-
-    private PlanNode concatParentWithChild(PlanNode parent, PlanNode child) {
-      if (parent == null) {
         return child;
       }
-
-      checkArgument(parent instanceof SingleChildProcessNode);
-      ((SingleChildProcessNode) parent).setChild(child);
-      return parent;
-    }
-  }
-
-  private static class RewriterContext {
-
-    private PlanNode parent;
-
-    public PlanNode getParent() {
-      return parent;
-    }
-
-    public void setParent(PlanNode parent) {
-      this.parent = parent;
+      return node;
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 10bde9ccf6c..e8f6b149f0b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -103,6 +103,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 
+@SuppressWarnings("java:S6539") // suppress "Monster class" warning
 public abstract class PlanVisitor<R, C> {
 
   public R process(PlanNode node, C context) {
@@ -200,7 +201,7 @@ public abstract class PlanVisitor<R, C> {
   }
 
   public R visitColumnInject(ColumnInjectNode node, C context) {
-    return visitPlan(node, context);
+    return visitSingleChildProcess(node, context);
   }
 
   public R visitSingleDeviceView(SingleDeviceViewNode node, C context) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java
new file mode 100644
index 00000000000..f5dd0507b27
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.optimization;
+
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.TimeDuration;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ColumnInjectionPushDownTest {
+
+  private static final Map<String, PartialPath> schemaMap = new HashMap<>();
+
+  static {
+    try {
+      schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", 
TSDataType.INT32));
+      schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", 
TSDataType.DOUBLE));
+
+      MeasurementPath d2s1 = new MeasurementPath("root.sg.d2.a.s1", 
TSDataType.INT32);
+      d2s1.setUnderAlignedEntity(true);
+      schemaMap.put("root.sg.d2.a.s1", d2s1);
+
+      AlignedPath aligned_d2s1 =
+          new AlignedPath(
+              "root.sg.d2.a",
+              Collections.singletonList("s1"),
+              Collections.singletonList(d2s1.getMeasurementSchema()));
+      schemaMap.put("aligned_root.sg.d2.a.s1", aligned_d2s1);
+
+      MeasurementPath d2s2 = new MeasurementPath("root.sg.d2.a.s2", 
TSDataType.DOUBLE);
+      d2s2.setUnderAlignedEntity(true);
+      schemaMap.put("root.sg.d2.a.s2", d2s2);
+
+      AlignedPath aligned_d2s2 =
+          new AlignedPath(
+              "root.sg.d2.a",
+              Collections.singletonList("s2"),
+              Collections.singletonList(d2s2.getMeasurementSchema()));
+      schemaMap.put("aligned_root.sg.d2.a.s2", aligned_d2s2);
+
+      AlignedPath alignedPath =
+          new AlignedPath(
+              "root.sg.d2.a",
+              Arrays.asList("s2", "s1"),
+              Arrays.asList(d2s2.getMeasurementSchema(), 
d2s1.getMeasurementSchema()));
+      schemaMap.put("root.sg.d2.a", alignedPath);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void checkPushDown(String sql, PlanNode rawPlan, PlanNode optPlan) {
+    OptimizationTestUtil.checkPushDown(new ColumnInjectionPushDown(), sql, 
rawPlan, optPlan);
+  }
+
+  private void checkCannotPushDown(String sql, PlanNode rawPlan) {
+    OptimizationTestUtil.checkCannotPushDown(new ColumnInjectionPushDown(), 
sql, rawPlan);
+  }
+
+  @Test
+  public void testPushDownAggregationSourceAlignByTime() {
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new 
TimeDuration(0, 10), true);
+    List<AggregationDescriptor> aggregationDescriptorList =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+
+    checkPushDown(
+        "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 
10ms);",
+        new TestPlanBuilder()
+            .aggregationScan(
+                "0",
+                schemaMap.get("root.sg.d1.s1"),
+                aggregationDescriptorList,
+                groupByTimeParameter,
+                false)
+            .columnInject("1", groupByTimeParameter)
+            .getRoot(),
+        new TestPlanBuilder()
+            .aggregationScan(
+                "0",
+                schemaMap.get("root.sg.d1.s1"),
+                aggregationDescriptorList,
+                groupByTimeParameter,
+                true)
+            .getRoot());
+    checkPushDown(
+        "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 10ms) 
fill(previous);",
+        new TestPlanBuilder()
+            .aggregationScan(
+                "0",
+                schemaMap.get("root.sg.d1.s1"),
+                aggregationDescriptorList,
+                groupByTimeParameter,
+                false)
+            .columnInject("1", groupByTimeParameter)
+            .fill("2", FillPolicy.PREVIOUS)
+            .getRoot(),
+        new TestPlanBuilder()
+            .aggregationScan(
+                "0",
+                schemaMap.get("root.sg.d1.s1"),
+                aggregationDescriptorList,
+                groupByTimeParameter,
+                true)
+            .fill("2", FillPolicy.PREVIOUS)
+            .getRoot());
+  }
+
+  @Test
+  public void testPushDownAlignedAggregationSourceAlignByTime() {
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new 
TimeDuration(0, 10), true);
+    List<AggregationDescriptor> aggregationDescriptorList =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, 
"root.sg.d2.a.s1"));
+
+    checkPushDown(
+        "select __endTime, count(s1) from root.sg.d2.a group by ([0, 100), 
10ms);",
+        new TestPlanBuilder()
+            .alignedAggregationScan(
+                "0",
+                schemaMap.get("aligned_root.sg.d2.a.s1"),
+                aggregationDescriptorList,
+                groupByTimeParameter,
+                false)
+            .columnInject("1", groupByTimeParameter)
+            .getRoot(),
+        new TestPlanBuilder()
+            .alignedAggregationScan(
+                "0",
+                schemaMap.get("aligned_root.sg.d2.a.s1"),
+                aggregationDescriptorList,
+                groupByTimeParameter,
+                true)
+            .getRoot());
+    checkPushDown(
+        "select __endTime, count(s1) from root.sg.d2.a group by ([0, 100), 
10ms) fill(previous);",
+        new TestPlanBuilder()
+            .alignedAggregationScan(
+                "0",
+                schemaMap.get("aligned_root.sg.d2.a.s1"),
+                aggregationDescriptorList,
+                groupByTimeParameter,
+                false)
+            .columnInject("1", groupByTimeParameter)
+            .fill("2", FillPolicy.PREVIOUS)
+            .getRoot(),
+        new TestPlanBuilder()
+            .alignedAggregationScan(
+                "0",
+                schemaMap.get("aligned_root.sg.d2.a.s1"),
+                aggregationDescriptorList,
+                groupByTimeParameter,
+                true)
+            .fill("2", FillPolicy.PREVIOUS)
+            .getRoot());
+  }
+
+  @Test
+  public void testPushDownSlidingWindowAlignByTime() {
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new 
TimeDuration(0, 5), true);
+    List<AggregationDescriptor> aggregationDescriptorList1 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.PARTIAL, 
"root.sg.d1.s1"));
+    List<AggregationDescriptor> aggregationDescriptorList2 =
+        
Collections.singletonList(getAggregationDescriptor(AggregationStep.FINAL, 
"root.sg.d1.s1"));
+    checkPushDown(
+        "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 10ms, 
5ms);",
+        new TestPlanBuilder()
+            .aggregationScan(
+                "0",
+                schemaMap.get("root.sg.d1.s1"),
+                aggregationDescriptorList1,
+                groupByTimeParameter,
+                false)
+            .slidingWindow("1", aggregationDescriptorList2, 
groupByTimeParameter, false)
+            .columnInject("2", groupByTimeParameter)
+            .getRoot(),
+        new TestPlanBuilder()
+            .aggregationScan(
+                "0",
+                schemaMap.get("root.sg.d1.s1"),
+                aggregationDescriptorList1,
+                groupByTimeParameter,
+                false)
+            .slidingWindow("1", aggregationDescriptorList2, 
groupByTimeParameter, true)
+            .getRoot());
+    checkPushDown(
+        "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 10ms, 
5ms) fill(previous);",
+        new TestPlanBuilder()
+            .aggregationScan(
+                "0",
+                schemaMap.get("root.sg.d1.s1"),
+                aggregationDescriptorList1,
+                groupByTimeParameter,
+                false)
+            .slidingWindow("1", aggregationDescriptorList2, 
groupByTimeParameter, false)
+            .columnInject("2", groupByTimeParameter)
+            .fill("3", FillPolicy.PREVIOUS)
+            .getRoot(),
+        new TestPlanBuilder()
+            .aggregationScan(
+                "0",
+                schemaMap.get("root.sg.d1.s1"),
+                aggregationDescriptorList1,
+                groupByTimeParameter,
+                false)
+            .slidingWindow("1", aggregationDescriptorList2, 
groupByTimeParameter, true)
+            .fill("3", FillPolicy.PREVIOUS)
+            .getRoot());
+  }
+
+  @Test
+  public void testPushDownRawDataAggregationAlignByTime() {
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new 
TimeDuration(0, 10), true);
+    List<AggregationDescriptor> aggregationDescriptorList =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+
+    checkPushDown(
+        "select __endTime, count(s1) from root.sg.d1 where s1 > 10 group by 
([0, 100), 10ms);",
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .filter(
+                "1",
+                Collections.singletonList(
+                    
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))),
+                ExpressionFactory.gt(
+                    
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")),
+                    ExpressionFactory.intValue("10")),
+                true)
+            .rawDataAggregation("2", aggregationDescriptorList, 
groupByTimeParameter, true)
+            .columnInject("3", groupByTimeParameter)
+            .getRoot(),
+        new TestPlanBuilder()
+            .scan("0", schemaMap.get("root.sg.d1.s1"))
+            .filter(
+                "1",
+                Collections.singletonList(
+                    
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))),
+                ExpressionFactory.gt(
+                    
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")),
+                    ExpressionFactory.intValue("10")),
+                true)
+            .rawDataAggregation("2", aggregationDescriptorList, 
groupByTimeParameter, true)
+            .getRoot());
+  }
+
+  @Test
+  public void testCannotPushDownTimeJoinAlignByTime() {
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new 
TimeDuration(0, 10), true);
+    List<AggregationDescriptor> aggregationDescriptorList1 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+    List<AggregationDescriptor> aggregationDescriptorList2 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s2"));
+
+    checkCannotPushDown(
+        "select __endTime, count(s1), count(s2) from root.sg.d1 group by ([0, 
100), 10ms);",
+        new TestPlanBuilder()
+            .aggregationTimeJoin(
+                0,
+                Arrays.asList(schemaMap.get("root.sg.d1.s2"), 
schemaMap.get("root.sg.d1.s1")),
+                Arrays.asList(aggregationDescriptorList2, 
aggregationDescriptorList1),
+                groupByTimeParameter)
+            .columnInject("3", groupByTimeParameter)
+            .getRoot());
+    checkCannotPushDown(
+        "select __endTime, count(s1), count(s2) from root.sg.d1 group by ([0, 
100), 10ms) fill(previous);",
+        new TestPlanBuilder()
+            .aggregationTimeJoin(
+                0,
+                Arrays.asList(schemaMap.get("root.sg.d1.s2"), 
schemaMap.get("root.sg.d1.s1")),
+                Arrays.asList(aggregationDescriptorList2, 
aggregationDescriptorList1),
+                groupByTimeParameter)
+            .columnInject("3", groupByTimeParameter)
+            .fill("4", FillPolicy.PREVIOUS)
+            .getRoot());
+  }
+
+  @Test
+  public void testPushDownAggregationSourceAlignByDevice() {
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new 
TimeDuration(0, 10), true);
+    List<AggregationDescriptor> aggregationDescriptorList1 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+    List<AggregationDescriptor> aggregationDescriptorList2 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, 
"root.sg.d2.a.s1"));
+
+    List<String> outputColumnNames = Arrays.asList("Device", "__endTime", 
"count(s1)");
+    List<String> devices = Arrays.asList("root.sg.d1", "root.sg.d2.a");
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+    deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2));
+    deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 2));
+
+    checkPushDown(
+        "select __endTime, count(s1) from root.sg.d1, root.sg.d2.a group by 
([0, 100), 10ms) align by device;",
+        new TestPlanBuilder()
+            .deviceView(
+                "4",
+                outputColumnNames,
+                devices,
+                deviceToMeasurementIndexesMap,
+                Arrays.asList(
+                    new TestPlanBuilder()
+                        .aggregationScan(
+                            "0",
+                            schemaMap.get("root.sg.d1.s1"),
+                            aggregationDescriptorList1,
+                            groupByTimeParameter,
+                            false)
+                        .columnInject("1", groupByTimeParameter)
+                        .getRoot(),
+                    new TestPlanBuilder()
+                        .alignedAggregationScan(
+                            "2",
+                            schemaMap.get("aligned_root.sg.d2.a.s1"),
+                            aggregationDescriptorList2,
+                            groupByTimeParameter,
+                            false)
+                        .columnInject("3", groupByTimeParameter)
+                        .getRoot()))
+            .getRoot(),
+        new TestPlanBuilder()
+            .deviceView(
+                "4",
+                outputColumnNames,
+                devices,
+                deviceToMeasurementIndexesMap,
+                Arrays.asList(
+                    new TestPlanBuilder()
+                        .aggregationScan(
+                            "0",
+                            schemaMap.get("root.sg.d1.s1"),
+                            aggregationDescriptorList1,
+                            groupByTimeParameter,
+                            true)
+                        .getRoot(),
+                    new TestPlanBuilder()
+                        .alignedAggregationScan(
+                            "2",
+                            schemaMap.get("aligned_root.sg.d2.a.s1"),
+                            aggregationDescriptorList2,
+                            groupByTimeParameter,
+                            true)
+                        .getRoot()))
+            .getRoot());
+  }
+
+  @Test
+  public void testPushDownSlidingWindowAlignByDevice() {
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new 
TimeDuration(0, 5), true);
+
+    List<AggregationDescriptor> aggregationDescriptorList1_1 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.PARTIAL, 
"root.sg.d1.s1"));
+    List<AggregationDescriptor> aggregationDescriptorList1_2 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.PARTIAL, 
"root.sg.d2.a.s1"));
+
+    List<AggregationDescriptor> aggregationDescriptorList2_1 =
+        
Collections.singletonList(getAggregationDescriptor(AggregationStep.FINAL, 
"root.sg.d1.s1"));
+    List<AggregationDescriptor> aggregationDescriptorList2_2 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.FINAL, 
"root.sg.d2.a.s1"));
+
+    List<String> outputColumnNames = Arrays.asList("Device", "__endTime", 
"count(s1)");
+    List<String> devices = Arrays.asList("root.sg.d1", "root.sg.d2.a");
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+    deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2));
+    deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 2));
+
+    checkPushDown(
+        "select __endTime, count(s1) from root.sg.d1, root.sg.d2.a group by 
([0, 100), 10ms, 5ms) align by device;",
+        new TestPlanBuilder()
+            .deviceView(
+                "6",
+                outputColumnNames,
+                devices,
+                deviceToMeasurementIndexesMap,
+                Arrays.asList(
+                    new TestPlanBuilder()
+                        .aggregationScan(
+                            "0",
+                            schemaMap.get("root.sg.d1.s1"),
+                            aggregationDescriptorList1_1,
+                            groupByTimeParameter,
+                            false)
+                        .slidingWindow(
+                            "1", aggregationDescriptorList2_1, 
groupByTimeParameter, false)
+                        .columnInject("2", groupByTimeParameter)
+                        .getRoot(),
+                    new TestPlanBuilder()
+                        .alignedAggregationScan(
+                            "3",
+                            schemaMap.get("aligned_root.sg.d2.a.s1"),
+                            aggregationDescriptorList1_2,
+                            groupByTimeParameter,
+                            false)
+                        .slidingWindow(
+                            "4", aggregationDescriptorList2_2, 
groupByTimeParameter, false)
+                        .columnInject("5", groupByTimeParameter)
+                        .getRoot()))
+            .getRoot(),
+        new TestPlanBuilder()
+            .deviceView(
+                "6",
+                outputColumnNames,
+                devices,
+                deviceToMeasurementIndexesMap,
+                Arrays.asList(
+                    new TestPlanBuilder()
+                        .aggregationScan(
+                            "0",
+                            schemaMap.get("root.sg.d1.s1"),
+                            aggregationDescriptorList1_1,
+                            groupByTimeParameter,
+                            false)
+                        .slidingWindow(
+                            "1", aggregationDescriptorList2_1, 
groupByTimeParameter, true)
+                        .getRoot(),
+                    new TestPlanBuilder()
+                        .alignedAggregationScan(
+                            "3",
+                            schemaMap.get("aligned_root.sg.d2.a.s1"),
+                            aggregationDescriptorList1_2,
+                            groupByTimeParameter,
+                            false)
+                        .slidingWindow(
+                            "4", aggregationDescriptorList2_2, 
groupByTimeParameter, true)
+                        .getRoot()))
+            .getRoot());
+  }
+
+  @Test
+  public void testPushDownRawDataAggregationAlignByDevice() {
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new 
TimeDuration(0, 10), true);
+
+    List<AggregationDescriptor> aggregationDescriptorList1 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+    List<AggregationDescriptor> aggregationDescriptorList2 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, 
"root.sg.d2.a.s1"));
+
+    List<String> outputColumnNames = Arrays.asList("Device", "__endTime", 
"count(s1)");
+    List<String> devices = Arrays.asList("root.sg.d1", "root.sg.d2.a");
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+    deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2));
+    deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 2));
+
+    checkPushDown(
+        "select __endTime, count(s1) from root.sg.d1, root.sg.d2.a where s1 > 
10 group by ([0, 100), 10ms) align by device;",
+        new TestPlanBuilder()
+            .deviceView(
+                "8",
+                outputColumnNames,
+                devices,
+                deviceToMeasurementIndexesMap,
+                Arrays.asList(
+                    new TestPlanBuilder()
+                        .scan("0", schemaMap.get("root.sg.d1.s1"))
+                        .filter(
+                            "1",
+                            Collections.singletonList(
+                                
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))),
+                            ExpressionFactory.gt(
+                                
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")),
+                                ExpressionFactory.intValue("10")),
+                            true)
+                        .rawDataAggregation(
+                            "2", aggregationDescriptorList1, 
groupByTimeParameter, true)
+                        .columnInject("3", groupByTimeParameter)
+                        .getRoot(),
+                    new TestPlanBuilder()
+                        .scanAligned("4", 
schemaMap.get("aligned_root.sg.d2.a.s1"))
+                        .filter(
+                            "5",
+                            Collections.singletonList(
+                                
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1"))),
+                            ExpressionFactory.gt(
+                                
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1")),
+                                ExpressionFactory.intValue("10")),
+                            true)
+                        .rawDataAggregation(
+                            "6", aggregationDescriptorList2, 
groupByTimeParameter, true)
+                        .columnInject("7", groupByTimeParameter)
+                        .getRoot()))
+            .getRoot(),
+        new TestPlanBuilder()
+            .deviceView(
+                "8",
+                outputColumnNames,
+                devices,
+                deviceToMeasurementIndexesMap,
+                Arrays.asList(
+                    new TestPlanBuilder()
+                        .scan("0", schemaMap.get("root.sg.d1.s1"))
+                        .filter(
+                            "1",
+                            Collections.singletonList(
+                                
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))),
+                            ExpressionFactory.gt(
+                                
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")),
+                                ExpressionFactory.intValue("10")),
+                            true)
+                        .rawDataAggregation(
+                            "2", aggregationDescriptorList1, 
groupByTimeParameter, true)
+                        .getRoot(),
+                    new TestPlanBuilder()
+                        .scanAligned("4", 
schemaMap.get("aligned_root.sg.d2.a.s1"))
+                        .filter(
+                            "5",
+                            Collections.singletonList(
+                                
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1"))),
+                            ExpressionFactory.gt(
+                                
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1")),
+                                ExpressionFactory.intValue("10")),
+                            true)
+                        .rawDataAggregation(
+                            "6", aggregationDescriptorList2, 
groupByTimeParameter, true)
+                        .getRoot()))
+            .getRoot());
+  }
+
+  @Test
+  public void testPartialPushDownTimeJoinAlignByDevice() {
+    GroupByTimeParameter groupByTimeParameter =
+        new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new 
TimeDuration(0, 10), true);
+    List<AggregationDescriptor> aggregationDescriptorList1_1 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+    List<AggregationDescriptor> aggregationDescriptorList1_2 =
+        Collections.singletonList(
+            getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s2"));
+
+    List<AggregationDescriptor> aggregationDescriptorList2 =
+        Arrays.asList(
+            getAggregationDescriptor(AggregationStep.SINGLE, 
"root.sg.d2.a.s2"),
+            getAggregationDescriptor(AggregationStep.SINGLE, 
"root.sg.d2.a.s1"));
+
+    List<String> outputColumnNames = Arrays.asList("Device", "__endTime", 
"count(s1)", "count(s2)");
+    List<String> devices = Arrays.asList("root.sg.d1", "root.sg.d2.a");
+    Map<String, List<Integer>> deviceToMeasurementIndexesMap = new 
LinkedHashMap<>();
+    deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 3, 2));
+    deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 3, 2));
+
+    checkPushDown(
+        "select __endTime, count(s1), count(s2) from root.sg.d1, root.sg.d2.a 
group by ([0, 100), 10ms) align by device;",
+        new TestPlanBuilder()
+            .deviceView(
+                "6",
+                outputColumnNames,
+                devices,
+                deviceToMeasurementIndexesMap,
+                Arrays.asList(
+                    new TestPlanBuilder()
+                        .aggregationTimeJoin(
+                            0,
+                            Arrays.asList(
+                                schemaMap.get("root.sg.d1.s2"), 
schemaMap.get("root.sg.d1.s1")),
+                            Arrays.asList(
+                                aggregationDescriptorList1_2, 
aggregationDescriptorList1_1),
+                            groupByTimeParameter)
+                        .columnInject("3", groupByTimeParameter)
+                        .getRoot(),
+                    new TestPlanBuilder()
+                        .alignedAggregationScan(
+                            "4",
+                            schemaMap.get("root.sg.d2.a"),
+                            aggregationDescriptorList2,
+                            groupByTimeParameter,
+                            false)
+                        .columnInject("5", groupByTimeParameter)
+                        .getRoot()))
+            .getRoot(),
+        new TestPlanBuilder()
+            .deviceView(
+                "6",
+                outputColumnNames,
+                devices,
+                deviceToMeasurementIndexesMap,
+                Arrays.asList(
+                    new TestPlanBuilder()
+                        .aggregationTimeJoin(
+                            0,
+                            Arrays.asList(
+                                schemaMap.get("root.sg.d1.s2"), 
schemaMap.get("root.sg.d1.s1")),
+                            Arrays.asList(
+                                aggregationDescriptorList1_2, 
aggregationDescriptorList1_1),
+                            groupByTimeParameter)
+                        .columnInject("3", groupByTimeParameter)
+                        .getRoot(),
+                    new TestPlanBuilder()
+                        .alignedAggregationScan(
+                            "4",
+                            schemaMap.get("root.sg.d2.a"),
+                            aggregationDescriptorList2,
+                            groupByTimeParameter,
+                            true)
+                        .getRoot()))
+            .getRoot());
+  }
+
+  private AggregationDescriptor getAggregationDescriptor(AggregationStep step, 
String path) {
+    return new AggregationDescriptor(
+        TAggregationType.COUNT.name().toLowerCase(),
+        step,
+        Collections.singletonList(new TimeSeriesOperand(schemaMap.get(path))),
+        new HashMap<>());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
index 717cf9116db..7f086ad0be3 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
@@ -31,10 +31,8 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.FakePartitionFetcherImpl;
 import org.apache.iotdb.db.queryengine.plan.analyze.FakeSchemaFetcherImpl;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
-import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
 import 
org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
@@ -57,6 +55,8 @@ import static 
org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.
 import static 
org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.gt;
 import static 
org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.intValue;
 import static 
org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.timeSeries;
+import static 
org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.checkCannotPushDown;
+import static 
org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.checkPushDown;
 
 public class LimitOffsetPushDownTest {
 
@@ -262,42 +262,19 @@ public class LimitOffsetPushDownTest {
             .filter(
                 "1",
                 
Collections.singletonList(timeSeries(schemaMap.get("root.sg.d1.s1"))),
-                gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")))
+                gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")),
+                false)
             .offset("2", 100)
             .limit("3", 100)
             .getRoot());
   }
 
   private void checkPushDown(String sql, PlanNode rawPlan, PlanNode optPlan) {
-    Statement statement = StatementGenerator.createStatement(sql, 
ZonedDateTime.now().getOffset());
-
-    MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
-    Analyzer analyzer =
-        new Analyzer(context, new FakePartitionFetcherImpl(), new 
FakeSchemaFetcherImpl());
-    Analysis analysis = analyzer.analyze(statement);
-
-    LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
-    PlanNode actualPlan = planner.plan(analysis).getRootNode();
-    Assert.assertEquals(rawPlan, actualPlan);
-
-    PlanNode actualOptPlan = new LimitOffsetPushDown().optimize(actualPlan, 
analysis, context);
-    Assert.assertEquals(optPlan, actualOptPlan);
+    OptimizationTestUtil.checkPushDown(new LimitOffsetPushDown(), sql, 
rawPlan, optPlan);
   }
 
   private void checkCannotPushDown(String sql, PlanNode rawPlan) {
-    Statement statement = StatementGenerator.createStatement(sql, 
ZonedDateTime.now().getOffset());
-
-    MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
-    Analyzer analyzer =
-        new Analyzer(context, new FakePartitionFetcherImpl(), new 
FakeSchemaFetcherImpl());
-    Analysis analysis = analyzer.analyze(statement);
-
-    LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
-    PlanNode actualPlan = planner.plan(analysis).getRootNode();
-
-    Assert.assertEquals(rawPlan, actualPlan);
-    Assert.assertEquals(
-        actualPlan, new LimitOffsetPushDown().optimize(actualPlan, analysis, 
context));
+    OptimizationTestUtil.checkCannotPushDown(new LimitOffsetPushDown(), sql, 
rawPlan);
   }
 
   // test for limit/offset push down in group by time
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java
new file mode 100644
index 00000000000..73560bd2112
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.optimization;
+
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer;
+import org.apache.iotdb.db.queryengine.plan.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.queryengine.plan.analyze.FakeSchemaFetcherImpl;
+import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+
+import org.junit.Assert;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+
+public class OptimizationTestUtil {
+
+  private OptimizationTestUtil() {
+    // util class
+  }
+
+  public static void checkPushDown(
+      PlanOptimizer optimizer, String sql, PlanNode rawPlan, PlanNode optPlan) 
{
+    Statement statement = StatementGenerator.createStatement(sql, 
ZonedDateTime.now().getOffset());
+
+    MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+    Analyzer analyzer =
+        new Analyzer(context, new FakePartitionFetcherImpl(), new 
FakeSchemaFetcherImpl());
+    Analysis analysis = analyzer.analyze(statement);
+
+    LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+    PlanNode actualPlan = planner.plan(analysis).getRootNode();
+    Assert.assertEquals(rawPlan, actualPlan);
+
+    PlanNode actualOptPlan = optimizer.optimize(actualPlan, analysis, context);
+    Assert.assertEquals(optPlan, actualOptPlan);
+  }
+
+  public static void checkCannotPushDown(PlanOptimizer optimizer, String sql, 
PlanNode rawPlan) {
+    Statement statement = StatementGenerator.createStatement(sql, 
ZonedDateTime.now().getOffset());
+
+    MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+    Analyzer analyzer =
+        new Analyzer(context, new FakePartitionFetcherImpl(), new 
FakeSchemaFetcherImpl());
+    Analysis analysis = analyzer.analyze(statement);
+
+    LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+    PlanNode actualPlan = planner.plan(analysis).getRootNode();
+
+    Assert.assertEquals(rawPlan, actualPlan);
+    Assert.assertEquals(actualPlan, optimizer.optimize(actualPlan, analysis, 
context));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
index 699e7b265be..c3684bc9140 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
@@ -25,17 +25,24 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.FillDescriptor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
 import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
@@ -43,6 +50,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
 import org.apache.iotdb.db.queryengine.plan.statement.literal.LongLiteral;
+import 
org.apache.iotdb.db.utils.columngenerator.parameter.SlidingTimeColumnGeneratorParameter;
 
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
@@ -90,6 +98,111 @@ public class TestPlanBuilder {
     return this;
   }
 
+  public TestPlanBuilder aggregationScan(
+      String id,
+      PartialPath path,
+      List<AggregationDescriptor> aggregationDescriptors,
+      GroupByTimeParameter groupByTimeParameter,
+      boolean outputEndTime) {
+    SeriesAggregationScanNode aggregationScanNode =
+        new SeriesAggregationScanNode(
+            new PlanNodeId(id),
+            (MeasurementPath) path,
+            aggregationDescriptors,
+            Ordering.ASC,
+            groupByTimeParameter);
+    aggregationScanNode.setOutputEndTime(outputEndTime);
+    this.root = aggregationScanNode;
+    return this;
+  }
+
+  public TestPlanBuilder alignedAggregationScan(
+      String id,
+      PartialPath path,
+      List<AggregationDescriptor> aggregationDescriptors,
+      GroupByTimeParameter groupByTimeParameter,
+      boolean outputEndTime) {
+    AlignedSeriesAggregationScanNode aggregationScanNode =
+        new AlignedSeriesAggregationScanNode(
+            new PlanNodeId(id),
+            (AlignedPath) path,
+            aggregationDescriptors,
+            Ordering.ASC,
+            groupByTimeParameter);
+    aggregationScanNode.setOutputEndTime(outputEndTime);
+    this.root = aggregationScanNode;
+    return this;
+  }
+
+  public TestPlanBuilder rawDataAggregation(
+      String id,
+      List<AggregationDescriptor> aggregationDescriptors,
+      GroupByTimeParameter groupByTimeParameter,
+      boolean outputEndTime) {
+    AggregationNode aggregationNode =
+        new AggregationNode(
+            new PlanNodeId(String.valueOf(id)),
+            Collections.singletonList(getRoot()),
+            aggregationDescriptors,
+            groupByTimeParameter,
+            Ordering.ASC);
+    aggregationNode.setOutputEndTime(outputEndTime);
+    this.root = aggregationNode;
+    return this;
+  }
+
+  public TestPlanBuilder slidingWindow(
+      String id,
+      List<AggregationDescriptor> aggregationDescriptors,
+      GroupByTimeParameter groupByTimeParameter,
+      boolean outputEndTime) {
+    SlidingWindowAggregationNode slidingWindowAggregationNode =
+        new SlidingWindowAggregationNode(
+            new PlanNodeId(id),
+            getRoot(),
+            aggregationDescriptors,
+            groupByTimeParameter,
+            Ordering.ASC);
+    slidingWindowAggregationNode.setOutputEndTime(outputEndTime);
+    this.root = slidingWindowAggregationNode;
+    return this;
+  }
+
+  public TestPlanBuilder aggregationTimeJoin(
+      int startId,
+      List<PartialPath> paths,
+      List<List<AggregationDescriptor>> aggregationDescriptorsList,
+      GroupByTimeParameter groupByTimeParameter) {
+    int planId = startId;
+
+    List<PlanNode> seriesSourceNodes = new ArrayList<>();
+    for (int i = 0; i < paths.size(); i++) {
+      PartialPath path = paths.get(i);
+      if (path instanceof MeasurementPath) {
+        seriesSourceNodes.add(
+            new SeriesAggregationScanNode(
+                new PlanNodeId(String.valueOf(planId)),
+                (MeasurementPath) paths.get(i),
+                aggregationDescriptorsList.get(i),
+                Ordering.ASC,
+                groupByTimeParameter));
+      } else {
+        seriesSourceNodes.add(
+            new AlignedSeriesAggregationScanNode(
+                new PlanNodeId(String.valueOf(planId)),
+                (AlignedPath) paths.get(i),
+                aggregationDescriptorsList.get(i),
+                Ordering.ASC,
+                groupByTimeParameter));
+      }
+      planId++;
+    }
+
+    this.root =
+        new TimeJoinNode(new PlanNodeId(String.valueOf(planId)), Ordering.ASC, 
seriesSourceNodes);
+    return this;
+  }
+
   public TestPlanBuilder timeJoin(List<PartialPath> paths) {
     int planId = 0;
 
@@ -163,14 +276,15 @@ public class TestPlanBuilder {
     return this;
   }
 
-  public TestPlanBuilder filter(String id, List<Expression> expressions, 
Expression predicate) {
+  public TestPlanBuilder filter(
+      String id, List<Expression> expressions, Expression predicate, boolean 
isGroupByTime) {
     this.root =
         new FilterNode(
             new PlanNodeId(id),
             getRoot(),
             expressions.toArray(new Expression[0]),
             predicate,
-            false,
+            isGroupByTime,
             ZonedDateTime.now().getOffset(),
             Ordering.ASC);
     return this;
@@ -185,4 +299,35 @@ public class TestPlanBuilder {
     this.root = new IntoNode(new PlanNodeId(id), getRoot(), 
intoPathDescriptor);
     return this;
   }
+
+  public TestPlanBuilder columnInject(String id, GroupByTimeParameter 
groupByTimeParameter) {
+    this.root =
+        new ColumnInjectNode(
+            new PlanNodeId(id),
+            getRoot(),
+            0,
+            new SlidingTimeColumnGeneratorParameter(groupByTimeParameter, 
true));
+    return this;
+  }
+
+  public TestPlanBuilder deviceView(
+      String id,
+      List<String> outputColumnNames,
+      List<String> devices,
+      Map<String, List<Integer>> deviceToMeasurementIndexesMap,
+      List<PlanNode> children) {
+    DeviceViewNode deviceViewNode =
+        new DeviceViewNode(
+            new PlanNodeId(id),
+            new OrderByParameter(
+                Arrays.asList(
+                    new SortItem(OrderByKey.DEVICE, Ordering.ASC),
+                    new SortItem(OrderByKey.TIME, Ordering.ASC))),
+            outputColumnNames,
+            devices,
+            deviceToMeasurementIndexesMap);
+    deviceViewNode.setChildren(children);
+    this.root = deviceViewNode;
+    return this;
+  }
 }

Reply via email to