This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0e15626c69151bcadb4c206e0048d3a0716c4bba Author: alpass163 <[email protected]> AuthorDate: Fri Nov 7 17:05:08 2025 +0800 implement the intersect (distinct | all ) for table model (#16700) (cherry picked from commit 4ebb89b91e8e433c7997c2fe11ffeb4e2afdf197) --- .../it/query/recent/IoTDBIntersectTableIT.java | 153 +++++++++ .../plan/planner/plan/node/PlanGraphPrinter.java | 10 + .../plan/planner/plan/node/PlanNodeType.java | 4 + .../plan/planner/plan/node/PlanVisitor.java | 5 + .../plan/relational/planner/RelationPlanner.java | 24 +- .../iterative/rule/ImplementIntersectAll.java | 95 ++++++ .../rule/ImplementIntersectDistinctAsUnion.java | 88 +++++ .../iterative/rule/PruneDistinctAggregation.java | 9 + .../iterative/rule/SetOperationNodeTranslator.java | 355 +++++++++++++++++++++ .../relational/planner/node/IntersectNode.java | 98 ++++++ .../plan/relational/planner/node/Patterns.java | 22 +- .../optimizations/LogicalOptimizeFactory.java | 13 + .../optimizations/UnaliasSymbolReferences.java | 31 ++ .../plan/relational/analyzer/IntersectTest.java | 121 +++++++ .../db/relational/grammar/sql/RelationalSql.g4 | 5 +- 15 files changed, 1015 insertions(+), 18 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBIntersectTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBIntersectTableIT.java new file mode 100644 index 00000000000..2bc8d6bddb6 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBIntersectTableIT.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.query.recent; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData; +import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail; +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBIntersectTableIT { + protected static final String DATABASE_NAME = "test"; + protected static final String[] createSqls = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + // table1: ('d1', 1, 1) * 2, ('d1', 2, 2) *1 + "create table table1(device STRING TAG, s1 INT32 FIELD, s2 INT32 FIELD)", + "insert into table1 values (1, 'd1', 1, 1)", + "insert into table1 values (2, 'd1', 1, 1)", + "insert into table1 values (3, 'd1', 2, 2)", + // table2: ('d1', 1, 1.0) * 3, ('d1', 3, 3.0) *1 + "create table table2(device STRING TAG, s1 INT64 FIELD, s2 DOUBLE FIELD)", + "insert into table2 values (1, 'd1', 1, 1.0)", + "insert into table2 values (2, 'd1', 1, 1.0)", + "insert into table2 values (3, 'd1', 1, 1.0)", + "insert into table2 values (4, 'd1', 3, 3.0)", + // table3: use for testing alias + "create table table3(device STRING TAG, s1_testName INT64 FIELD, s2_testName DOUBLE FIELD)", + "insert into table3 values (1, 'd1', 1, 1.0)", + "insert into table3 values (2, 'd1', 1, 1.0)", + "insert into table3 values (3, 'd1', 1, 1.0)", + "insert into table3 values (4, 'd1', 3, 3.0)", + // table4: test type compatible + "create table table4(device STRING TAG, s1 TEXT FIELD, s2 DOUBLE FIELD)" + }; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + prepareTableData(createSqls); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void normalTest() { + String[] expectedHeader = new String[] {"device", "s1", "s2"}; + + // --- INTERSECT (DISTINCT) --- + // table1 and table2, expected one tuple : ('d1', 1, 1.0) + String[] retArray = + new String[] { + "d1,1,1.0,", + }; + tableResultSetEqualTest( + "select device, s1, s2 from table1 intersect select device, s1, s2 from table2", + expectedHeader, + retArray, + DATABASE_NAME); + tableResultSetEqualTest( + "select device, s1, s2 from table1 intersect distinct select device, s1, s2 from table2", + expectedHeader, + retArray, + DATABASE_NAME); + + // --- INTERSECT ALL --- + // (1, 1.0) shows twice in table1, shows three times in table2 + // expected: min(2, 3) = 2 tuple + retArray = new String[] {"d1,1,1.0,", "d1,1,1.0,"}; + tableResultSetEqualTest( + "select device, s1, s2 from table1 intersect all select device, s1, s2 from table2", + expectedHeader, + retArray, + DATABASE_NAME); + // test table3, the column name is different + tableResultSetEqualTest( + "select device, s1, s2 from table1 intersect all select device, s1_testName, s2_testName from table3", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void mappingTest() { + // table1 (aliased): (s1 as col_a) -> (1), (1), (2) + // table2 (aliased): (s2 as col_a) -> (1.0), (1.0), (1.0), (3.0) + // common value: (1.0) + + String[] expectedHeader = new String[] {"col_a"}; + + // --- INTERSECT (DISTINCT) with alias --- + String[] retArray = new String[] {"1.0,"}; + tableResultSetEqualTest( + "select col_a from ((select s1 as col_a, device as col_b from table1) intersect (select s2 as col_a, device as col_b from table2)) order by col_a", + expectedHeader, + retArray, + DATABASE_NAME); + + // --- INTERSECT ALL with alias --- + retArray = new String[] {"1.0,", "1.0,"}; + tableResultSetEqualTest( + "select col_a from ((select s1 as col_a, device as col_b from table1) intersect all (select s2 as col_a, device as col_b from table2)) order by col_a", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void exceptionTest() { + // type is incompatible (INT32 vs TEXT) + tableAssertTestFail( + "(select * from table1) intersect all (select * from table4)", + "has incompatible types: INT32, TEXT", + DATABASE_NAME); + + tableAssertTestFail( + "(select * from table1) intersect all (select time from table4)", + "INTERSECT query has different number of fields: 4, 1", + DATABASE_NAME); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 6ffc534a712..82e03d8789b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -75,6 +75,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -1106,6 +1107,15 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter return render(node, boxValue, context); } + @Override + public List<String> visitIntersect(IntersectNode node, GraphContext context) { + List<String> boxValue = new ArrayList<>(); + boxValue.add(String.format("Intersect-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols())); + boxValue.add(String.format("isDistinct: %s", node.isDistinct())); + return render(node, boxValue, context); + } + private List<String> render(PlanNode node, List<String> nodeBoxString, GraphContext context) { Box box = new Box(nodeBoxString); List<List<String>> children = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 902102797c9..c4b77def053 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -123,6 +123,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingl import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -313,6 +314,7 @@ public enum PlanNodeType { TABLE_WINDOW_FUNCTION((short) 1032), TABLE_INTO_NODE((short) 1033), TABLE_UNION_NODE((short) 1034), + TABLE_INTERSECT_NODE((short) 1035), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), @@ -707,6 +709,8 @@ public enum PlanNodeType { buffer); case 1034: return UnionNode.deserialize(buffer); + case 1035: + return IntersectNode.deserialize(buffer); case 2000: return RelationalInsertTabletNode.deserialize(buffer); case 2001: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index bbc83cce18b..3e237650a6a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -127,6 +127,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableS import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -845,4 +846,8 @@ public abstract class PlanVisitor<R, C> { public R visitUnion(UnionNode node, C context) { return visitPlan(node, context); } + + public R visitIntersect(IntersectNode node, C context) { + return visitPlan(node, context); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index af823554b7b..ce573ce1e2d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -57,6 +57,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Measure; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode; @@ -1145,6 +1146,24 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { planNode, analysis.getScope(node), planNode.getOutputSymbols(), outerContext); } + @Override + protected RelationPlan visitIntersect(Intersect node, Void context) { + Preconditions.checkArgument( + !node.getRelations().isEmpty(), "No relations specified for intersect"); + SetOperationPlan setOperationPlan = process(node); + + PlanNode intersectNode = + new IntersectNode( + idAllocator.genPlanNodeId(), + setOperationPlan.getChildren(), + setOperationPlan.getSymbolMapping(), + ImmutableList.copyOf(setOperationPlan.getSymbolMapping().keySet()), + node.isDistinct()); + + return new RelationPlan( + intersectNode, analysis.getScope(node), intersectNode.getOutputSymbols(), outerContext); + } + private SetOperationPlan process(SetOperation node) { RelationType outputFields = analysis.getOutputDescriptor(node); List<Symbol> outputs = @@ -1191,11 +1210,6 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { throw new IllegalStateException("Values is not supported in current version."); } - @Override - protected RelationPlan visitIntersect(Intersect node, Void context) { - throw new IllegalStateException("Intersect is not supported in current version."); - } - @Override protected RelationPlan visitExcept(Except node, Void context) { throw new IllegalStateException("Except is not supported in current version."); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectAll.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectAll.java new file mode 100644 index 00000000000..af31281c918 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectAll.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import com.google.common.collect.ImmutableList; + +import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.Intersect.distinct; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.intersect; + +public class ImplementIntersectAll implements Rule<IntersectNode> { + + private static final Pattern<IntersectNode> PATTERN = intersect().with(distinct().equalTo(false)); + + private final Metadata metadata; + + public ImplementIntersectAll(Metadata metadata) { + this.metadata = requireNonNull(metadata, "metadata is null"); + } + + @Override + public Pattern<IntersectNode> getPattern() { + return PATTERN; + } + + @Override + public Result apply(IntersectNode node, Captures captures, Context context) { + + SetOperationNodeTranslator translator = + new SetOperationNodeTranslator( + metadata, context.getSymbolAllocator(), context.getIdAllocator()); + + // 1. translate the intersect(all) node to other planNodes + SetOperationNodeTranslator.TranslationResult translationResult = + translator.makeSetContainmentPlanForAll(node); + + // 2. add the filter node above the result node from translation process + // filter condition : row_number <= least(countA, countB...) + Expression minCount = translationResult.getCountSymbols().get(0).toSymbolReference(); + for (int i = 1; i < translationResult.getCountSymbols().size(); i++) { + minCount = + new FunctionCall( + QualifiedName.of(TableBuiltinScalarFunction.LEAST.getFunctionName()), + ImmutableList.of( + minCount, translationResult.getCountSymbols().get(i).toSymbolReference())); + } + + FilterNode filterNode = + new FilterNode( + context.getIdAllocator().genPlanNodeId(), + translationResult.getPlanNode(), + new ComparisonExpression( + ComparisonExpression.Operator.LESS_THAN_OR_EQUAL, + translationResult.getRowNumberSymbol().toSymbolReference(), + minCount)); + + // 3. add the project node to remove the redundant columns + return Result.ofPlanNode( + new ProjectNode( + context.getIdAllocator().genPlanNodeId(), + filterNode, + Assignments.identity(node.getOutputSymbols()))); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectDistinctAsUnion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectDistinctAsUnion.java new file mode 100644 index 00000000000..0a8f8c498ae --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectDistinctAsUnion.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GenericLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; +import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; + +import com.google.common.collect.ImmutableList; + +import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.and; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL; + +public class ImplementIntersectDistinctAsUnion implements Rule<IntersectNode> { + + private static final Pattern<IntersectNode> PATTERN = + Patterns.intersect().with(Patterns.Intersect.distinct().equalTo(true)); + + private final Metadata metadata; + + @Override + public Pattern<IntersectNode> getPattern() { + return PATTERN; + } + + public ImplementIntersectDistinctAsUnion(Metadata metadata) { + this.metadata = requireNonNull(metadata, "metadata is null"); + } + + @Override + public Result apply(IntersectNode node, Captures captures, Context context) { + + SetOperationNodeTranslator translator = + new SetOperationNodeTranslator( + metadata, context.getSymbolAllocator(), context.getIdAllocator()); + + SetOperationNodeTranslator.TranslationResult result = + translator.makeSetContainmentPlanForDistinct(node); + + // add the filterNode above the aggregation node + Expression predicate = + and( + result.getCountSymbols().stream() + .map( + symbol -> + new ComparisonExpression( + GREATER_THAN_OR_EQUAL, + symbol.toSymbolReference(), + new GenericLiteral("INT64", "1"))) + .collect(ImmutableList.toImmutableList())); + + FilterNode filterNode = + new FilterNode(context.getIdAllocator().genPlanNodeId(), result.getPlanNode(), predicate); + + return Result.ofPlanNode( + new ProjectNode( + context.getIdAllocator().genPlanNodeId(), + filterNode, + Assignments.identity(node.getOutputSymbols()))); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java index 5b3a4101478..26fd0278016 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode; import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern; @@ -100,6 +101,14 @@ public class PruneDistinctAggregation implements Rule<AggregationNode> { return rewriteChildren(node, context); } + @Override + public PlanNode visitIntersect(IntersectNode node, Boolean context) { + if (node.isDistinct()) { + return rewriteChildren(node, context); + } + return visitPlan(node, context); + } + /*@Override public PlanNode visitUnion(UnionNode node, Boolean context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java new file mode 100644 index 00000000000..3088860564e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule; + +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature; +import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId; +import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments; +import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; +import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FrameBound; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame; +import org.apache.iotdb.db.utils.constant.SqlConstant; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.apache.tsfile.read.common.type.Type; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.Iterables.concat; +import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL; +import static org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType; +import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT; +import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN; +import static org.apache.tsfile.read.common.type.LongType.INT64; + +public class SetOperationNodeTranslator { + + private static final String MARKER = "marker"; + private static final String COUNT_MARKER = "count"; + private static final String ROW_NUMBER_SYMBOL = "row_number"; + private final SymbolAllocator symbolAllocator; + private final QueryId idAllocator; + private final Metadata metadata; + + public SetOperationNodeTranslator( + Metadata metadata, SymbolAllocator symbolAllocator, QueryId idAllocator) { + + this.metadata = requireNonNull(metadata, "metadata is null"); + this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null"); + this.idAllocator = requireNonNull(idAllocator, "idAllocator is null"); + } + + /** for intersect distinct and except distinct , use true and false for markers */ + public TranslationResult makeSetContainmentPlanForDistinct(SetOperationNode node) { + + checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode"); + List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, BOOLEAN); + + // 1. add the marker column to the origin planNode + List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, node.getChildren(), node); + + // 2. add the union node over all new projection nodes. + // The outputs of the union must have the same name as the original intersect node + UnionNode union = + union( + projectNodesWithMarkers, + ImmutableList.copyOf(concat(node.getOutputSymbols(), markers))); + + // 3. add the aggregation node above the union node + List<Symbol> aggregationOutputs = allocateSymbols(markers.size(), COUNT, INT64); + AggregationNode aggregation = + computeCounts(union, node.getOutputSymbols(), markers, aggregationOutputs); + + return new TranslationResult(aggregation, aggregationOutputs); + } + + /** for intersect all and except all, use true and false for markers */ + public TranslationResult makeSetContainmentPlanForAll(SetOperationNode node) { + + checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode"); + List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, BOOLEAN); + + // for every child of SetOperation node, add the marker column for the child + List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, node.getChildren(), node); + + UnionNode union = + union( + projectNodesWithMarkers, + ImmutableList.copyOf(concat(node.getOutputSymbols(), markers))); + List<Symbol> countOutputs = allocateSymbols(markers.size(), COUNT_MARKER, INT64); + Symbol rowNumberSymbol = symbolAllocator.newSymbol(ROW_NUMBER_SYMBOL, INT64); + WindowNode windowNode = + appendCounts(union, node.getOutputSymbols(), markers, countOutputs, rowNumberSymbol); + + ProjectNode projectNode = + new ProjectNode( + idAllocator.genPlanNodeId(), + windowNode, + Assignments.identity( + ImmutableList.copyOf( + concat( + node.getOutputSymbols(), + countOutputs, + ImmutableList.of(rowNumberSymbol))))); + + return new TranslationResult(projectNode, countOutputs, Optional.of(rowNumberSymbol)); + } + + /** + * only for transforming the intersection (all) node, add the window node and group node above the + * union node + */ + private WindowNode appendCounts( + UnionNode union, + List<Symbol> originOutputSymbols, + List<Symbol> markers, + List<Symbol> countOutputs, + Symbol rowNumberSymbol) { + + checkArgument( + markers.size() == countOutputs.size(), + "the size of markers should be same as the size of count output symbols"); + + // Add group node above the union node to assist partitioning, preparing for the window node + ImmutableMap.Builder<Symbol, SortOrder> sortOrderings = ImmutableMap.builder(); + ImmutableList.Builder<Symbol> sortSymbolBuilder = ImmutableList.builder(); + for (Symbol originalColumn : originOutputSymbols) { + sortSymbolBuilder.add(originalColumn); + sortOrderings.put(originalColumn, SortOrder.ASC_NULLS_LAST); + } + ImmutableList<Symbol> sortSymbols = sortSymbolBuilder.build(); + GroupNode groupNode = + new GroupNode( + idAllocator.genPlanNodeId(), + union, + new OrderingScheme(sortSymbols, sortOrderings.build()), + sortSymbols.size()); + + // build the windowFunctions for count(marker) and row_number + ImmutableMap.Builder<Symbol, WindowNode.Function> windowFunctions = ImmutableMap.builder(); + WindowNode.Frame windowFunctionFrame = + new WindowNode.Frame( + WindowFrame.Type.ROWS, + FrameBound.Type.UNBOUNDED_PRECEDING, + Optional.empty(), + Optional.empty(), + FrameBound.Type.UNBOUNDED_FOLLOWING, + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty()); + + ResolvedFunction countFunction = + getResolvedBuiltInAggregateFunction( + metadata, SqlConstant.COUNT, Collections.singletonList(BOOLEAN)); + for (int i = 0; i < markers.size(); i++) { + windowFunctions.put( + countOutputs.get(i), + new WindowNode.Function( + countFunction, + ImmutableList.of(markers.get(i).toSymbolReference()), + windowFunctionFrame, + false)); + } + + List<Type> argumentTypes = ImmutableList.of(); + ResolvedFunction rowNumberFunction = + new ResolvedFunction( + new BoundSignature( + SqlConstant.ROW_NUMBER, + metadata.getFunctionReturnType(SqlConstant.ROW_NUMBER, argumentTypes), + argumentTypes), + FunctionId.NOOP_FUNCTION_ID, + FunctionKind.WINDOW, + true, + FunctionNullability.getAggregationFunctionNullability(argumentTypes.size())); + + windowFunctions.put( + rowNumberSymbol, + new WindowNode.Function(rowNumberFunction, ImmutableList.of(), windowFunctionFrame, false)); + + // add the windowNode above the group node + return new WindowNode( + idAllocator.genPlanNodeId(), + groupNode, + new DataOrganizationSpecification(originOutputSymbols, Optional.empty()), + windowFunctions.buildOrThrow(), + Optional.empty(), + ImmutableSet.of(), + 0); + } + + /** get an array of markers, used for the new columns */ + private List<Symbol> allocateSymbols(int count, String nameHint, Type type) { + ImmutableList.Builder<Symbol> symbolsBuilder = ImmutableList.builder(); + for (int i = 0; i < count; i++) { + symbolsBuilder.add(symbolAllocator.newSymbol(nameHint, type)); + } + return symbolsBuilder.build(); + } + + /** + * Builds projection nodes with marker columns for each child of the set operation. Each child + * gets TRUE for its own marker and NULL (cast to BOOLEAN) for others. This is used in the + * implementation of INTERSECT and EXCEPT set operations. + */ + private List<PlanNode> appendMarkers( + List<Symbol> markers, List<PlanNode> children, SetOperationNode node) { + ImmutableList.Builder<PlanNode> projectionsWithMarker = ImmutableList.builder(); + + Map<Symbol, Collection<Symbol>> symbolMapping = node.getSymbolMapping().asMap(); + for (int childIndex = 0; childIndex < children.size(); childIndex++) { + + // add the original symbols to projection node + Assignments.Builder assignments = Assignments.builder(); + for (Symbol outputSymbol : node.getOutputSymbols()) { + Collection<Symbol> inputSymbols = symbolMapping.get(outputSymbol); + Symbol sourceSymbol = Iterables.get(inputSymbols, childIndex); + + Symbol newProjectedSymbol = symbolAllocator.newSymbol(outputSymbol); + assignments.put(newProjectedSymbol, sourceSymbol.toSymbolReference()); + } + + // add the new marker symbol to the new projection node + for (int j = 0; j < markers.size(); j++) { + Expression expression = + j == childIndex ? TRUE_LITERAL : new Cast(new NullLiteral(), toSqlType(BOOLEAN)); + assignments.put(symbolAllocator.newSymbol(markers.get(j).getName(), BOOLEAN), expression); + } + + projectionsWithMarker.add( + new ProjectNode( + idAllocator.genPlanNodeId(), children.get(childIndex), assignments.build())); + } + + return projectionsWithMarker.build(); + } + + private UnionNode union(List<PlanNode> projectNodesWithMarkers, List<Symbol> outputs) { + + ImmutableListMultimap.Builder<Symbol, Symbol> outputsToInputs = ImmutableListMultimap.builder(); + + for (PlanNode projectionNode : projectNodesWithMarkers) { + List<Symbol> outputSymbols = projectionNode.getOutputSymbols(); + for (int i = 0; i < outputSymbols.size(); i++) { + outputsToInputs.put(outputs.get(i), outputSymbols.get(i)); + } + } + + return new UnionNode( + idAllocator.genPlanNodeId(), projectNodesWithMarkers, outputsToInputs.build(), outputs); + } + + /** add the aggregation node above the union node */ + private AggregationNode computeCounts( + UnionNode unionNode, + List<Symbol> originalColumns, + List<Symbol> markers, + List<Symbol> aggregationOutputs) { + + ImmutableMap.Builder<Symbol, AggregationNode.Aggregation> aggregations = ImmutableMap.builder(); + + ResolvedFunction resolvedFunction = + getResolvedBuiltInAggregateFunction(metadata, COUNT, Collections.singletonList(BOOLEAN)); + + for (int i = 0; i < markers.size(); i++) { + Symbol countMarker = aggregationOutputs.get(i); + aggregations.put( + countMarker, + new AggregationNode.Aggregation( + resolvedFunction, + ImmutableList.of(markers.get(i).toSymbolReference()), + false, + Optional.empty(), + Optional.empty(), + Optional.empty())); + } + + return AggregationNode.singleAggregation( + idAllocator.genPlanNodeId(), + unionNode, + aggregations.buildOrThrow(), + singleGroupingSet(originalColumns)); + } + + public static class TranslationResult { + + private final PlanNode planNode; + private final List<Symbol> countSymbols; + private final Optional<Symbol> rowNumberSymbol; + + public TranslationResult(PlanNode planNode, List<Symbol> countSymbols) { + this(planNode, countSymbols, Optional.empty()); + } + + public TranslationResult( + PlanNode planNode, List<Symbol> countSymbols, Optional<Symbol> rowNumberSymbol) { + this.planNode = requireNonNull(planNode, "planNode is null"); + this.countSymbols = + ImmutableList.copyOf(requireNonNull(countSymbols, "countSymbols is null")); + this.rowNumberSymbol = requireNonNull(rowNumberSymbol, "rowNumberSymbol is null"); + } + + public List<Symbol> getCountSymbols() { + return countSymbols; + } + + public Symbol getRowNumberSymbol() { + checkState(rowNumberSymbol.isPresent(), "rowNumberSymbol is empty"); + return rowNumberSymbol.get(); + } + + public PlanNode getPlanNode() { + return this.planNode; + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntersectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntersectNode.java new file mode 100644 index 00000000000..3f7a0864971 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntersectNode.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; + +import com.google.common.collect.ListMultimap; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +public class IntersectNode extends SetOperationNode { + + private final boolean distinct; + + public IntersectNode( + PlanNodeId id, + List<PlanNode> children, + ListMultimap<Symbol, Symbol> outputToInputs, + List<Symbol> outputs, + boolean distinct) { + super(id, children, outputToInputs, outputs); + this.distinct = distinct; + } + + private IntersectNode( + PlanNodeId id, + ListMultimap<Symbol, Symbol> outputToInputs, + List<Symbol> outputs, + boolean distinct) { + super(id, outputToInputs, outputs); + this.distinct = distinct; + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitIntersect(this, context); + } + + public boolean isDistinct() { + return distinct; + } + + @Override + public PlanNode clone() { + return new IntersectNode(getPlanNodeId(), getSymbolMapping(), getOutputSymbols(), distinct); + } + + @Override + public List<String> getOutputColumnNames() { + throw new UnsupportedOperationException(); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + throw new UnsupportedOperationException( + "IntersectNode should never be serialized in current version"); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + throw new UnsupportedOperationException( + "IntersectNode should never be serialized in current version"); + } + + public static IntersectNode deserialize(ByteBuffer byteBuffer) { + throw new UnsupportedOperationException( + "IntersectNode should never be deserialized in current version"); + } + + @Override + public PlanNode replaceChildren(List<PlanNode> newChildren) { + return new IntersectNode( + getPlanNodeId(), newChildren, getSymbolMapping(), getOutputSymbols(), isDistinct()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java index 3ebd933d28c..d08ac9ecf72 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java @@ -189,6 +189,10 @@ public final class Patterns { return typeOf(UnionNode.class); } + public static Pattern<IntersectNode> intersect() { + return typeOf(IntersectNode.class); + } + /*public static Pattern<TableWriterNode> tableWriterNode() { return typeOf(TableWriterNode.class); @@ -249,10 +253,6 @@ public final class Patterns { return typeOf(DistinctLimitNode.class); } - public static Pattern<IntersectNode> intersect() - { - return typeOf(IntersectNode.class); - } public static Pattern<ExceptNode> except() { @@ -359,6 +359,12 @@ public final class Patterns { } } + public static final class Intersect { + public static Property<IntersectNode, Lookup, Boolean> distinct() { + return property("distinct", IntersectNode::isDistinct); + } + } + /*public static final class Sample { public static Property<SampleNode, Lookup, Double> sampleRatio() @@ -415,13 +421,7 @@ public final class Patterns { } }*/ - /*public static final class Intersect - { - public static Property<IntersectNode, Lookup, Boolean> distinct() - { - return property("distinct", IntersectNode::isDistinct); - } - } + /* public static final class Except { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index d70f67114b0..65015715945 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -26,6 +26,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Iterati import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.RuleStatsRecorder; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.CanonicalizeExpressions; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementIntersectAll; +import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementIntersectDistinctAsUnion; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementPatternRecognition; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementTableFunctionSource; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.InlineProjections; @@ -275,6 +277,17 @@ public class LogicalOptimizeFactory { // new MergeExcept new PruneDistinctAggregation())) .build()), + new IterativeOptimizer( + plannerContext, + ruleStats, + ImmutableSet.<Rule<?>>builder() + .add( + new ImplementIntersectDistinctAsUnion(metadata), + // new ImplementExceptDistinctAsUnion(metadata) + new ImplementIntersectAll(metadata) + // new ImplementExceptAll(metadata))), + ) + .build()), columnPruningOptimizer, inlineProjectionLimitFiltersOptimizer, new IterativeOptimizer( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 95f0f6cb0f2..ff3a01be849 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; @@ -1015,6 +1016,36 @@ public class UnaliasSymbolReferences implements PlanOptimizer { mapping); } + @Override + public PlanAndMappings visitIntersect(IntersectNode node, UnaliasContext context) { + + List<PlanAndMappings> rewrittenSources = + node.getChildren().stream() + .map(source -> source.accept(this, context)) + .collect(toImmutableList()); + + List<SymbolMapper> inputMappers = + rewrittenSources.stream() + .map(source -> symbolMapper(new HashMap<>(source.getMappings()))) + .collect(toImmutableList()); + + Map<Symbol, Symbol> mapping = new HashMap<>(context.getCorrelationMapping()); + SymbolMapper outputMapper = symbolMapper(mapping); + + ListMultimap<Symbol, Symbol> newOutputToInputs = + rewriteOutputToInputsMap(node.getSymbolMapping(), outputMapper, inputMappers); + List<Symbol> newOutputs = outputMapper.mapAndDistinct(node.getOutputSymbols()); + + return new PlanAndMappings( + new IntersectNode( + node.getPlanNodeId(), + rewrittenSources.stream().map(PlanAndMappings::getRoot).collect(toImmutableList()), + newOutputToInputs, + newOutputs, + node.isDistinct()), + mapping); + } + private ListMultimap<Symbol, Symbol> rewriteOutputToInputsMap( ListMultimap<Symbol, Symbol> oldMapping, SymbolMapper outputMapper, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/IntersectTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/IntersectTest.java new file mode 100644 index 00000000000..13d18c89ddf --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/IntersectTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.analyzer; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester; + +import org.junit.Test; + +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.union; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.window; + +/** tests for intersect (distinct) and intersect all */ +public class IntersectTest { + + @Test + public void intersectTest() { + + PlanTester planTester = new PlanTester(); + LogicalQueryPlan actualLogicalQueryPlan = + planTester.createPlan("select tag1 from t1 intersect select tag1 from t2"); + // just verify the Logical plan: `Output - project - filter - aggregation - union - 2*(project + // - tableScan)` + assertPlan( + actualLogicalQueryPlan, + output( + project( + filter( + aggregation( + union( + project(tableScan("testdb.t1")), project(tableScan("testdb.t2")))))))); + } + + @Test + public void intersectAllTest() { + + PlanTester planTester = new PlanTester(); + LogicalQueryPlan actualLogicalQueryPlan = + planTester.createPlan("select tag1 from t1 intersect all select tag1 from t2"); + assertPlan( + actualLogicalQueryPlan, + output( + project( + filter( + project( + window( + sort( + union( + project(tableScan("testdb.t1")), + project(tableScan("testdb.t2")))))))))); + } + + @Test + public void typeCompatibleTest() { + // use CAST if types of according columns is not compatible + // s1 is INT64, s3 is DOUBLE + + PlanTester planTester = new PlanTester(); + LogicalQueryPlan actualLogicalQueryPlan = + planTester.createPlan("select s1, s3 from table2 intersect all select s1, s1 from table3 "); + + assertPlan( + actualLogicalQueryPlan, + output( + project( + filter( + project( + window( + sort( + union( + project(tableScan("testdb.table2")), + project(tableScan("testdb.table3")))))))))); + } + + /** the priority of intersect is higher than that of the union */ + @Test + public void setOperationPriority() { + + PlanTester planTester = new PlanTester(); + LogicalQueryPlan actualLogicalQueryPlan = + planTester.createPlan( + "select tag1 from t1 union select tag1 from t2 intersect select tag1 from t3"); + + assertPlan( + actualLogicalQueryPlan, + output( + aggregation( + union( + tableScan("testdb.t1"), + project( + filter( + aggregation( + union( + project(tableScan("testdb.t2")), + project(tableScan("testdb.t3")))))))))); + } +} diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 59231b6de2b..7357196219f 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -930,8 +930,9 @@ rowCount ; queryTerm - : queryPrimary #queryTermDefault - | left=queryTerm operator=(INTERSECT | UNION | EXCEPT) setQuantifier? right=queryTerm #setOperation + : queryPrimary #queryTermDefault + | left=queryTerm operator=INTERSECT setQuantifier? right=queryTerm #setOperation + | left=queryTerm operator=(UNION | EXCEPT) setQuantifier? right=queryTerm #setOperation ; queryPrimary
