This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/PredicatePushDown in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ad1e882fb8bc0530d7ef8f7d49f151a3a808a057 Author: Minghui Liu <[email protected]> AuthorDate: Sun Jan 7 23:08:40 2024 +0800 add UTs --- .../optimization/ColumnInjectionPushDownTest.java | 51 +--- .../plan/optimization/LimitOffsetPushDownTest.java | 38 +-- .../plan/optimization/OptimizationTestUtil.java | 59 +++- .../plan/optimization/PredicatePushDownTest.java | 333 +++++++++++++++++++++ .../plan/optimization/TestPlanBuilder.java | 74 ++++- 5 files changed, 468 insertions(+), 87 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java index f5dd0507b27..2db73bb9a3f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/ColumnInjectionPushDownTest.java @@ -20,10 +20,6 @@ package org.apache.iotdb.db.queryengine.plan.optimization; import org.apache.iotdb.common.rpc.thrift.TAggregationType; -import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.AlignedPath; -import org.apache.iotdb.commons.path.MeasurementPath; -import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -31,7 +27,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationDe import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.TimeDuration; import org.junit.Test; @@ -43,47 +38,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -public class ColumnInjectionPushDownTest { +import static org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.schemaMap; - private static final Map<String, PartialPath> schemaMap = new HashMap<>(); - - static { - try { - schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", TSDataType.INT32)); - schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE)); - - MeasurementPath d2s1 = new MeasurementPath("root.sg.d2.a.s1", TSDataType.INT32); - d2s1.setUnderAlignedEntity(true); - schemaMap.put("root.sg.d2.a.s1", d2s1); - - AlignedPath aligned_d2s1 = - new AlignedPath( - "root.sg.d2.a", - Collections.singletonList("s1"), - Collections.singletonList(d2s1.getMeasurementSchema())); - schemaMap.put("aligned_root.sg.d2.a.s1", aligned_d2s1); - - MeasurementPath d2s2 = new MeasurementPath("root.sg.d2.a.s2", TSDataType.DOUBLE); - d2s2.setUnderAlignedEntity(true); - schemaMap.put("root.sg.d2.a.s2", d2s2); - - AlignedPath aligned_d2s2 = - new AlignedPath( - "root.sg.d2.a", - Collections.singletonList("s2"), - Collections.singletonList(d2s2.getMeasurementSchema())); - schemaMap.put("aligned_root.sg.d2.a.s2", aligned_d2s2); - - AlignedPath alignedPath = - new AlignedPath( - "root.sg.d2.a", - Arrays.asList("s2", "s1"), - Arrays.asList(d2s2.getMeasurementSchema(), d2s1.getMeasurementSchema())); - schemaMap.put("root.sg.d2.a", alignedPath); - } catch (IllegalPathException e) { - e.printStackTrace(); - } - } +public class ColumnInjectionPushDownTest { private void checkPushDown(String sql, PlanNode rawPlan, PlanNode optPlan) { OptimizationTestUtil.checkPushDown(new ColumnInjectionPushDown(), sql, rawPlan, optPlan); @@ -612,7 +569,7 @@ public class ColumnInjectionPushDownTest { new TestPlanBuilder() .alignedAggregationScan( "4", - schemaMap.get("root.sg.d2.a"), + schemaMap.get("desc_root.sg.d2.a"), aggregationDescriptorList2, groupByTimeParameter, false) @@ -639,7 +596,7 @@ public class ColumnInjectionPushDownTest { new TestPlanBuilder() .alignedAggregationScan( "4", - schemaMap.get("root.sg.d2.a"), + schemaMap.get("desc_root.sg.d2.a"), aggregationDescriptorList2, groupByTimeParameter, true) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java index 7f086ad0be3..a234741a02b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/LimitOffsetPushDownTest.java @@ -19,9 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.optimization; -import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.AlignedPath; -import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -36,7 +33,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa import org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy; import org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.junit.Assert; import org.junit.Test; @@ -45,47 +41,18 @@ import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.add; import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.function; import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.gt; import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.intValue; import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.timeSeries; -import static org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.checkCannotPushDown; -import static org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.checkPushDown; +import static org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.schemaMap; public class LimitOffsetPushDownTest { - private static final Map<String, PartialPath> schemaMap = new HashMap<>(); - - static { - try { - schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", TSDataType.INT32)); - schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE)); - schemaMap.put("root.sg.d2.s1", new MeasurementPath("root.sg.d2.s1", TSDataType.INT32)); - schemaMap.put("root.sg.d2.s2", new MeasurementPath("root.sg.d2.s2", TSDataType.DOUBLE)); - - MeasurementPath aS1 = new MeasurementPath("root.sg.d2.a.s1", TSDataType.INT32); - MeasurementPath aS2 = new MeasurementPath("root.sg.d2.a.s2", TSDataType.DOUBLE); - AlignedPath alignedPath = - new AlignedPath( - "root.sg.d2.a", - Arrays.asList("s1", "s2"), - Arrays.asList(aS1.getMeasurementSchema(), aS2.getMeasurementSchema())); - aS1.setUnderAlignedEntity(true); - aS2.setUnderAlignedEntity(true); - schemaMap.put("root.sg.d2.a.s1", aS1); - schemaMap.put("root.sg.d2.a.s2", aS2); - schemaMap.put("root.sg.d2.a", alignedPath); - } catch (IllegalPathException e) { - e.printStackTrace(); - } - } - @Test public void testNonAlignedPushDown() { checkPushDown( @@ -207,7 +174,8 @@ public class LimitOffsetPushDownTest { checkCannotPushDown( "select s1, s2 from root.sg.d1 limit 100;", new TestPlanBuilder() - .timeJoin(Arrays.asList(schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d1.s2"))) + .fullOuterTimeJoin( + Arrays.asList(schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d1.s2"))) .limit("3", 100) .getRoot()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java index 73560bd2112..02ccd3ca1e8 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/OptimizationTestUtil.java @@ -19,6 +19,10 @@ package org.apache.iotdb.db.queryengine.plan.optimization; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; @@ -29,11 +33,16 @@ import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.junit.Assert; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; public class OptimizationTestUtil { @@ -41,6 +50,52 @@ public class OptimizationTestUtil { // util class } + public static final Map<String, PartialPath> schemaMap = new HashMap<>(); + + static { + try { + schemaMap.put("root.sg.d1.s1", new MeasurementPath("root.sg.d1.s1", TSDataType.INT32)); + schemaMap.put("root.sg.d1.s2", new MeasurementPath("root.sg.d1.s2", TSDataType.DOUBLE)); + schemaMap.put("root.sg.d2.s1", new MeasurementPath("root.sg.d2.s1", TSDataType.INT32)); + schemaMap.put("root.sg.d2.s2", new MeasurementPath("root.sg.d2.s2", TSDataType.DOUBLE)); + + MeasurementPath aS1 = new MeasurementPath("root.sg.d2.a.s1", TSDataType.INT32); + aS1.setUnderAlignedEntity(true); + MeasurementPath aS2 = new MeasurementPath("root.sg.d2.a.s2", TSDataType.DOUBLE); + aS2.setUnderAlignedEntity(true); + schemaMap.put("root.sg.d2.a.s1", aS1); + schemaMap.put("root.sg.d2.a.s2", aS2); + + AlignedPath alignedPath = + new AlignedPath( + "root.sg.d2.a", + Arrays.asList("s1", "s2"), + Arrays.asList(aS1.getMeasurementSchema(), aS2.getMeasurementSchema())); + AlignedPath descOrderAlignedPath = + new AlignedPath( + "root.sg.d2.a", + Arrays.asList("s2", "s1"), + Arrays.asList(aS2.getMeasurementSchema(), aS1.getMeasurementSchema())); + schemaMap.put("root.sg.d2.a", alignedPath); + schemaMap.put("desc_root.sg.d2.a", descOrderAlignedPath); + + AlignedPath aligned_d2s1 = + new AlignedPath( + "root.sg.d2.a", + Collections.singletonList("s1"), + Collections.singletonList(aS1.getMeasurementSchema())); + schemaMap.put("aligned_root.sg.d2.a.s1", aligned_d2s1); + AlignedPath aligned_d2s2 = + new AlignedPath( + "root.sg.d2.a", + Collections.singletonList("s2"), + Collections.singletonList(aS2.getMeasurementSchema())); + schemaMap.put("aligned_root.sg.d2.a.s2", aligned_d2s2); + } catch (IllegalPathException e) { + e.printStackTrace(); + } + } + public static void checkPushDown( PlanOptimizer optimizer, String sql, PlanNode rawPlan, PlanNode optPlan) { Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()); @@ -70,6 +125,8 @@ public class OptimizationTestUtil { PlanNode actualPlan = planner.plan(analysis).getRootNode(); Assert.assertEquals(rawPlan, actualPlan); - Assert.assertEquals(actualPlan, optimizer.optimize(actualPlan, analysis, context)); + + PlanNode actualOptPlan = optimizer.optimize(actualPlan, analysis, context); + Assert.assertEquals(actualPlan, actualOptPlan); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDownTest.java new file mode 100644 index 00000000000..a28a103fda7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/PredicatePushDownTest.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.optimization; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.and; +import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.gt; +import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.intValue; +import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.or; +import static org.apache.iotdb.db.queryengine.plan.expression.ExpressionFactory.timeSeries; +import static org.apache.iotdb.db.queryengine.plan.optimization.OptimizationTestUtil.schemaMap; + +public class PredicatePushDownTest { + + @Test + public void testPushDownAlignByTime() { + checkPushDown( + "select s1, s2 from root.sg.d1 where time > 100 and s1 > 10", + new TestPlanBuilder() + .fullOuterTimeJoin( + Arrays.asList(schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d1.s2"))) + .filter( + "3", + Arrays.asList( + timeSeries(schemaMap.get("root.sg.d1.s1")), + timeSeries(schemaMap.get("root.sg.d1.s2"))), + gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")), + false) + .getRoot(), + new TestPlanBuilder() + .leftOuterTimeJoin( + "4", + Ordering.ASC, + new TestPlanBuilder() + .scan( + "0", + schemaMap.get("root.sg.d1.s1"), + gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10"))) + .getRoot(), + new TestPlanBuilder().scan("1", schemaMap.get("root.sg.d1.s2")).getRoot()) + .getRoot()); + + checkPushDown( + "select s1, s2 from root.sg.d1 where time > 100 and s1 > 10 and s2 > 10", + new TestPlanBuilder() + .fullOuterTimeJoin( + Arrays.asList(schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d1.s2"))) + .filter( + "3", + Arrays.asList( + timeSeries(schemaMap.get("root.sg.d1.s1")), + timeSeries(schemaMap.get("root.sg.d1.s2"))), + and( + gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")), + gt(timeSeries(schemaMap.get("root.sg.d1.s2")), intValue("10"))), + false) + .getRoot(), + new TestPlanBuilder() + .innerTimeJoin( + "4", + Ordering.ASC, + Arrays.asList("0", "1"), + Arrays.asList(schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d1.s2")), + Arrays.asList( + gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")), + gt(timeSeries(schemaMap.get("root.sg.d1.s2")), intValue("10")))) + .getRoot()); + + checkPushDown( + "select s1, s2 from root.sg.d2.a where time > 100 and s1 > 10", + new TestPlanBuilder() + .scanAligned("0", schemaMap.get("root.sg.d2.a")) + .filter( + "1", + Arrays.asList( + timeSeries(schemaMap.get("root.sg.d2.a.s1")), + timeSeries(schemaMap.get("root.sg.d2.a.s2"))), + gt(timeSeries(schemaMap.get("root.sg.d2.a.s1")), intValue("10")), + false) + .getRoot(), + new TestPlanBuilder() + .scanAligned( + "0", + schemaMap.get("root.sg.d2.a"), + gt(timeSeries(schemaMap.get("root.sg.d2.a.s1")), intValue("10"))) + .getRoot()); + + checkPushDown( + "select s1, s2 from root.sg.d2.a where time > 100 and (s1 > 10 or s2 > 10)", + new TestPlanBuilder() + .scanAligned("0", schemaMap.get("root.sg.d2.a")) + .filter( + "1", + Arrays.asList( + timeSeries(schemaMap.get("root.sg.d2.a.s1")), + timeSeries(schemaMap.get("root.sg.d2.a.s2"))), + or( + gt(timeSeries(schemaMap.get("root.sg.d2.a.s1")), intValue("10")), + gt(timeSeries(schemaMap.get("root.sg.d2.a.s2")), intValue("10"))), + false) + .getRoot(), + new TestPlanBuilder() + .scanAligned( + "0", + schemaMap.get("root.sg.d2.a"), + or( + gt(timeSeries(schemaMap.get("root.sg.d2.a.s1")), intValue("10")), + gt(timeSeries(schemaMap.get("root.sg.d2.a.s2")), intValue("10")))) + .getRoot()); + + checkPushDown( + "select s1, s2 from root.sg.* where time > 100 and s1 > 10", + new TestPlanBuilder() + .fullOuterTimeJoin( + Arrays.asList( + schemaMap.get("root.sg.d2.s1"), + schemaMap.get("root.sg.d1.s1"), + schemaMap.get("root.sg.d2.s2"), + schemaMap.get("root.sg.d1.s2"))) + .filter( + "5", + Arrays.asList( + timeSeries(schemaMap.get("root.sg.d2.s1")), + timeSeries(schemaMap.get("root.sg.d1.s1")), + timeSeries(schemaMap.get("root.sg.d2.s2")), + timeSeries(schemaMap.get("root.sg.d1.s2"))), + and( + gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")), + gt(timeSeries(schemaMap.get("root.sg.d2.s1")), intValue("10"))), + false) + .getRoot(), + new TestPlanBuilder() + .leftOuterTimeJoin( + "8", + Ordering.ASC, + new TestPlanBuilder() + .innerTimeJoin( + "6", + Ordering.ASC, + Arrays.asList("0", "1"), + Arrays.asList( + schemaMap.get("root.sg.d2.s1"), schemaMap.get("root.sg.d1.s1")), + Arrays.asList( + gt(timeSeries(schemaMap.get("root.sg.d2.s1")), intValue("10")), + gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")))) + .getRoot(), + new TestPlanBuilder() + .fullOuterTimeJoin( + "7", + Ordering.ASC, + Arrays.asList("2", "3"), + Arrays.asList( + schemaMap.get("root.sg.d2.s2"), schemaMap.get("root.sg.d1.s2"))) + .getRoot()) + .getRoot()); + } + + @Test + public void testCannotPushDownAlignByTime() { + checkCannotPushDown( + "select s1, s2 from root.sg.d1 where time > 100 and (s1 > 10 or s2 > 10)", + new TestPlanBuilder() + .fullOuterTimeJoin( + Arrays.asList(schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d1.s2"))) + .filter( + "3", + Arrays.asList( + timeSeries(schemaMap.get("root.sg.d1.s1")), + timeSeries(schemaMap.get("root.sg.d1.s2"))), + or( + gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")), + gt(timeSeries(schemaMap.get("root.sg.d1.s2")), intValue("10"))), + false) + .getRoot()); + + checkCannotPushDown( + "select a.s1, a.s2 from root.sg.d2 where time > 100 and (a.s1 > 10 or s2 > 10)", + new TestPlanBuilder() + .fullOuterTimeJoin( + "2", + Ordering.ASC, + Arrays.asList("0", "1"), + Arrays.asList(schemaMap.get("root.sg.d2.s2"), schemaMap.get("root.sg.d2.a"))) + .filter( + "3", + Arrays.asList( + timeSeries(schemaMap.get("root.sg.d2.a.s1")), + timeSeries(schemaMap.get("root.sg.d2.a.s2"))), + or( + gt(timeSeries(schemaMap.get("root.sg.d2.a.s1")), intValue("10")), + gt(timeSeries(schemaMap.get("root.sg.d2.s2")), intValue("10"))), + false) + .getRoot()); + } + + @Test + public void testPushDownAlignByDevice() { + + List<String> outputColumnNames = Arrays.asList("Device", "s1", "s2"); + List<String> devices = Arrays.asList("root.sg.d1", "root.sg.d2", "root.sg.d2.a"); + Map<String, List<Integer>> deviceToMeasurementIndexesMap = new HashMap<>(); + deviceToMeasurementIndexesMap.put("root.sg.d1", Arrays.asList(1, 2)); + deviceToMeasurementIndexesMap.put("root.sg.d2", Arrays.asList(1, 2)); + deviceToMeasurementIndexesMap.put("root.sg.d2.a", Arrays.asList(1, 2)); + + checkPushDown( + "select s1, s2 from root.sg.** where time > 100 and s1 > 10 align by device", + new TestPlanBuilder() + .deviceView( + "10", + outputColumnNames, + devices, + deviceToMeasurementIndexesMap, + Arrays.asList( + new TestPlanBuilder() + .fullOuterTimeJoin( + "2", + Ordering.ASC, + Arrays.asList("0", "1"), + Arrays.asList( + schemaMap.get("root.sg.d1.s1"), schemaMap.get("root.sg.d1.s2"))) + .filter( + "3", + Arrays.asList( + timeSeries(schemaMap.get("root.sg.d1.s1")), + timeSeries(schemaMap.get("root.sg.d1.s2"))), + gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10")), + false) + .getRoot(), + new TestPlanBuilder() + .fullOuterTimeJoin( + "6", + Ordering.ASC, + Arrays.asList("4", "5"), + Arrays.asList( + schemaMap.get("root.sg.d2.s1"), schemaMap.get("root.sg.d2.s2"))) + .filter( + "7", + Arrays.asList( + timeSeries(schemaMap.get("root.sg.d2.s1")), + timeSeries(schemaMap.get("root.sg.d2.s2"))), + gt(timeSeries(schemaMap.get("root.sg.d2.s1")), intValue("10")), + false) + .getRoot(), + new TestPlanBuilder() + .scanAligned("8", schemaMap.get("root.sg.d2.a")) + .filter( + "9", + Arrays.asList( + timeSeries(schemaMap.get("root.sg.d2.a.s1")), + timeSeries(schemaMap.get("root.sg.d2.a.s2"))), + gt(timeSeries(schemaMap.get("root.sg.d2.a.s1")), intValue("10")), + false) + .getRoot())) + .getRoot(), + new TestPlanBuilder() + .deviceView( + "10", + outputColumnNames, + devices, + deviceToMeasurementIndexesMap, + Arrays.asList( + new TestPlanBuilder() + .leftOuterTimeJoin( + "11", + Ordering.ASC, + new TestPlanBuilder() + .scan( + "0", + schemaMap.get("root.sg.d1.s1"), + gt(timeSeries(schemaMap.get("root.sg.d1.s1")), intValue("10"))) + .getRoot(), + new TestPlanBuilder() + .scan("1", schemaMap.get("root.sg.d1.s2")) + .getRoot()) + .getRoot(), + new TestPlanBuilder() + .leftOuterTimeJoin( + "12", + Ordering.ASC, + new TestPlanBuilder() + .scan( + "4", + schemaMap.get("root.sg.d2.s1"), + gt(timeSeries(schemaMap.get("root.sg.d2.s1")), intValue("10"))) + .getRoot(), + new TestPlanBuilder() + .scan("5", schemaMap.get("root.sg.d2.s2")) + .getRoot()) + .getRoot(), + new TestPlanBuilder() + .scanAligned( + "8", + schemaMap.get("root.sg.d2.a"), + gt(timeSeries(schemaMap.get("root.sg.d2.a.s1")), intValue("10"))) + .getRoot())) + .getRoot()); + } + + private void checkPushDown(String sql, PlanNode rawPlan, PlanNode optPlan) { + OptimizationTestUtil.checkPushDown(new PredicatePushDown(), sql, rawPlan, optPlan); + } + + private void checkCannotPushDown(String sql, PlanNode rawPlan) { + OptimizationTestUtil.checkCannotPushDown(new PredicatePushDown(), sql, rawPlan); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java index affceaf2fd8..9d15f382e23 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/optimization/TestPlanBuilder.java @@ -36,6 +36,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SlidingWindowAggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.InnerTimeJoinNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.LeftOuterTimeJoinNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode; @@ -90,6 +92,13 @@ public class TestPlanBuilder { return this; } + public TestPlanBuilder scan(String id, PartialPath path, Expression predicate) { + SeriesScanNode node = new SeriesScanNode(new PlanNodeId(id), (MeasurementPath) path); + node.setPushDownPredicate(predicate); + this.root = node; + return this; + } + public TestPlanBuilder scanAligned(String id, PartialPath path, int limit, int offset) { AlignedSeriesScanNode node = new AlignedSeriesScanNode(new PlanNodeId(id), (AlignedPath) path); node.setPushDownLimit(limit); @@ -98,6 +107,13 @@ public class TestPlanBuilder { return this; } + public TestPlanBuilder scanAligned(String id, PartialPath path, Expression predicate) { + AlignedSeriesScanNode node = new AlignedSeriesScanNode(new PlanNodeId(id), (AlignedPath) path); + node.setPushDownPredicate(predicate); + this.root = node; + return this; + } + public TestPlanBuilder aggregationScan( String id, PartialPath path, @@ -204,13 +220,17 @@ public class TestPlanBuilder { return this; } - public TestPlanBuilder timeJoin(List<PartialPath> paths) { + public TestPlanBuilder fullOuterTimeJoin(List<PartialPath> paths) { int planId = 0; - List<PlanNode> seriesSourceNodes = new ArrayList<>(); for (PartialPath path : paths) { - seriesSourceNodes.add( - new SeriesScanNode(new PlanNodeId(String.valueOf(planId)), (MeasurementPath) path)); + if (path instanceof AlignedPath) { + seriesSourceNodes.add( + new AlignedSeriesScanNode(new PlanNodeId(String.valueOf(planId)), (AlignedPath) path)); + } else { + seriesSourceNodes.add( + new SeriesScanNode(new PlanNodeId(String.valueOf(planId)), (MeasurementPath) path)); + } planId++; } this.root = @@ -219,6 +239,25 @@ public class TestPlanBuilder { return this; } + public TestPlanBuilder fullOuterTimeJoin( + String rootId, Ordering mergeOrder, List<String> ids, List<PartialPath> paths) { + List<PlanNode> seriesSourceNodes = new ArrayList<>(); + for (int i = 0; i < ids.size(); i++) { + PartialPath path = paths.get(i); + if (path instanceof AlignedPath) { + seriesSourceNodes.add( + new AlignedSeriesScanNode(new PlanNodeId(ids.get(i)), (AlignedPath) paths.get(i))); + } else { + seriesSourceNodes.add( + new SeriesScanNode(new PlanNodeId(ids.get(i)), (MeasurementPath) paths.get(i))); + } + } + this.root = + new FullOuterTimeJoinNode( + new PlanNodeId(String.valueOf(rootId)), mergeOrder, seriesSourceNodes); + return this; + } + public TestPlanBuilder transform(String id, List<Expression> expressions) { this.root = new TransformNode( @@ -332,4 +371,31 @@ public class TestPlanBuilder { this.root = deviceViewNode; return this; } + + public TestPlanBuilder leftOuterTimeJoin( + String id, Ordering mergeOrder, PlanNode leftChild, PlanNode rightChild) { + this.root = new LeftOuterTimeJoinNode(new PlanNodeId(id), mergeOrder, leftChild, rightChild); + return this; + } + + public TestPlanBuilder innerTimeJoin( + String rootId, + Ordering mergeOrder, + List<String> ids, + List<PartialPath> paths, + List<Expression> predicates) { + List<PlanNode> seriesSourceNodes = new ArrayList<>(); + for (int i = 0; i < ids.size(); i++) { + SeriesScanNode seriesScanNode = + new SeriesScanNode( + new PlanNodeId(String.valueOf(ids.get(i))), (MeasurementPath) paths.get(i)); + seriesScanNode.setPushDownPredicate(predicates.get(i)); + seriesSourceNodes.add(seriesScanNode); + } + + this.root = + new InnerTimeJoinNode( + new PlanNodeId(String.valueOf(rootId)), mergeOrder, seriesSourceNodes); + return this; + } }
