This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4ebb89b91e8 implement the intersect (distinct | all ) for table model
(#16700)
4ebb89b91e8 is described below
commit 4ebb89b91e8e433c7997c2fe11ffeb4e2afdf197
Author: alpass163 <[email protected]>
AuthorDate: Fri Nov 7 17:05:08 2025 +0800
implement the intersect (distinct | all ) for table model (#16700)
---
.../it/query/recent/IoTDBIntersectTableIT.java | 153 +++++++++
.../plan/planner/plan/node/PlanGraphPrinter.java | 10 +
.../plan/planner/plan/node/PlanNodeType.java | 4 +
.../plan/planner/plan/node/PlanVisitor.java | 5 +
.../plan/relational/planner/RelationPlanner.java | 24 +-
.../iterative/rule/ImplementIntersectAll.java | 95 ++++++
.../rule/ImplementIntersectDistinctAsUnion.java | 88 +++++
.../iterative/rule/PruneDistinctAggregation.java | 9 +
.../iterative/rule/SetOperationNodeTranslator.java | 355 +++++++++++++++++++++
.../relational/planner/node/IntersectNode.java | 98 ++++++
.../plan/relational/planner/node/Patterns.java | 22 +-
.../optimizations/LogicalOptimizeFactory.java | 13 +
.../optimizations/UnaliasSymbolReferences.java | 31 ++
.../plan/relational/analyzer/IntersectTest.java | 121 +++++++
.../db/relational/grammar/sql/RelationalSql.g4 | 5 +-
15 files changed, 1015 insertions(+), 18 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBIntersectTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBIntersectTableIT.java
new file mode 100644
index 00000000000..2bc8d6bddb6
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBIntersectTableIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBIntersectTableIT {
+ protected static final String DATABASE_NAME = "test";
+ protected static final String[] createSqls =
+ new String[] {
+ "CREATE DATABASE " + DATABASE_NAME,
+ "USE " + DATABASE_NAME,
+ // table1: ('d1', 1, 1) * 2, ('d1', 2, 2) *1
+ "create table table1(device STRING TAG, s1 INT32 FIELD, s2 INT32
FIELD)",
+ "insert into table1 values (1, 'd1', 1, 1)",
+ "insert into table1 values (2, 'd1', 1, 1)",
+ "insert into table1 values (3, 'd1', 2, 2)",
+ // table2: ('d1', 1, 1.0) * 3, ('d1', 3, 3.0) *1
+ "create table table2(device STRING TAG, s1 INT64 FIELD, s2 DOUBLE
FIELD)",
+ "insert into table2 values (1, 'd1', 1, 1.0)",
+ "insert into table2 values (2, 'd1', 1, 1.0)",
+ "insert into table2 values (3, 'd1', 1, 1.0)",
+ "insert into table2 values (4, 'd1', 3, 3.0)",
+ // table3: use for testing alias
+ "create table table3(device STRING TAG, s1_testName INT64 FIELD,
s2_testName DOUBLE FIELD)",
+ "insert into table3 values (1, 'd1', 1, 1.0)",
+ "insert into table3 values (2, 'd1', 1, 1.0)",
+ "insert into table3 values (3, 'd1', 1, 1.0)",
+ "insert into table3 values (4, 'd1', 3, 3.0)",
+ // table4: test type compatible
+ "create table table4(device STRING TAG, s1 TEXT FIELD, s2 DOUBLE
FIELD)"
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareTableData(createSqls);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void normalTest() {
+ String[] expectedHeader = new String[] {"device", "s1", "s2"};
+
+ // --- INTERSECT (DISTINCT) ---
+ // table1 and table2, expected one tuple : ('d1', 1, 1.0)
+ String[] retArray =
+ new String[] {
+ "d1,1,1.0,",
+ };
+ tableResultSetEqualTest(
+ "select device, s1, s2 from table1 intersect select device, s1, s2
from table2",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ tableResultSetEqualTest(
+ "select device, s1, s2 from table1 intersect distinct select device,
s1, s2 from table2",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ // --- INTERSECT ALL ---
+ // (1, 1.0) shows twice in table1, shows three times in table2
+ // expected: min(2, 3) = 2 tuple
+ retArray = new String[] {"d1,1,1.0,", "d1,1,1.0,"};
+ tableResultSetEqualTest(
+ "select device, s1, s2 from table1 intersect all select device, s1, s2
from table2",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ // test table3, the column name is different
+ tableResultSetEqualTest(
+ "select device, s1, s2 from table1 intersect all select device,
s1_testName, s2_testName from table3",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void mappingTest() {
+ // table1 (aliased): (s1 as col_a) -> (1), (1), (2)
+ // table2 (aliased): (s2 as col_a) -> (1.0), (1.0), (1.0), (3.0)
+ // common value: (1.0)
+
+ String[] expectedHeader = new String[] {"col_a"};
+
+ // --- INTERSECT (DISTINCT) with alias ---
+ String[] retArray = new String[] {"1.0,"};
+ tableResultSetEqualTest(
+ "select col_a from ((select s1 as col_a, device as col_b from table1)
intersect (select s2 as col_a, device as col_b from table2)) order by col_a",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ // --- INTERSECT ALL with alias ---
+ retArray = new String[] {"1.0,", "1.0,"};
+ tableResultSetEqualTest(
+ "select col_a from ((select s1 as col_a, device as col_b from table1)
intersect all (select s2 as col_a, device as col_b from table2)) order by
col_a",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void exceptionTest() {
+ // type is incompatible (INT32 vs TEXT)
+ tableAssertTestFail(
+ "(select * from table1) intersect all (select * from table4)",
+ "has incompatible types: INT32, TEXT",
+ DATABASE_NAME);
+
+ tableAssertTestFail(
+ "(select * from table1) intersect all (select time from table4)",
+ "INTERSECT query has different number of fields: 4, 1",
+ DATABASE_NAME);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 6ffc534a712..82e03d8789b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -75,6 +75,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -1106,6 +1107,15 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
return render(node, boxValue, context);
}
+ @Override
+ public List<String> visitIntersect(IntersectNode node, GraphContext context)
{
+ List<String> boxValue = new ArrayList<>();
+ boxValue.add(String.format("Intersect-%s", node.getPlanNodeId().getId()));
+ boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
+ boxValue.add(String.format("isDistinct: %s", node.isDistinct()));
+ return render(node, boxValue, context);
+ }
+
private List<String> render(PlanNode node, List<String> nodeBoxString,
GraphContext context) {
Box box = new Box(nodeBoxString);
List<List<String>> children = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index ca1fdebad7f..1100793ca88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -122,6 +122,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingl
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -312,6 +313,7 @@ public enum PlanNodeType {
TABLE_WINDOW_FUNCTION((short) 1032),
TABLE_INTO_NODE((short) 1033),
TABLE_UNION_NODE((short) 1034),
+ TABLE_INTERSECT_NODE((short) 1035),
RELATIONAL_INSERT_TABLET((short) 2000),
RELATIONAL_INSERT_ROW((short) 2001),
@@ -701,6 +703,8 @@ public enum PlanNodeType {
buffer);
case 1034:
return UnionNode.deserialize(buffer);
+ case 1035:
+ return IntersectNode.deserialize(buffer);
case 2000:
return RelationalInsertTabletNode.deserialize(buffer);
case 2001:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index cdb3ce7b4be..9e8e1834247 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -126,6 +126,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableS
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -840,4 +841,8 @@ public abstract class PlanVisitor<R, C> {
public R visitUnion(UnionNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitIntersect(IntersectNode node, C context) {
+ return visitPlan(node, context);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index af823554b7b..ce573ce1e2d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -57,6 +57,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Measure;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -1145,6 +1146,24 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
planNode, analysis.getScope(node), planNode.getOutputSymbols(),
outerContext);
}
+ @Override
+ protected RelationPlan visitIntersect(Intersect node, Void context) {
+ Preconditions.checkArgument(
+ !node.getRelations().isEmpty(), "No relations specified for
intersect");
+ SetOperationPlan setOperationPlan = process(node);
+
+ PlanNode intersectNode =
+ new IntersectNode(
+ idAllocator.genPlanNodeId(),
+ setOperationPlan.getChildren(),
+ setOperationPlan.getSymbolMapping(),
+ ImmutableList.copyOf(setOperationPlan.getSymbolMapping().keySet()),
+ node.isDistinct());
+
+ return new RelationPlan(
+ intersectNode, analysis.getScope(node),
intersectNode.getOutputSymbols(), outerContext);
+ }
+
private SetOperationPlan process(SetOperation node) {
RelationType outputFields = analysis.getOutputDescriptor(node);
List<Symbol> outputs =
@@ -1191,11 +1210,6 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
throw new IllegalStateException("Values is not supported in current
version.");
}
- @Override
- protected RelationPlan visitIntersect(Intersect node, Void context) {
- throw new IllegalStateException("Intersect is not supported in current
version.");
- }
-
@Override
protected RelationPlan visitExcept(Except node, Void context) {
throw new IllegalStateException("Except is not supported in current
version.");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectAll.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectAll.java
new file mode 100644
index 00000000000..af31281c918
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectAll.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import com.google.common.collect.ImmutableList;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.Intersect.distinct;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.intersect;
+
+public class ImplementIntersectAll implements Rule<IntersectNode> {
+
+ private static final Pattern<IntersectNode> PATTERN =
intersect().with(distinct().equalTo(false));
+
+ private final Metadata metadata;
+
+ public ImplementIntersectAll(Metadata metadata) {
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ }
+
+ @Override
+ public Pattern<IntersectNode> getPattern() {
+ return PATTERN;
+ }
+
+ @Override
+ public Result apply(IntersectNode node, Captures captures, Context context) {
+
+ SetOperationNodeTranslator translator =
+ new SetOperationNodeTranslator(
+ metadata, context.getSymbolAllocator(), context.getIdAllocator());
+
+ // 1. translate the intersect(all) node to other planNodes
+ SetOperationNodeTranslator.TranslationResult translationResult =
+ translator.makeSetContainmentPlanForAll(node);
+
+ // 2. add the filter node above the result node from translation process
+ // filter condition : row_number <= least(countA, countB...)
+ Expression minCount =
translationResult.getCountSymbols().get(0).toSymbolReference();
+ for (int i = 1; i < translationResult.getCountSymbols().size(); i++) {
+ minCount =
+ new FunctionCall(
+
QualifiedName.of(TableBuiltinScalarFunction.LEAST.getFunctionName()),
+ ImmutableList.of(
+ minCount,
translationResult.getCountSymbols().get(i).toSymbolReference()));
+ }
+
+ FilterNode filterNode =
+ new FilterNode(
+ context.getIdAllocator().genPlanNodeId(),
+ translationResult.getPlanNode(),
+ new ComparisonExpression(
+ ComparisonExpression.Operator.LESS_THAN_OR_EQUAL,
+ translationResult.getRowNumberSymbol().toSymbolReference(),
+ minCount));
+
+ // 3. add the project node to remove the redundant columns
+ return Result.ofPlanNode(
+ new ProjectNode(
+ context.getIdAllocator().genPlanNodeId(),
+ filterNode,
+ Assignments.identity(node.getOutputSymbols())));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectDistinctAsUnion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectDistinctAsUnion.java
new file mode 100644
index 00000000000..0a8f8c498ae
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectDistinctAsUnion.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GenericLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import com.google.common.collect.ImmutableList;
+
+import static java.util.Objects.requireNonNull;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.and;
+import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL;
+
+public class ImplementIntersectDistinctAsUnion implements Rule<IntersectNode> {
+
+ private static final Pattern<IntersectNode> PATTERN =
+ Patterns.intersect().with(Patterns.Intersect.distinct().equalTo(true));
+
+ private final Metadata metadata;
+
+ @Override
+ public Pattern<IntersectNode> getPattern() {
+ return PATTERN;
+ }
+
+ public ImplementIntersectDistinctAsUnion(Metadata metadata) {
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ }
+
+ @Override
+ public Result apply(IntersectNode node, Captures captures, Context context) {
+
+ SetOperationNodeTranslator translator =
+ new SetOperationNodeTranslator(
+ metadata, context.getSymbolAllocator(), context.getIdAllocator());
+
+ SetOperationNodeTranslator.TranslationResult result =
+ translator.makeSetContainmentPlanForDistinct(node);
+
+ // add the filterNode above the aggregation node
+ Expression predicate =
+ and(
+ result.getCountSymbols().stream()
+ .map(
+ symbol ->
+ new ComparisonExpression(
+ GREATER_THAN_OR_EQUAL,
+ symbol.toSymbolReference(),
+ new GenericLiteral("INT64", "1")))
+ .collect(ImmutableList.toImmutableList()));
+
+ FilterNode filterNode =
+ new FilterNode(context.getIdAllocator().genPlanNodeId(),
result.getPlanNode(), predicate);
+
+ return Result.ofPlanNode(
+ new ProjectNode(
+ context.getIdAllocator().genPlanNodeId(),
+ filterNode,
+ Assignments.identity(node.getOutputSymbols())));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java
index 5b3a4101478..26fd0278016 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup;
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
@@ -100,6 +101,14 @@ public class PruneDistinctAggregation implements
Rule<AggregationNode> {
return rewriteChildren(node, context);
}
+ @Override
+ public PlanNode visitIntersect(IntersectNode node, Boolean context) {
+ if (node.isDistinct()) {
+ return rewriteChildren(node, context);
+ }
+ return visitPlan(node, context);
+ }
+
/*@Override
public PlanNode visitUnion(UnionNode node, Boolean context)
{
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java
new file mode 100644
index 00000000000..3088860564e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FrameBound;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.concat;
+import static java.util.Objects.requireNonNull;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction;
+import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT;
+import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
+
+public class SetOperationNodeTranslator {
+
+ private static final String MARKER = "marker";
+ private static final String COUNT_MARKER = "count";
+ private static final String ROW_NUMBER_SYMBOL = "row_number";
+ private final SymbolAllocator symbolAllocator;
+ private final QueryId idAllocator;
+ private final Metadata metadata;
+
+ public SetOperationNodeTranslator(
+ Metadata metadata, SymbolAllocator symbolAllocator, QueryId idAllocator)
{
+
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is
null");
+ this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
+ }
+
+ /** for intersect distinct and except distinct , use true and false for
markers */
+ public TranslationResult makeSetContainmentPlanForDistinct(SetOperationNode
node) {
+
+ checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+ List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER,
BOOLEAN);
+
+ // 1. add the marker column to the origin planNode
+ List<PlanNode> projectNodesWithMarkers = appendMarkers(markers,
node.getChildren(), node);
+
+ // 2. add the union node over all new projection nodes.
+ // The outputs of the union must have the same name as the original
intersect node
+ UnionNode union =
+ union(
+ projectNodesWithMarkers,
+ ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+
+ // 3. add the aggregation node above the union node
+ List<Symbol> aggregationOutputs = allocateSymbols(markers.size(), COUNT,
INT64);
+ AggregationNode aggregation =
+ computeCounts(union, node.getOutputSymbols(), markers,
aggregationOutputs);
+
+ return new TranslationResult(aggregation, aggregationOutputs);
+ }
+
+ /** for intersect all and except all, use true and false for markers */
+ public TranslationResult makeSetContainmentPlanForAll(SetOperationNode node)
{
+
+ checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+ List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER,
BOOLEAN);
+
+ // for every child of SetOperation node, add the marker column for the
child
+ List<PlanNode> projectNodesWithMarkers = appendMarkers(markers,
node.getChildren(), node);
+
+ UnionNode union =
+ union(
+ projectNodesWithMarkers,
+ ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+ List<Symbol> countOutputs = allocateSymbols(markers.size(), COUNT_MARKER,
INT64);
+ Symbol rowNumberSymbol = symbolAllocator.newSymbol(ROW_NUMBER_SYMBOL,
INT64);
+ WindowNode windowNode =
+ appendCounts(union, node.getOutputSymbols(), markers, countOutputs,
rowNumberSymbol);
+
+ ProjectNode projectNode =
+ new ProjectNode(
+ idAllocator.genPlanNodeId(),
+ windowNode,
+ Assignments.identity(
+ ImmutableList.copyOf(
+ concat(
+ node.getOutputSymbols(),
+ countOutputs,
+ ImmutableList.of(rowNumberSymbol)))));
+
+ return new TranslationResult(projectNode, countOutputs,
Optional.of(rowNumberSymbol));
+ }
+
+ /**
+ * only for transforming the intersection (all) node, add the window node
and group node above the
+ * union node
+ */
+ private WindowNode appendCounts(
+ UnionNode union,
+ List<Symbol> originOutputSymbols,
+ List<Symbol> markers,
+ List<Symbol> countOutputs,
+ Symbol rowNumberSymbol) {
+
+ checkArgument(
+ markers.size() == countOutputs.size(),
+ "the size of markers should be same as the size of count output
symbols");
+
+ // Add group node above the union node to assist partitioning, preparing
for the window node
+ ImmutableMap.Builder<Symbol, SortOrder> sortOrderings =
ImmutableMap.builder();
+ ImmutableList.Builder<Symbol> sortSymbolBuilder = ImmutableList.builder();
+ for (Symbol originalColumn : originOutputSymbols) {
+ sortSymbolBuilder.add(originalColumn);
+ sortOrderings.put(originalColumn, SortOrder.ASC_NULLS_LAST);
+ }
+ ImmutableList<Symbol> sortSymbols = sortSymbolBuilder.build();
+ GroupNode groupNode =
+ new GroupNode(
+ idAllocator.genPlanNodeId(),
+ union,
+ new OrderingScheme(sortSymbols, sortOrderings.build()),
+ sortSymbols.size());
+
+ // build the windowFunctions for count(marker) and row_number
+ ImmutableMap.Builder<Symbol, WindowNode.Function> windowFunctions =
ImmutableMap.builder();
+ WindowNode.Frame windowFunctionFrame =
+ new WindowNode.Frame(
+ WindowFrame.Type.ROWS,
+ FrameBound.Type.UNBOUNDED_PRECEDING,
+ Optional.empty(),
+ Optional.empty(),
+ FrameBound.Type.UNBOUNDED_FOLLOWING,
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty());
+
+ ResolvedFunction countFunction =
+ getResolvedBuiltInAggregateFunction(
+ metadata, SqlConstant.COUNT, Collections.singletonList(BOOLEAN));
+ for (int i = 0; i < markers.size(); i++) {
+ windowFunctions.put(
+ countOutputs.get(i),
+ new WindowNode.Function(
+ countFunction,
+ ImmutableList.of(markers.get(i).toSymbolReference()),
+ windowFunctionFrame,
+ false));
+ }
+
+ List<Type> argumentTypes = ImmutableList.of();
+ ResolvedFunction rowNumberFunction =
+ new ResolvedFunction(
+ new BoundSignature(
+ SqlConstant.ROW_NUMBER,
+ metadata.getFunctionReturnType(SqlConstant.ROW_NUMBER,
argumentTypes),
+ argumentTypes),
+ FunctionId.NOOP_FUNCTION_ID,
+ FunctionKind.WINDOW,
+ true,
+
FunctionNullability.getAggregationFunctionNullability(argumentTypes.size()));
+
+ windowFunctions.put(
+ rowNumberSymbol,
+ new WindowNode.Function(rowNumberFunction, ImmutableList.of(),
windowFunctionFrame, false));
+
+ // add the windowNode above the group node
+ return new WindowNode(
+ idAllocator.genPlanNodeId(),
+ groupNode,
+ new DataOrganizationSpecification(originOutputSymbols,
Optional.empty()),
+ windowFunctions.buildOrThrow(),
+ Optional.empty(),
+ ImmutableSet.of(),
+ 0);
+ }
+
+ /** get an array of markers, used for the new columns */
+ private List<Symbol> allocateSymbols(int count, String nameHint, Type type) {
+ ImmutableList.Builder<Symbol> symbolsBuilder = ImmutableList.builder();
+ for (int i = 0; i < count; i++) {
+ symbolsBuilder.add(symbolAllocator.newSymbol(nameHint, type));
+ }
+ return symbolsBuilder.build();
+ }
+
+ /**
+ * Builds projection nodes with marker columns for each child of the set
operation. Each child
+ * gets TRUE for its own marker and NULL (cast to BOOLEAN) for others. This
is used in the
+ * implementation of INTERSECT and EXCEPT set operations.
+ */
+ private List<PlanNode> appendMarkers(
+ List<Symbol> markers, List<PlanNode> children, SetOperationNode node) {
+ ImmutableList.Builder<PlanNode> projectionsWithMarker =
ImmutableList.builder();
+
+ Map<Symbol, Collection<Symbol>> symbolMapping =
node.getSymbolMapping().asMap();
+ for (int childIndex = 0; childIndex < children.size(); childIndex++) {
+
+ // add the original symbols to projection node
+ Assignments.Builder assignments = Assignments.builder();
+ for (Symbol outputSymbol : node.getOutputSymbols()) {
+ Collection<Symbol> inputSymbols = symbolMapping.get(outputSymbol);
+ Symbol sourceSymbol = Iterables.get(inputSymbols, childIndex);
+
+ Symbol newProjectedSymbol = symbolAllocator.newSymbol(outputSymbol);
+ assignments.put(newProjectedSymbol, sourceSymbol.toSymbolReference());
+ }
+
+ // add the new marker symbol to the new projection node
+ for (int j = 0; j < markers.size(); j++) {
+ Expression expression =
+ j == childIndex ? TRUE_LITERAL : new Cast(new NullLiteral(),
toSqlType(BOOLEAN));
+ assignments.put(symbolAllocator.newSymbol(markers.get(j).getName(),
BOOLEAN), expression);
+ }
+
+ projectionsWithMarker.add(
+ new ProjectNode(
+ idAllocator.genPlanNodeId(), children.get(childIndex),
assignments.build()));
+ }
+
+ return projectionsWithMarker.build();
+ }
+
+ private UnionNode union(List<PlanNode> projectNodesWithMarkers, List<Symbol>
outputs) {
+
+ ImmutableListMultimap.Builder<Symbol, Symbol> outputsToInputs =
ImmutableListMultimap.builder();
+
+ for (PlanNode projectionNode : projectNodesWithMarkers) {
+ List<Symbol> outputSymbols = projectionNode.getOutputSymbols();
+ for (int i = 0; i < outputSymbols.size(); i++) {
+ outputsToInputs.put(outputs.get(i), outputSymbols.get(i));
+ }
+ }
+
+ return new UnionNode(
+ idAllocator.genPlanNodeId(), projectNodesWithMarkers,
outputsToInputs.build(), outputs);
+ }
+
+ /** add the aggregation node above the union node */
+ private AggregationNode computeCounts(
+ UnionNode unionNode,
+ List<Symbol> originalColumns,
+ List<Symbol> markers,
+ List<Symbol> aggregationOutputs) {
+
+ ImmutableMap.Builder<Symbol, AggregationNode.Aggregation> aggregations =
ImmutableMap.builder();
+
+ ResolvedFunction resolvedFunction =
+ getResolvedBuiltInAggregateFunction(metadata, COUNT,
Collections.singletonList(BOOLEAN));
+
+ for (int i = 0; i < markers.size(); i++) {
+ Symbol countMarker = aggregationOutputs.get(i);
+ aggregations.put(
+ countMarker,
+ new AggregationNode.Aggregation(
+ resolvedFunction,
+ ImmutableList.of(markers.get(i).toSymbolReference()),
+ false,
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()));
+ }
+
+ return AggregationNode.singleAggregation(
+ idAllocator.genPlanNodeId(),
+ unionNode,
+ aggregations.buildOrThrow(),
+ singleGroupingSet(originalColumns));
+ }
+
+ public static class TranslationResult {
+
+ private final PlanNode planNode;
+ private final List<Symbol> countSymbols;
+ private final Optional<Symbol> rowNumberSymbol;
+
+ public TranslationResult(PlanNode planNode, List<Symbol> countSymbols) {
+ this(planNode, countSymbols, Optional.empty());
+ }
+
+ public TranslationResult(
+ PlanNode planNode, List<Symbol> countSymbols, Optional<Symbol>
rowNumberSymbol) {
+ this.planNode = requireNonNull(planNode, "planNode is null");
+ this.countSymbols =
+ ImmutableList.copyOf(requireNonNull(countSymbols, "countSymbols is
null"));
+ this.rowNumberSymbol = requireNonNull(rowNumberSymbol, "rowNumberSymbol
is null");
+ }
+
+ public List<Symbol> getCountSymbols() {
+ return countSymbols;
+ }
+
+ public Symbol getRowNumberSymbol() {
+ checkState(rowNumberSymbol.isPresent(), "rowNumberSymbol is empty");
+ return rowNumberSymbol.get();
+ }
+
+ public PlanNode getPlanNode() {
+ return this.planNode;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntersectNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntersectNode.java
new file mode 100644
index 00000000000..3f7a0864971
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntersectNode.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.collect.ListMultimap;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class IntersectNode extends SetOperationNode {
+
+ private final boolean distinct;
+
+ public IntersectNode(
+ PlanNodeId id,
+ List<PlanNode> children,
+ ListMultimap<Symbol, Symbol> outputToInputs,
+ List<Symbol> outputs,
+ boolean distinct) {
+ super(id, children, outputToInputs, outputs);
+ this.distinct = distinct;
+ }
+
+ private IntersectNode(
+ PlanNodeId id,
+ ListMultimap<Symbol, Symbol> outputToInputs,
+ List<Symbol> outputs,
+ boolean distinct) {
+ super(id, outputToInputs, outputs);
+ this.distinct = distinct;
+ }
+
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitIntersect(this, context);
+ }
+
+ public boolean isDistinct() {
+ return distinct;
+ }
+
+ @Override
+ public PlanNode clone() {
+ return new IntersectNode(getPlanNodeId(), getSymbolMapping(),
getOutputSymbols(), distinct);
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ throw new UnsupportedOperationException(
+ "IntersectNode should never be serialized in current version");
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ throw new UnsupportedOperationException(
+ "IntersectNode should never be serialized in current version");
+ }
+
+ public static IntersectNode deserialize(ByteBuffer byteBuffer) {
+ throw new UnsupportedOperationException(
+ "IntersectNode should never be deserialized in current version");
+ }
+
+ @Override
+ public PlanNode replaceChildren(List<PlanNode> newChildren) {
+ return new IntersectNode(
+ getPlanNodeId(), newChildren, getSymbolMapping(), getOutputSymbols(),
isDistinct());
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
index 3ebd933d28c..d08ac9ecf72 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
@@ -189,6 +189,10 @@ public final class Patterns {
return typeOf(UnionNode.class);
}
+ public static Pattern<IntersectNode> intersect() {
+ return typeOf(IntersectNode.class);
+ }
+
/*public static Pattern<TableWriterNode> tableWriterNode()
{
return typeOf(TableWriterNode.class);
@@ -249,10 +253,6 @@ public final class Patterns {
return typeOf(DistinctLimitNode.class);
}
- public static Pattern<IntersectNode> intersect()
- {
- return typeOf(IntersectNode.class);
- }
public static Pattern<ExceptNode> except()
{
@@ -359,6 +359,12 @@ public final class Patterns {
}
}
+ public static final class Intersect {
+ public static Property<IntersectNode, Lookup, Boolean> distinct() {
+ return property("distinct", IntersectNode::isDistinct);
+ }
+ }
+
/*public static final class Sample
{
public static Property<SampleNode, Lookup, Double> sampleRatio()
@@ -415,13 +421,7 @@ public final class Patterns {
}
}*/
- /*public static final class Intersect
- {
- public static Property<IntersectNode, Lookup, Boolean> distinct()
- {
- return property("distinct", IntersectNode::isDistinct);
- }
- }
+ /*
public static final class Except
{
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index d70f67114b0..65015715945 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Iterati
import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.RuleStatsRecorder;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.CanonicalizeExpressions;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementIntersectAll;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementIntersectDistinctAsUnion;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementPatternRecognition;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementTableFunctionSource;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.InlineProjections;
@@ -275,6 +277,17 @@ public class LogicalOptimizeFactory {
// new MergeExcept
new PruneDistinctAggregation()))
.build()),
+ new IterativeOptimizer(
+ plannerContext,
+ ruleStats,
+ ImmutableSet.<Rule<?>>builder()
+ .add(
+ new ImplementIntersectDistinctAsUnion(metadata),
+ // new ImplementExceptDistinctAsUnion(metadata)
+ new ImplementIntersectAll(metadata)
+ // new ImplementExceptAll(metadata))),
+ )
+ .build()),
columnPruningOptimizer,
inlineProjectionLimitFiltersOptimizer,
new IterativeOptimizer(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index 95f0f6cb0f2..ff3a01be849 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -41,6 +41,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
@@ -1015,6 +1016,36 @@ public class UnaliasSymbolReferences implements
PlanOptimizer {
mapping);
}
+ @Override
+ public PlanAndMappings visitIntersect(IntersectNode node, UnaliasContext
context) {
+
+ List<PlanAndMappings> rewrittenSources =
+ node.getChildren().stream()
+ .map(source -> source.accept(this, context))
+ .collect(toImmutableList());
+
+ List<SymbolMapper> inputMappers =
+ rewrittenSources.stream()
+ .map(source -> symbolMapper(new HashMap<>(source.getMappings())))
+ .collect(toImmutableList());
+
+ Map<Symbol, Symbol> mapping = new
HashMap<>(context.getCorrelationMapping());
+ SymbolMapper outputMapper = symbolMapper(mapping);
+
+ ListMultimap<Symbol, Symbol> newOutputToInputs =
+ rewriteOutputToInputsMap(node.getSymbolMapping(), outputMapper,
inputMappers);
+ List<Symbol> newOutputs =
outputMapper.mapAndDistinct(node.getOutputSymbols());
+
+ return new PlanAndMappings(
+ new IntersectNode(
+ node.getPlanNodeId(),
+
rewrittenSources.stream().map(PlanAndMappings::getRoot).collect(toImmutableList()),
+ newOutputToInputs,
+ newOutputs,
+ node.isDistinct()),
+ mapping);
+ }
+
private ListMultimap<Symbol, Symbol> rewriteOutputToInputsMap(
ListMultimap<Symbol, Symbol> oldMapping,
SymbolMapper outputMapper,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/IntersectTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/IntersectTest.java
new file mode 100644
index 00000000000..13d18c89ddf
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/IntersectTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester;
+
+import org.junit.Test;
+
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.union;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.window;
+
+/** tests for intersect (distinct) and intersect all */
+public class IntersectTest {
+
+ @Test
+ public void intersectTest() {
+
+ PlanTester planTester = new PlanTester();
+ LogicalQueryPlan actualLogicalQueryPlan =
+ planTester.createPlan("select tag1 from t1 intersect select tag1 from
t2");
+ // just verify the Logical plan: `Output - project - filter -
aggregation - union - 2*(project
+ // - tableScan)`
+ assertPlan(
+ actualLogicalQueryPlan,
+ output(
+ project(
+ filter(
+ aggregation(
+ union(
+ project(tableScan("testdb.t1")),
project(tableScan("testdb.t2"))))))));
+ }
+
+ @Test
+ public void intersectAllTest() {
+
+ PlanTester planTester = new PlanTester();
+ LogicalQueryPlan actualLogicalQueryPlan =
+ planTester.createPlan("select tag1 from t1 intersect all select tag1
from t2");
+ assertPlan(
+ actualLogicalQueryPlan,
+ output(
+ project(
+ filter(
+ project(
+ window(
+ sort(
+ union(
+ project(tableScan("testdb.t1")),
+ project(tableScan("testdb.t2"))))))))));
+ }
+
+ @Test
+ public void typeCompatibleTest() {
+ // use CAST if types of according columns is not compatible
+ // s1 is INT64, s3 is DOUBLE
+
+ PlanTester planTester = new PlanTester();
+ LogicalQueryPlan actualLogicalQueryPlan =
+ planTester.createPlan("select s1, s3 from table2 intersect all select
s1, s1 from table3 ");
+
+ assertPlan(
+ actualLogicalQueryPlan,
+ output(
+ project(
+ filter(
+ project(
+ window(
+ sort(
+ union(
+ project(tableScan("testdb.table2")),
+
project(tableScan("testdb.table3"))))))))));
+ }
+
+ /** the priority of intersect is higher than that of the union */
+ @Test
+ public void setOperationPriority() {
+
+ PlanTester planTester = new PlanTester();
+ LogicalQueryPlan actualLogicalQueryPlan =
+ planTester.createPlan(
+ "select tag1 from t1 union select tag1 from t2 intersect select
tag1 from t3");
+
+ assertPlan(
+ actualLogicalQueryPlan,
+ output(
+ aggregation(
+ union(
+ tableScan("testdb.t1"),
+ project(
+ filter(
+ aggregation(
+ union(
+ project(tableScan("testdb.t2")),
+ project(tableScan("testdb.t3"))))))))));
+ }
+}
diff --git
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
index 59231b6de2b..7357196219f 100644
---
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
+++
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
@@ -930,8 +930,9 @@ rowCount
;
queryTerm
- : queryPrimary
#queryTermDefault
- | left=queryTerm operator=(INTERSECT | UNION | EXCEPT) setQuantifier?
right=queryTerm #setOperation
+ : queryPrimary
#queryTermDefault
+ | left=queryTerm operator=INTERSECT setQuantifier? right=queryTerm
#setOperation
+ | left=queryTerm operator=(UNION | EXCEPT) setQuantifier? right=queryTerm
#setOperation
;
queryPrimary