This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 50d01a78f56 Fix ColumnInjectionPushDown bug & add UTs
50d01a78f56 is described below
commit 50d01a78f566f9119875e51aef3f0d1fb8a03cd8
Author: liuminghui233 <[email protected]>
AuthorDate: Wed Dec 20 08:44:16 2023 +0800
Fix ColumnInjectionPushDown bug & add UTs
---
.../plan/optimization/ColumnInjectionPushDown.java | 57 +-
.../plan/planner/plan/node/PlanVisitor.java | 3 +-
.../optimization/ColumnInjectionPushDownTest.java | 657 +++++++++++++++++++++
.../plan/optimization/LimitOffsetPushDownTest.java | 35 +-
.../plan/optimization/OptimizationTestUtil.java | 75 +++
.../plan/optimization/TestPlanBuilder.java | 149 ++++-
6 files changed, 905 insertions(+), 71 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java
index 40a869d18a1..b8cd9f43453 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDown.java
@@ -35,8 +35,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.iotdb.tsfile.utils.Preconditions.checkArgument;
-
/**
* <b>Optimization phase:</b> Distributed plan planning.
*
@@ -67,34 +65,38 @@ public class ColumnInjectionPushDown implements
PlanOptimizer {
// SeriesAggregationNode,
// If it is and has overlap in groupByParameter, there is
SlidingWindowNode
// There will be a ColumnInjectNode on them, so we need to check if it
can be pushed down.
- return plan.accept(new Rewriter(), new RewriterContext());
+ return plan.accept(new Rewriter(), null);
}
return plan;
}
- private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext>
{
+ private static class Rewriter extends PlanVisitor<PlanNode, Void> {
@Override
- public PlanNode visitPlan(PlanNode node, RewriterContext context) {
- for (PlanNode child : node.getChildren()) {
- context.setParent(node);
- child.accept(this, context);
- }
+ public PlanNode visitPlan(PlanNode node, Void context) {
+ // other source node, just return
return node;
}
@Override
- public PlanNode visitMultiChildProcess(MultiChildProcessNode node,
RewriterContext context) {
- List<PlanNode> children = new ArrayList<>();
+ public PlanNode visitSingleChildProcess(SingleChildProcessNode node, Void
context) {
+ PlanNode rewrittenChild = node.getChild().accept(this, context);
+ node.setChild(rewrittenChild);
+ return node;
+ }
+
+ @Override
+ public PlanNode visitMultiChildProcess(MultiChildProcessNode node, Void
context) {
+ List<PlanNode> rewrittenChildren = new ArrayList<>();
for (PlanNode child : node.getChildren()) {
- context.setParent(null);
- children.add(child.accept(this, context));
+ rewrittenChildren.add(child.accept(this, context));
}
- return node.cloneWithChildren(children);
+ node.setChildren(rewrittenChildren);
+ return node;
}
@Override
- public PlanNode visitColumnInject(ColumnInjectNode node, RewriterContext
context) {
+ public PlanNode visitColumnInject(ColumnInjectNode node, Void context) {
PlanNode child = node.getChild();
boolean columnInjectPushDown = true;
@@ -109,32 +111,9 @@ public class ColumnInjectionPushDown implements
PlanOptimizer {
}
if (columnInjectPushDown) {
- return concatParentWithChild(context.getParent(), child);
- }
- return node;
- }
-
- private PlanNode concatParentWithChild(PlanNode parent, PlanNode child) {
- if (parent == null) {
return child;
}
-
- checkArgument(parent instanceof SingleChildProcessNode);
- ((SingleChildProcessNode) parent).setChild(child);
- return parent;
- }
- }
-
- private static class RewriterContext {
-
- private PlanNode parent;
-
- public PlanNode getParent() {
- return parent;
- }
-
- public void setParent(PlanNode parent) {
- this.parent = parent;
+ return node;
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 10bde9ccf6c..e8f6b149f0b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -103,6 +103,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+@SuppressWarnings("java:S6539") // suppress "Monster class" warning
public abstract class PlanVisitor<R, C> {
public R process(PlanNode node, C context) {
@@ -200,7 +201,7 @@ public abstract class PlanVisitor<R, C> {
}
public R visitColumnInject(ColumnInjectNode node, C context) {
- return visitPlan(node, context);
+ return visitSingleChildProcess(node, context);
}
public R visitSingleDeviceView(SingleDeviceViewNode node, C context) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java
new file mode 100644
index 00000000000..f5dd0507b27
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.optimization;
+
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.AlignedPath;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory;
+import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
+import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.TimeDuration;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ColumnInjectionPushDownTest {
+
+ private static final Map<String, PartialPath> schemaMap = new HashMap<>();
+
+ static {
+ try {
+ schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1",
TSDataType.INT32));
+ schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2",
TSDataType.DOUBLE));
+
+ MeasurementPath d2s1 = new MeasurementPath("root.sg.d2.a.s1",
TSDataType.INT32);
+ d2s1.setUnderAlignedEntity(true);
+ schemaMap.put("root.sg.d2.a.s1", d2s1);
+
+ AlignedPath aligned_d2s1 =
+ new AlignedPath(
+ "root.sg.d2.a",
+ Collections.singletonList("s1"),
+ Collections.singletonList(d2s1.getMeasurementSchema()));
+ schemaMap.put("aligned_root.sg.d2.a.s1", aligned_d2s1);
+
+ MeasurementPath d2s2 = new MeasurementPath("root.sg.d2.a.s2",
TSDataType.DOUBLE);
+ d2s2.setUnderAlignedEntity(true);
+ schemaMap.put("root.sg.d2.a.s2", d2s2);
+
+ AlignedPath aligned_d2s2 =
+ new AlignedPath(
+ "root.sg.d2.a",
+ Collections.singletonList("s2"),
+ Collections.singletonList(d2s2.getMeasurementSchema()));
+ schemaMap.put("aligned_root.sg.d2.a.s2", aligned_d2s2);
+
+ AlignedPath alignedPath =
+ new AlignedPath(
+ "root.sg.d2.a",
+ Arrays.asList("s2", "s1"),
+ Arrays.asList(d2s2.getMeasurementSchema(),
d2s1.getMeasurementSchema()));
+ schemaMap.put("root.sg.d2.a", alignedPath);
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void checkPushDown(String sql, PlanNode rawPlan, PlanNode optPlan) {
+ OptimizationTestUtil.checkPushDown(new ColumnInjectionPushDown(), sql,
rawPlan, optPlan);
+ }
+
+ private void checkCannotPushDown(String sql, PlanNode rawPlan) {
+ OptimizationTestUtil.checkCannotPushDown(new ColumnInjectionPushDown(),
sql, rawPlan);
+ }
+
+ @Test
+ public void testPushDownAggregationSourceAlignByTime() {
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new
TimeDuration(0, 10), true);
+ List<AggregationDescriptor> aggregationDescriptorList =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+
+ checkPushDown(
+ "select __endTime, count(s1) from root.sg.d1 group by ([0, 100),
10ms);",
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList,
+ groupByTimeParameter,
+ false)
+ .columnInject("1", groupByTimeParameter)
+ .getRoot(),
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList,
+ groupByTimeParameter,
+ true)
+ .getRoot());
+ checkPushDown(
+ "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 10ms)
fill(previous);",
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList,
+ groupByTimeParameter,
+ false)
+ .columnInject("1", groupByTimeParameter)
+ .fill("2", FillPolicy.PREVIOUS)
+ .getRoot(),
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList,
+ groupByTimeParameter,
+ true)
+ .fill("2", FillPolicy.PREVIOUS)
+ .getRoot());
+ }
+
+ @Test
+ public void testPushDownAlignedAggregationSourceAlignByTime() {
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new
TimeDuration(0, 10), true);
+ List<AggregationDescriptor> aggregationDescriptorList =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE,
"root.sg.d2.a.s1"));
+
+ checkPushDown(
+ "select __endTime, count(s1) from root.sg.d2.a group by ([0, 100),
10ms);",
+ new TestPlanBuilder()
+ .alignedAggregationScan(
+ "0",
+ schemaMap.get("aligned_root.sg.d2.a.s1"),
+ aggregationDescriptorList,
+ groupByTimeParameter,
+ false)
+ .columnInject("1", groupByTimeParameter)
+ .getRoot(),
+ new TestPlanBuilder()
+ .alignedAggregationScan(
+ "0",
+ schemaMap.get("aligned_root.sg.d2.a.s1"),
+ aggregationDescriptorList,
+ groupByTimeParameter,
+ true)
+ .getRoot());
+ checkPushDown(
+ "select __endTime, count(s1) from root.sg.d2.a group by ([0, 100),
10ms) fill(previous);",
+ new TestPlanBuilder()
+ .alignedAggregationScan(
+ "0",
+ schemaMap.get("aligned_root.sg.d2.a.s1"),
+ aggregationDescriptorList,
+ groupByTimeParameter,
+ false)
+ .columnInject("1", groupByTimeParameter)
+ .fill("2", FillPolicy.PREVIOUS)
+ .getRoot(),
+ new TestPlanBuilder()
+ .alignedAggregationScan(
+ "0",
+ schemaMap.get("aligned_root.sg.d2.a.s1"),
+ aggregationDescriptorList,
+ groupByTimeParameter,
+ true)
+ .fill("2", FillPolicy.PREVIOUS)
+ .getRoot());
+ }
+
+ @Test
+ public void testPushDownSlidingWindowAlignByTime() {
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new
TimeDuration(0, 5), true);
+ List<AggregationDescriptor> aggregationDescriptorList1 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.PARTIAL,
"root.sg.d1.s1"));
+ List<AggregationDescriptor> aggregationDescriptorList2 =
+
Collections.singletonList(getAggregationDescriptor(AggregationStep.FINAL,
"root.sg.d1.s1"));
+ checkPushDown(
+ "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 10ms,
5ms);",
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList1,
+ groupByTimeParameter,
+ false)
+ .slidingWindow("1", aggregationDescriptorList2,
groupByTimeParameter, false)
+ .columnInject("2", groupByTimeParameter)
+ .getRoot(),
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList1,
+ groupByTimeParameter,
+ false)
+ .slidingWindow("1", aggregationDescriptorList2,
groupByTimeParameter, true)
+ .getRoot());
+ checkPushDown(
+ "select __endTime, count(s1) from root.sg.d1 group by ([0, 100), 10ms,
5ms) fill(previous);",
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList1,
+ groupByTimeParameter,
+ false)
+ .slidingWindow("1", aggregationDescriptorList2,
groupByTimeParameter, false)
+ .columnInject("2", groupByTimeParameter)
+ .fill("3", FillPolicy.PREVIOUS)
+ .getRoot(),
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList1,
+ groupByTimeParameter,
+ false)
+ .slidingWindow("1", aggregationDescriptorList2,
groupByTimeParameter, true)
+ .fill("3", FillPolicy.PREVIOUS)
+ .getRoot());
+ }
+
+ @Test
+ public void testPushDownRawDataAggregationAlignByTime() {
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new
TimeDuration(0, 10), true);
+ List<AggregationDescriptor> aggregationDescriptorList =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+
+ checkPushDown(
+ "select __endTime, count(s1) from root.sg.d1 where s1 > 10 group by
([0, 100), 10ms);",
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .filter(
+ "1",
+ Collections.singletonList(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))),
+ ExpressionFactory.gt(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")),
+ ExpressionFactory.intValue("10")),
+ true)
+ .rawDataAggregation("2", aggregationDescriptorList,
groupByTimeParameter, true)
+ .columnInject("3", groupByTimeParameter)
+ .getRoot(),
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .filter(
+ "1",
+ Collections.singletonList(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))),
+ ExpressionFactory.gt(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")),
+ ExpressionFactory.intValue("10")),
+ true)
+ .rawDataAggregation("2", aggregationDescriptorList,
groupByTimeParameter, true)
+ .getRoot());
+ }
+
+ @Test
+ public void testCannotPushDownTimeJoinAlignByTime() {
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new
TimeDuration(0, 10), true);
+ List<AggregationDescriptor> aggregationDescriptorList1 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+ List<AggregationDescriptor> aggregationDescriptorList2 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s2"));
+
+ checkCannotPushDown(
+ "select __endTime, count(s1), count(s2) from root.sg.d1 group by ([0,
100), 10ms);",
+ new TestPlanBuilder()
+ .aggregationTimeJoin(
+ 0,
+ Arrays.asList(schemaMap.get("root.sg.d1.s2"),
schemaMap.get("root.sg.d1.s1")),
+ Arrays.asList(aggregationDescriptorList2,
aggregationDescriptorList1),
+ groupByTimeParameter)
+ .columnInject("3", groupByTimeParameter)
+ .getRoot());
+ checkCannotPushDown(
+ "select __endTime, count(s1), count(s2) from root.sg.d1 group by ([0,
100), 10ms) fill(previous);",
+ new TestPlanBuilder()
+ .aggregationTimeJoin(
+ 0,
+ Arrays.asList(schemaMap.get("root.sg.d1.s2"),
schemaMap.get("root.sg.d1.s1")),
+ Arrays.asList(aggregationDescriptorList2,
aggregationDescriptorList1),
+ groupByTimeParameter)
+ .columnInject("3", groupByTimeParameter)
+ .fill("4", FillPolicy.PREVIOUS)
+ .getRoot());
+ }
+
+ @Test
+ public void testPushDownAggregationSourceAlignByDevice() {
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new
TimeDuration(0, 10), true);
+ List<AggregationDescriptor> aggregationDescriptorList1 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+ List<AggregationDescriptor> aggregationDescriptorList2 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE,
"root.sg.d2.a.s1"));
+
+ List<String> outputColumnNames = Arrays.asList("Device", "__endTime",
"count(s1)");
+ List<String> devices = Arrays.asList("root.sg.d1", "root.sg.d2.a");
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+ deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2));
+ deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 2));
+
+ checkPushDown(
+ "select __endTime, count(s1) from root.sg.d1, root.sg.d2.a group by
([0, 100), 10ms) align by device;",
+ new TestPlanBuilder()
+ .deviceView(
+ "4",
+ outputColumnNames,
+ devices,
+ deviceToMeasurementIndexesMap,
+ Arrays.asList(
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList1,
+ groupByTimeParameter,
+ false)
+ .columnInject("1", groupByTimeParameter)
+ .getRoot(),
+ new TestPlanBuilder()
+ .alignedAggregationScan(
+ "2",
+ schemaMap.get("aligned_root.sg.d2.a.s1"),
+ aggregationDescriptorList2,
+ groupByTimeParameter,
+ false)
+ .columnInject("3", groupByTimeParameter)
+ .getRoot()))
+ .getRoot(),
+ new TestPlanBuilder()
+ .deviceView(
+ "4",
+ outputColumnNames,
+ devices,
+ deviceToMeasurementIndexesMap,
+ Arrays.asList(
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList1,
+ groupByTimeParameter,
+ true)
+ .getRoot(),
+ new TestPlanBuilder()
+ .alignedAggregationScan(
+ "2",
+ schemaMap.get("aligned_root.sg.d2.a.s1"),
+ aggregationDescriptorList2,
+ groupByTimeParameter,
+ true)
+ .getRoot()))
+ .getRoot());
+ }
+
+ @Test
+ public void testPushDownSlidingWindowAlignByDevice() {
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new
TimeDuration(0, 5), true);
+
+ List<AggregationDescriptor> aggregationDescriptorList1_1 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.PARTIAL,
"root.sg.d1.s1"));
+ List<AggregationDescriptor> aggregationDescriptorList1_2 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.PARTIAL,
"root.sg.d2.a.s1"));
+
+ List<AggregationDescriptor> aggregationDescriptorList2_1 =
+
Collections.singletonList(getAggregationDescriptor(AggregationStep.FINAL,
"root.sg.d1.s1"));
+ List<AggregationDescriptor> aggregationDescriptorList2_2 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.FINAL,
"root.sg.d2.a.s1"));
+
+ List<String> outputColumnNames = Arrays.asList("Device", "__endTime",
"count(s1)");
+ List<String> devices = Arrays.asList("root.sg.d1", "root.sg.d2.a");
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+ deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2));
+ deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 2));
+
+ checkPushDown(
+ "select __endTime, count(s1) from root.sg.d1, root.sg.d2.a group by
([0, 100), 10ms, 5ms) align by device;",
+ new TestPlanBuilder()
+ .deviceView(
+ "6",
+ outputColumnNames,
+ devices,
+ deviceToMeasurementIndexesMap,
+ Arrays.asList(
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList1_1,
+ groupByTimeParameter,
+ false)
+ .slidingWindow(
+ "1", aggregationDescriptorList2_1,
groupByTimeParameter, false)
+ .columnInject("2", groupByTimeParameter)
+ .getRoot(),
+ new TestPlanBuilder()
+ .alignedAggregationScan(
+ "3",
+ schemaMap.get("aligned_root.sg.d2.a.s1"),
+ aggregationDescriptorList1_2,
+ groupByTimeParameter,
+ false)
+ .slidingWindow(
+ "4", aggregationDescriptorList2_2,
groupByTimeParameter, false)
+ .columnInject("5", groupByTimeParameter)
+ .getRoot()))
+ .getRoot(),
+ new TestPlanBuilder()
+ .deviceView(
+ "6",
+ outputColumnNames,
+ devices,
+ deviceToMeasurementIndexesMap,
+ Arrays.asList(
+ new TestPlanBuilder()
+ .aggregationScan(
+ "0",
+ schemaMap.get("root.sg.d1.s1"),
+ aggregationDescriptorList1_1,
+ groupByTimeParameter,
+ false)
+ .slidingWindow(
+ "1", aggregationDescriptorList2_1,
groupByTimeParameter, true)
+ .getRoot(),
+ new TestPlanBuilder()
+ .alignedAggregationScan(
+ "3",
+ schemaMap.get("aligned_root.sg.d2.a.s1"),
+ aggregationDescriptorList1_2,
+ groupByTimeParameter,
+ false)
+ .slidingWindow(
+ "4", aggregationDescriptorList2_2,
groupByTimeParameter, true)
+ .getRoot()))
+ .getRoot());
+ }
+
+ @Test
+ public void testPushDownRawDataAggregationAlignByDevice() {
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new
TimeDuration(0, 10), true);
+
+ List<AggregationDescriptor> aggregationDescriptorList1 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+ List<AggregationDescriptor> aggregationDescriptorList2 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE,
"root.sg.d2.a.s1"));
+
+ List<String> outputColumnNames = Arrays.asList("Device", "__endTime",
"count(s1)");
+ List<String> devices = Arrays.asList("root.sg.d1", "root.sg.d2.a");
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>();
+ deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2));
+ deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 2));
+
+ checkPushDown(
+ "select __endTime, count(s1) from root.sg.d1, root.sg.d2.a where s1 >
10 group by ([0, 100), 10ms) align by device;",
+ new TestPlanBuilder()
+ .deviceView(
+ "8",
+ outputColumnNames,
+ devices,
+ deviceToMeasurementIndexesMap,
+ Arrays.asList(
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .filter(
+ "1",
+ Collections.singletonList(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))),
+ ExpressionFactory.gt(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")),
+ ExpressionFactory.intValue("10")),
+ true)
+ .rawDataAggregation(
+ "2", aggregationDescriptorList1,
groupByTimeParameter, true)
+ .columnInject("3", groupByTimeParameter)
+ .getRoot(),
+ new TestPlanBuilder()
+ .scanAligned("4",
schemaMap.get("aligned_root.sg.d2.a.s1"))
+ .filter(
+ "5",
+ Collections.singletonList(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1"))),
+ ExpressionFactory.gt(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1")),
+ ExpressionFactory.intValue("10")),
+ true)
+ .rawDataAggregation(
+ "6", aggregationDescriptorList2,
groupByTimeParameter, true)
+ .columnInject("7", groupByTimeParameter)
+ .getRoot()))
+ .getRoot(),
+ new TestPlanBuilder()
+ .deviceView(
+ "8",
+ outputColumnNames,
+ devices,
+ deviceToMeasurementIndexesMap,
+ Arrays.asList(
+ new TestPlanBuilder()
+ .scan("0", schemaMap.get("root.sg.d1.s1"))
+ .filter(
+ "1",
+ Collections.singletonList(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1"))),
+ ExpressionFactory.gt(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d1.s1")),
+ ExpressionFactory.intValue("10")),
+ true)
+ .rawDataAggregation(
+ "2", aggregationDescriptorList1,
groupByTimeParameter, true)
+ .getRoot(),
+ new TestPlanBuilder()
+ .scanAligned("4",
schemaMap.get("aligned_root.sg.d2.a.s1"))
+ .filter(
+ "5",
+ Collections.singletonList(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1"))),
+ ExpressionFactory.gt(
+
ExpressionFactory.timeSeries(schemaMap.get("root.sg.d2.a.s1")),
+ ExpressionFactory.intValue("10")),
+ true)
+ .rawDataAggregation(
+ "6", aggregationDescriptorList2,
groupByTimeParameter, true)
+ .getRoot()))
+ .getRoot());
+ }
+
+ @Test
+ public void testPartialPushDownTimeJoinAlignByDevice() {
+ GroupByTimeParameter groupByTimeParameter =
+ new GroupByTimeParameter(0, 100, new TimeDuration(0, 10), new
TimeDuration(0, 10), true);
+ List<AggregationDescriptor> aggregationDescriptorList1_1 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s1"));
+ List<AggregationDescriptor> aggregationDescriptorList1_2 =
+ Collections.singletonList(
+ getAggregationDescriptor(AggregationStep.SINGLE, "root.sg.d1.s2"));
+
+ List<AggregationDescriptor> aggregationDescriptorList2 =
+ Arrays.asList(
+ getAggregationDescriptor(AggregationStep.SINGLE,
"root.sg.d2.a.s2"),
+ getAggregationDescriptor(AggregationStep.SINGLE,
"root.sg.d2.a.s1"));
+
+ List<String> outputColumnNames = Arrays.asList("Device", "__endTime",
"count(s1)", "count(s2)");
+ List<String> devices = Arrays.asList("root.sg.d1", "root.sg.d2.a");
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap = new
LinkedHashMap<>();
+ deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 3, 2));
+ deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 3, 2));
+
+ checkPushDown(
+ "select __endTime, count(s1), count(s2) from root.sg.d1, root.sg.d2.a
group by ([0, 100), 10ms) align by device;",
+ new TestPlanBuilder()
+ .deviceView(
+ "6",
+ outputColumnNames,
+ devices,
+ deviceToMeasurementIndexesMap,
+ Arrays.asList(
+ new TestPlanBuilder()
+ .aggregationTimeJoin(
+ 0,
+ Arrays.asList(
+ schemaMap.get("root.sg.d1.s2"),
schemaMap.get("root.sg.d1.s1")),
+ Arrays.asList(
+ aggregationDescriptorList1_2,
aggregationDescriptorList1_1),
+ groupByTimeParameter)
+ .columnInject("3", groupByTimeParameter)
+ .getRoot(),
+ new TestPlanBuilder()
+ .alignedAggregationScan(
+ "4",
+ schemaMap.get("root.sg.d2.a"),
+ aggregationDescriptorList2,
+ groupByTimeParameter,
+ false)
+ .columnInject("5", groupByTimeParameter)
+ .getRoot()))
+ .getRoot(),
+ new TestPlanBuilder()
+ .deviceView(
+ "6",
+ outputColumnNames,
+ devices,
+ deviceToMeasurementIndexesMap,
+ Arrays.asList(
+ new TestPlanBuilder()
+ .aggregationTimeJoin(
+ 0,
+ Arrays.asList(
+ schemaMap.get("root.sg.d1.s2"),
schemaMap.get("root.sg.d1.s1")),
+ Arrays.asList(
+ aggregationDescriptorList1_2,
aggregationDescriptorList1_1),
+ groupByTimeParameter)
+ .columnInject("3", groupByTimeParameter)
+ .getRoot(),
+ new TestPlanBuilder()
+ .alignedAggregationScan(
+ "4",
+ schemaMap.get("root.sg.d2.a"),
+ aggregationDescriptorList2,
+ groupByTimeParameter,
+ true)
+ .getRoot()))
+ .getRoot());
+ }
+
+ private AggregationDescriptor getAggregationDescriptor(AggregationStep step,
String path) {
+ return new AggregationDescriptor(
+ TAggregationType.COUNT.name().toLowerCase(),
+ step,
+ Collections.singletonList(new TimeSeriesOperand(schemaMap.get(path))),
+ new HashMap<>());
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
index 717cf9116db..7f086ad0be3 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java
@@ -31,10 +31,8 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.FakePartitionFetcherImpl;
import org.apache.iotdb.db.queryengine.plan.analyze.FakeSchemaFetcherImpl;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
-import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
-import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
import
org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
@@ -57,6 +55,8 @@ import static
org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.
import static
org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.gt;
import static
org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.intValue;
import static
org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.timeSeries;
+import static
org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.checkCannotPushDown;
+import static
org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.checkPushDown;
public class LimitOffsetPushDownTest {
@@ -262,42 +262,19 @@ public class LimitOffsetPushDownTest {
.filter(
"1",
Collections.singletonList(timeSeries(schemaMap.get("root.sg.d1.s1"))),
- gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")))
+ gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")),
+ false)
.offset("2", 100)
.limit("3", 100)
.getRoot());
}
private void checkPushDown(String sql, PlanNode rawPlan, PlanNode optPlan) {
- Statement statement = StatementGenerator.createStatement(sql,
ZonedDateTime.now().getOffset());
-
- MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
- Analyzer analyzer =
- new Analyzer(context, new FakePartitionFetcherImpl(), new
FakeSchemaFetcherImpl());
- Analysis analysis = analyzer.analyze(statement);
-
- LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
- PlanNode actualPlan = planner.plan(analysis).getRootNode();
- Assert.assertEquals(rawPlan, actualPlan);
-
- PlanNode actualOptPlan = new LimitOffsetPushDown().optimize(actualPlan,
analysis, context);
- Assert.assertEquals(optPlan, actualOptPlan);
+ OptimizationTestUtil.checkPushDown(new LimitOffsetPushDown(), sql,
rawPlan, optPlan);
}
private void checkCannotPushDown(String sql, PlanNode rawPlan) {
- Statement statement = StatementGenerator.createStatement(sql,
ZonedDateTime.now().getOffset());
-
- MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
- Analyzer analyzer =
- new Analyzer(context, new FakePartitionFetcherImpl(), new
FakeSchemaFetcherImpl());
- Analysis analysis = analyzer.analyze(statement);
-
- LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
- PlanNode actualPlan = planner.plan(analysis).getRootNode();
-
- Assert.assertEquals(rawPlan, actualPlan);
- Assert.assertEquals(
- actualPlan, new LimitOffsetPushDown().optimize(actualPlan, analysis,
context));
+ OptimizationTestUtil.checkCannotPushDown(new LimitOffsetPushDown(), sql,
rawPlan);
}
// test for limit/offset push down in group by time
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java
new file mode 100644
index 00000000000..73560bd2112
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.optimization;
+
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer;
+import org.apache.iotdb.db.queryengine.plan.analyze.FakePartitionFetcherImpl;
+import org.apache.iotdb.db.queryengine.plan.analyze.FakeSchemaFetcherImpl;
+import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
+import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+
+import org.junit.Assert;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+
+public class OptimizationTestUtil {
+
+ private OptimizationTestUtil() {
+ // util class
+ }
+
+ public static void checkPushDown(
+ PlanOptimizer optimizer, String sql, PlanNode rawPlan, PlanNode optPlan)
{
+ Statement statement = StatementGenerator.createStatement(sql,
ZonedDateTime.now().getOffset());
+
+ MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+ Analyzer analyzer =
+ new Analyzer(context, new FakePartitionFetcherImpl(), new
FakeSchemaFetcherImpl());
+ Analysis analysis = analyzer.analyze(statement);
+
+ LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+ PlanNode actualPlan = planner.plan(analysis).getRootNode();
+ Assert.assertEquals(rawPlan, actualPlan);
+
+ PlanNode actualOptPlan = optimizer.optimize(actualPlan, analysis, context);
+ Assert.assertEquals(optPlan, actualOptPlan);
+ }
+
+ public static void checkCannotPushDown(PlanOptimizer optimizer, String sql,
PlanNode rawPlan) {
+ Statement statement = StatementGenerator.createStatement(sql,
ZonedDateTime.now().getOffset());
+
+ MPPQueryContext context = new MPPQueryContext(new QueryId("test_query"));
+ Analyzer analyzer =
+ new Analyzer(context, new FakePartitionFetcherImpl(), new
FakeSchemaFetcherImpl());
+ Analysis analysis = analyzer.analyze(statement);
+
+ LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
+ PlanNode actualPlan = planner.plan(analysis).getRootNode();
+
+ Assert.assertEquals(rawPlan, actualPlan);
+ Assert.assertEquals(actualPlan, optimizer.optimize(actualPlan, analysis,
context));
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
index 699e7b265be..c3684bc9140 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java
@@ -25,17 +25,24 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FillNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TimeJoinNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.FillDescriptor;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
@@ -43,6 +50,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.component.OrderByKey;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem;
import org.apache.iotdb.db.queryengine.plan.statement.literal.LongLiteral;
+import
org.apache.iotdb.db.utils.columngenerator.parameter.SlidingTimeColumnGeneratorParameter;
import java.time.ZonedDateTime;
import java.util.ArrayList;
@@ -90,6 +98,111 @@ public class TestPlanBuilder {
return this;
}
+ public TestPlanBuilder aggregationScan(
+ String id,
+ PartialPath path,
+ List<AggregationDescriptor> aggregationDescriptors,
+ GroupByTimeParameter groupByTimeParameter,
+ boolean outputEndTime) {
+ SeriesAggregationScanNode aggregationScanNode =
+ new SeriesAggregationScanNode(
+ new PlanNodeId(id),
+ (MeasurementPath) path,
+ aggregationDescriptors,
+ Ordering.ASC,
+ groupByTimeParameter);
+ aggregationScanNode.setOutputEndTime(outputEndTime);
+ this.root = aggregationScanNode;
+ return this;
+ }
+
+ public TestPlanBuilder alignedAggregationScan(
+ String id,
+ PartialPath path,
+ List<AggregationDescriptor> aggregationDescriptors,
+ GroupByTimeParameter groupByTimeParameter,
+ boolean outputEndTime) {
+ AlignedSeriesAggregationScanNode aggregationScanNode =
+ new AlignedSeriesAggregationScanNode(
+ new PlanNodeId(id),
+ (AlignedPath) path,
+ aggregationDescriptors,
+ Ordering.ASC,
+ groupByTimeParameter);
+ aggregationScanNode.setOutputEndTime(outputEndTime);
+ this.root = aggregationScanNode;
+ return this;
+ }
+
+ public TestPlanBuilder rawDataAggregation(
+ String id,
+ List<AggregationDescriptor> aggregationDescriptors,
+ GroupByTimeParameter groupByTimeParameter,
+ boolean outputEndTime) {
+ AggregationNode aggregationNode =
+ new AggregationNode(
+ new PlanNodeId(String.valueOf(id)),
+ Collections.singletonList(getRoot()),
+ aggregationDescriptors,
+ groupByTimeParameter,
+ Ordering.ASC);
+ aggregationNode.setOutputEndTime(outputEndTime);
+ this.root = aggregationNode;
+ return this;
+ }
+
+ public TestPlanBuilder slidingWindow(
+ String id,
+ List<AggregationDescriptor> aggregationDescriptors,
+ GroupByTimeParameter groupByTimeParameter,
+ boolean outputEndTime) {
+ SlidingWindowAggregationNode slidingWindowAggregationNode =
+ new SlidingWindowAggregationNode(
+ new PlanNodeId(id),
+ getRoot(),
+ aggregationDescriptors,
+ groupByTimeParameter,
+ Ordering.ASC);
+ slidingWindowAggregationNode.setOutputEndTime(outputEndTime);
+ this.root = slidingWindowAggregationNode;
+ return this;
+ }
+
+ public TestPlanBuilder aggregationTimeJoin(
+ int startId,
+ List<PartialPath> paths,
+ List<List<AggregationDescriptor>> aggregationDescriptorsList,
+ GroupByTimeParameter groupByTimeParameter) {
+ int planId = startId;
+
+ List<PlanNode> seriesSourceNodes = new ArrayList<>();
+ for (int i = 0; i < paths.size(); i++) {
+ PartialPath path = paths.get(i);
+ if (path instanceof MeasurementPath) {
+ seriesSourceNodes.add(
+ new SeriesAggregationScanNode(
+ new PlanNodeId(String.valueOf(planId)),
+ (MeasurementPath) paths.get(i),
+ aggregationDescriptorsList.get(i),
+ Ordering.ASC,
+ groupByTimeParameter));
+ } else {
+ seriesSourceNodes.add(
+ new AlignedSeriesAggregationScanNode(
+ new PlanNodeId(String.valueOf(planId)),
+ (AlignedPath) paths.get(i),
+ aggregationDescriptorsList.get(i),
+ Ordering.ASC,
+ groupByTimeParameter));
+ }
+ planId++;
+ }
+
+ this.root =
+ new TimeJoinNode(new PlanNodeId(String.valueOf(planId)), Ordering.ASC,
seriesSourceNodes);
+ return this;
+ }
+
public TestPlanBuilder timeJoin(List<PartialPath> paths) {
int planId = 0;
@@ -163,14 +276,15 @@ public class TestPlanBuilder {
return this;
}
- public TestPlanBuilder filter(String id, List<Expression> expressions,
Expression predicate) {
+ public TestPlanBuilder filter(
+ String id, List<Expression> expressions, Expression predicate, boolean
isGroupByTime) {
this.root =
new FilterNode(
new PlanNodeId(id),
getRoot(),
expressions.toArray(new Expression[0]),
predicate,
- false,
+ isGroupByTime,
ZonedDateTime.now().getOffset(),
Ordering.ASC);
return this;
@@ -185,4 +299,35 @@ public class TestPlanBuilder {
this.root = new IntoNode(new PlanNodeId(id), getRoot(),
intoPathDescriptor);
return this;
}
+
+ public TestPlanBuilder columnInject(String id, GroupByTimeParameter
groupByTimeParameter) {
+ this.root =
+ new ColumnInjectNode(
+ new PlanNodeId(id),
+ getRoot(),
+ 0,
+ new SlidingTimeColumnGeneratorParameter(groupByTimeParameter,
true));
+ return this;
+ }
+
+ public TestPlanBuilder deviceView(
+ String id,
+ List<String> outputColumnNames,
+ List<String> devices,
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap,
+ List<PlanNode> children) {
+ DeviceViewNode deviceViewNode =
+ new DeviceViewNode(
+ new PlanNodeId(id),
+ new OrderByParameter(
+ Arrays.asList(
+ new SortItem(OrderByKey.DEVICE, Ordering.ASC),
+ new SortItem(OrderByKey.TIME, Ordering.ASC))),
+ outputColumnNames,
+ devices,
+ deviceToMeasurementIndexesMap);
+ deviceViewNode.setChildren(children);
+ this.root = deviceViewNode;
+ return this;
+ }
}