This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ebb89b91e8 implement the intersect (distinct | all ) for table model 
(#16700)
4ebb89b91e8 is described below

commit 4ebb89b91e8e433c7997c2fe11ffeb4e2afdf197
Author: alpass163 <[email protected]>
AuthorDate: Fri Nov 7 17:05:08 2025 +0800

    implement the intersect (distinct | all ) for table model (#16700)
---
 .../it/query/recent/IoTDBIntersectTableIT.java     | 153 +++++++++
 .../plan/planner/plan/node/PlanGraphPrinter.java   |  10 +
 .../plan/planner/plan/node/PlanNodeType.java       |   4 +
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../plan/relational/planner/RelationPlanner.java   |  24 +-
 .../iterative/rule/ImplementIntersectAll.java      |  95 ++++++
 .../rule/ImplementIntersectDistinctAsUnion.java    |  88 +++++
 .../iterative/rule/PruneDistinctAggregation.java   |   9 +
 .../iterative/rule/SetOperationNodeTranslator.java | 355 +++++++++++++++++++++
 .../relational/planner/node/IntersectNode.java     |  98 ++++++
 .../plan/relational/planner/node/Patterns.java     |  22 +-
 .../optimizations/LogicalOptimizeFactory.java      |  13 +
 .../optimizations/UnaliasSymbolReferences.java     |  31 ++
 .../plan/relational/analyzer/IntersectTest.java    | 121 +++++++
 .../db/relational/grammar/sql/RelationalSql.g4     |   5 +-
 15 files changed, 1015 insertions(+), 18 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBIntersectTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBIntersectTableIT.java
new file mode 100644
index 00000000000..2bc8d6bddb6
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBIntersectTableIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBIntersectTableIT { // integration tests for INTERSECT [DISTINCT | ALL]
+  protected static final String DATABASE_NAME = "test";
+  protected static final String[] createSqls =
+      new String[] {
+        "CREATE DATABASE " + DATABASE_NAME,
+        "USE " + DATABASE_NAME,
+        // table1 rows as (device, s1, s2): ('d1', 1, 1) x 2, ('d1', 2, 2) x 1
+        "create table table1(device STRING TAG, s1 INT32 
FIELD)",
+        "insert into table1 values (1, 'd1', 1, 1)",
+        "insert into table1 values (2, 'd1', 1, 1)",
+        "insert into table1 values (3, 'd1', 2, 2)",
+        // table2 rows as (device, s1, s2): ('d1', 1, 1.0) x 3, ('d1', 3, 3.0) x 1
+        "create table table2(device STRING TAG, s1 INT64 FIELD, s2 DOUBLE 
FIELD)",
+        "insert into table2 values (1, 'd1', 1, 1.0)",
+        "insert into table2 values (2, 'd1', 1, 1.0)",
+        "insert into table2 values (3, 'd1', 1, 1.0)",
+        "insert into table2 values (4, 'd1', 3, 3.0)",
+        // table3: same rows as table2 under different column names; used by the column-name test
+        "create table table3(device STRING TAG, s1_testName INT64 FIELD, 
s2_testName DOUBLE FIELD)",
+        "insert into table3 values (1, 'd1', 1, 1.0)",
+        "insert into table3 values (2, 'd1', 1, 1.0)",
+        "insert into table3 values (3, 'd1', 1, 1.0)",
+        "insert into table3 values (4, 'd1', 3, 3.0)",
+        // table4: s1 is TEXT and no rows are inserted; used for the incompatible-type check
+        "create table table4(device STRING TAG, s1 TEXT FIELD, s2 DOUBLE 
FIELD)"
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareTableData(createSqls); // runs the fixture statements declared in createSqls
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void normalTest() {
+    String[] expectedHeader = new String[] {"device", "s1", "s2"};
+
+    // --- INTERSECT (DISTINCT): bare INTERSECT and INTERSECT DISTINCT must give the same result ---
+    // table1 INTERSECT table2: expect exactly one tuple, ('d1', 1, 1.0)
+    String[] retArray =
+        new String[] {
+          "d1,1,1.0,",
+        };
+    tableResultSetEqualTest(
+        "select device, s1, s2 from table1 intersect select device, s1, s2 
from table2",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+    tableResultSetEqualTest(
+        "select device, s1, s2 from table1 intersect distinct select device, 
s1, s2 from table2",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // --- INTERSECT ALL ---
+    // ('d1', 1, 1.0) appears twice in table1 and three times in table2,
+    // so expect min(2, 3) = 2 tuples
+    retArray = new String[] {"d1,1,1.0,", "d1,1,1.0,"};
+    tableResultSetEqualTest(
+        "select device, s1, s2 from table1 intersect all select device, s1, s2 
from table2",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+    // table3 uses different column names; the expected header is still that of the first query
+    tableResultSetEqualTest(
+        "select device, s1, s2 from table1 intersect all select device, 
s1_testName, s2_testName from table3",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void mappingTest() {
+    // first branch:  table1.s1 as col_a -> (1), (1), (2)
+    // second branch: table2.s2 as col_a -> (1.0), (1.0), (1.0), (3.0)
+    // common value: (1.0)
+
+    String[] expectedHeader = new String[] {"col_a"};
+
+    // --- INTERSECT (DISTINCT) over aliased columns: one row expected ---
+    String[] retArray = new String[] {"1.0,"};
+    tableResultSetEqualTest(
+        "select col_a from ((select s1 as col_a, device as col_b from table1) 
intersect (select s2 as col_a, device as col_b from table2)) order by col_a",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // --- INTERSECT ALL over aliased columns: two rows expected ---
+    retArray = new String[] {"1.0,", "1.0,"};
+    tableResultSetEqualTest(
+        "select col_a from ((select s1 as col_a, device as col_b from table1) 
intersect all (select s2 as col_a, device as col_b from table2)) order by 
col_a",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  @Test
+  public void exceptionTest() {
+    // incompatible column types: table1.s1 is INT32 while table4.s1 is TEXT
+    tableAssertTestFail(
+        "(select * from table1) intersect all (select * from table4)",
+        "has incompatible types: INT32, TEXT",
+        DATABASE_NAME);
+
+    tableAssertTestFail( // column-count mismatch: 4 fields on the left, 1 on the right
+        "(select * from table1) intersect all (select time from table4)",
+        "INTERSECT query has different number of fields: 4, 1",
+        DATABASE_NAME);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 6ffc534a712..82e03d8789b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -75,6 +75,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExplainAnalyzeNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -1106,6 +1107,15 @@ public class PlanGraphPrinter extends 
PlanVisitor<List<String>, PlanGraphPrinter
     return render(node, boxValue, context);
   }
 
+  @Override
+  public List<String> visitIntersect(IntersectNode node, GraphContext context) 
{ // builds the text box printed for an IntersectNode
+    List<String> boxValue = new ArrayList<>();
+    boxValue.add(String.format("Intersect-%s", node.getPlanNodeId().getId())); // title: node kind and plan node id
+    boxValue.add(String.format("OutputSymbols: %s", node.getOutputSymbols()));
+    boxValue.add(String.format("isDistinct: %s", node.isDistinct()));
+    return render(node, boxValue, context); // shared helper wraps the lines in a Box
+  }
+
   private List<String> render(PlanNode node, List<String> nodeBoxString, 
GraphContext context) {
     Box box = new Box(nodeBoxString);
     List<List<String>> children = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index ca1fdebad7f..1100793ca88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -122,6 +122,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingl
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -312,6 +313,7 @@ public enum PlanNodeType {
   TABLE_WINDOW_FUNCTION((short) 1032),
   TABLE_INTO_NODE((short) 1033),
   TABLE_UNION_NODE((short) 1034),
+  TABLE_INTERSECT_NODE((short) 1035),
 
   RELATIONAL_INSERT_TABLET((short) 2000),
   RELATIONAL_INSERT_ROW((short) 2001),
@@ -701,6 +703,8 @@ public enum PlanNodeType {
             buffer);
       case 1034:
         return UnionNode.deserialize(buffer);
+      case 1035:
+        return IntersectNode.deserialize(buffer);
       case 2000:
         return RelationalInsertTabletNode.deserialize(buffer);
       case 2001:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index cdb3ce7b4be..9e8e1834247 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -126,6 +126,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableS
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -840,4 +841,8 @@ public abstract class PlanVisitor<R, C> {
   public R visitUnion(UnionNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitIntersect(IntersectNode node, C context) {
+    return visitPlan(node, context); // default: no Intersect-specific handling, delegate to visitPlan
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index af823554b7b..ce573ce1e2d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -57,6 +57,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Measure;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.PatternRecognitionNode;
@@ -1145,6 +1146,24 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
         planNode, analysis.getScope(node), planNode.getOutputSymbols(), 
outerContext);
   }
 
+  @Override
+  protected RelationPlan visitIntersect(Intersect node, Void context) {
+    Preconditions.checkArgument(
+        !node.getRelations().isEmpty(), "No relations specified for 
intersect");
+    SetOperationPlan setOperationPlan = process(node); // yields the child plans and the symbol mapping used below
+
+    PlanNode intersectNode =
+        new IntersectNode(
+            idAllocator.genPlanNodeId(),
+            setOperationPlan.getChildren(),
+            setOperationPlan.getSymbolMapping(),
+            ImmutableList.copyOf(setOperationPlan.getSymbolMapping().keySet()), // output symbols are the mapping's keys
+            node.isDistinct()); // distinct flag is taken unchanged from the AST node
+
+    return new RelationPlan(
+        intersectNode, analysis.getScope(node), 
intersectNode.getOutputSymbols(), outerContext);
+  }
+
   private SetOperationPlan process(SetOperation node) {
     RelationType outputFields = analysis.getOutputDescriptor(node);
     List<Symbol> outputs =
@@ -1191,11 +1210,6 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
     throw new IllegalStateException("Values is not supported in current 
version.");
   }
 
-  @Override
-  protected RelationPlan visitIntersect(Intersect node, Void context) {
-    throw new IllegalStateException("Intersect is not supported in current 
version.");
-  }
-
   @Override
   protected RelationPlan visitExcept(Except node, Void context) {
     throw new IllegalStateException("Except is not supported in current 
version.");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectAll.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectAll.java
new file mode 100644
index 00000000000..af31281c918
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectAll.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import com.google.common.collect.ImmutableList;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.Intersect.distinct;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.intersect;
+
+public class ImplementIntersectAll implements Rule<IntersectNode> { // rewrites a non-distinct intersect as Project(Filter(translated plan))
+
+  private static final Pattern<IntersectNode> PATTERN = 
intersect().with(distinct().equalTo(false)); // matches only intersect nodes whose distinct flag is false
+
+  private final Metadata metadata;
+
+  public ImplementIntersectAll(Metadata metadata) {
+    this.metadata = requireNonNull(metadata, "metadata is null");
+  }
+
+  @Override
+  public Pattern<IntersectNode> getPattern() {
+    return PATTERN;
+  }
+
+  @Override
+  public Result apply(IntersectNode node, Captures captures, Context context) {
+
+    SetOperationNodeTranslator translator =
+        new SetOperationNodeTranslator(
+            metadata, context.getSymbolAllocator(), context.getIdAllocator());
+
+    // 1. translate the INTERSECT ALL node into a plan built from other plan nodes
+    SetOperationNodeTranslator.TranslationResult translationResult =
+        translator.makeSetContainmentPlanForAll(node);
+
+    // 2. put a filter node on top of the plan produced by the translation
+    // filter condition: row_number <= least(countA, countB, ...), built as nested two-argument LEAST calls
+    Expression minCount = 
translationResult.getCountSymbols().get(0).toSymbolReference();
+    for (int i = 1; i < translationResult.getCountSymbols().size(); i++) {
+      minCount =
+          new FunctionCall(
+              
QualifiedName.of(TableBuiltinScalarFunction.LEAST.getFunctionName()),
+              ImmutableList.of(
+                  minCount, 
translationResult.getCountSymbols().get(i).toSymbolReference()));
+    }
+
+    FilterNode filterNode =
+        new FilterNode(
+            context.getIdAllocator().genPlanNodeId(),
+            translationResult.getPlanNode(),
+            new ComparisonExpression(
+                ComparisonExpression.Operator.LESS_THAN_OR_EQUAL,
+                translationResult.getRowNumberSymbol().toSymbolReference(),
+                minCount));
+
+    // 3. project back to the intersect node's own output symbols, dropping the redundant helper columns
+    return Result.ofPlanNode(
+        new ProjectNode(
+            context.getIdAllocator().genPlanNodeId(),
+            filterNode,
+            Assignments.identity(node.getOutputSymbols())));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectDistinctAsUnion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectDistinctAsUnion.java
new file mode 100644
index 00000000000..0a8f8c498ae
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementIntersectDistinctAsUnion.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GenericLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import com.google.common.collect.ImmutableList;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.and;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL;
+
+public class ImplementIntersectDistinctAsUnion implements Rule<IntersectNode> {
+
+  private static final Pattern<IntersectNode> PATTERN =
+      Patterns.intersect().with(Patterns.Intersect.distinct().equalTo(true)); // matches only intersect nodes whose distinct flag is true
+
+  private final Metadata metadata;
+
+  @Override
+  public Pattern<IntersectNode> getPattern() {
+    return PATTERN;
+  }
+
+  public ImplementIntersectDistinctAsUnion(Metadata metadata) {
+    this.metadata = requireNonNull(metadata, "metadata is null");
+  }
+
+  @Override
+  public Result apply(IntersectNode node, Captures captures, Context context) {
+
+    SetOperationNodeTranslator translator =
+        new SetOperationNodeTranslator(
+            metadata, context.getSymbolAllocator(), context.getIdAllocator());
+
+    SetOperationNodeTranslator.TranslationResult result =
+        translator.makeSetContainmentPlanForDistinct(node);
+
+    // add the filter node above the aggregation node: keep a row only if every count symbol is >= 1
+    Expression predicate =
+        and(
+            result.getCountSymbols().stream()
+                .map(
+                    symbol ->
+                        new ComparisonExpression(
+                            GREATER_THAN_OR_EQUAL,
+                            symbol.toSymbolReference(),
+                            new GenericLiteral("INT64", "1")))
+                .collect(ImmutableList.toImmutableList()));
+
+    FilterNode filterNode =
+        new FilterNode(context.getIdAllocator().genPlanNodeId(), 
result.getPlanNode(), predicate);
+
+    return Result.ofPlanNode( // project back to the intersect node's own output symbols
+        new ProjectNode(
+            context.getIdAllocator().genPlanNodeId(),
+            filterNode,
+            Assignments.identity(node.getOutputSymbols())));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java
index 5b3a4101478..26fd0278016 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneDistinctAggregation.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
 import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
 import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
@@ -100,6 +101,14 @@ public class PruneDistinctAggregation implements 
Rule<AggregationNode> {
       return rewriteChildren(node, context);
     }
 
+    @Override
+    public PlanNode visitIntersect(IntersectNode node, Boolean context) {
+      if (node.isDistinct()) { // distinct intersect: rewrite the children with the incoming context unchanged
+        return rewriteChildren(node, context);
+      }
+      return visitPlan(node, context); // non-distinct (ALL) intersect: fall back to the generic visitPlan handling
+    }
+
     /*@Override
     public PlanNode visitUnion(UnionNode node, Boolean context)
     {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java
new file mode 100644
index 00000000000..3088860564e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FrameBound;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.concat;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT;
+import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
+
+public class SetOperationNodeTranslator {
+
+  private static final String MARKER = "marker";
+  private static final String COUNT_MARKER = "count";
+  private static final String ROW_NUMBER_SYMBOL = "row_number";
+  private final SymbolAllocator symbolAllocator;
+  private final QueryId idAllocator;
+  private final Metadata metadata;
+
+  public SetOperationNodeTranslator(
+      Metadata metadata, SymbolAllocator symbolAllocator, QueryId idAllocator) 
{
+
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is 
null");
+    this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
+  }
+
+  /** for intersect distinct and except distinct, use true and null for 
markers */
+  public TranslationResult makeSetContainmentPlanForDistinct(SetOperationNode 
node) {
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, 
BOOLEAN);
+
+    // 1. add the marker column to the origin planNode
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, 
node.getChildren(), node);
+
+    // 2. add the union node over all new projection nodes.
+    // The outputs of the union must have the same name as the original 
intersect node
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+
+    // 3. add the aggregation node above the union node
+    List<Symbol> aggregationOutputs = allocateSymbols(markers.size(), COUNT, 
INT64);
+    AggregationNode aggregation =
+        computeCounts(union, node.getOutputSymbols(), markers, 
aggregationOutputs);
+
+    return new TranslationResult(aggregation, aggregationOutputs);
+  }
+
+  /** for intersect all and except all, use true and null for markers */
+  public TranslationResult makeSetContainmentPlanForAll(SetOperationNode node) 
{
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, 
BOOLEAN);
+
+    // for every child of SetOperation node, add the marker column for the 
child
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, 
node.getChildren(), node);
+
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+    List<Symbol> countOutputs = allocateSymbols(markers.size(), COUNT_MARKER, 
INT64);
+    Symbol rowNumberSymbol = symbolAllocator.newSymbol(ROW_NUMBER_SYMBOL, 
INT64);
+    WindowNode windowNode =
+        appendCounts(union, node.getOutputSymbols(), markers, countOutputs, 
rowNumberSymbol);
+
+    ProjectNode projectNode =
+        new ProjectNode(
+            idAllocator.genPlanNodeId(),
+            windowNode,
+            Assignments.identity(
+                ImmutableList.copyOf(
+                    concat(
+                        node.getOutputSymbols(),
+                        countOutputs,
+                        ImmutableList.of(rowNumberSymbol)))));
+
+    return new TranslationResult(projectNode, countOutputs, 
Optional.of(rowNumberSymbol));
+  }
+
+  /**
+   * only used when transforming the intersect (all) node: adds the group node 
and then the window node
+   * above the union node
+   */
+  private WindowNode appendCounts(
+      UnionNode union,
+      List<Symbol> originOutputSymbols,
+      List<Symbol> markers,
+      List<Symbol> countOutputs,
+      Symbol rowNumberSymbol) {
+
+    checkArgument(
+        markers.size() == countOutputs.size(),
+        "the size of markers should be same as the size of count output 
symbols");
+
+    // Add group node above the union node to assist partitioning, preparing 
for the window node
+    ImmutableMap.Builder<Symbol, SortOrder> sortOrderings = 
ImmutableMap.builder();
+    ImmutableList.Builder<Symbol> sortSymbolBuilder = ImmutableList.builder();
+    for (Symbol originalColumn : originOutputSymbols) {
+      sortSymbolBuilder.add(originalColumn);
+      sortOrderings.put(originalColumn, SortOrder.ASC_NULLS_LAST);
+    }
+    ImmutableList<Symbol> sortSymbols = sortSymbolBuilder.build();
+    GroupNode groupNode =
+        new GroupNode(
+            idAllocator.genPlanNodeId(),
+            union,
+            new OrderingScheme(sortSymbols, sortOrderings.build()),
+            sortSymbols.size());
+
+    // build the windowFunctions for count(marker) and row_number
+    ImmutableMap.Builder<Symbol, WindowNode.Function> windowFunctions = 
ImmutableMap.builder();
+    WindowNode.Frame windowFunctionFrame =
+        new WindowNode.Frame(
+            WindowFrame.Type.ROWS,
+            FrameBound.Type.UNBOUNDED_PRECEDING,
+            Optional.empty(),
+            Optional.empty(),
+            FrameBound.Type.UNBOUNDED_FOLLOWING,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+    ResolvedFunction countFunction =
+        getResolvedBuiltInAggregateFunction(
+            metadata, SqlConstant.COUNT, Collections.singletonList(BOOLEAN));
+    for (int i = 0; i < markers.size(); i++) {
+      windowFunctions.put(
+          countOutputs.get(i),
+          new WindowNode.Function(
+              countFunction,
+              ImmutableList.of(markers.get(i).toSymbolReference()),
+              windowFunctionFrame,
+              false));
+    }
+
+    List<Type> argumentTypes = ImmutableList.of();
+    ResolvedFunction rowNumberFunction =
+        new ResolvedFunction(
+            new BoundSignature(
+                SqlConstant.ROW_NUMBER,
+                metadata.getFunctionReturnType(SqlConstant.ROW_NUMBER, 
argumentTypes),
+                argumentTypes),
+            FunctionId.NOOP_FUNCTION_ID,
+            FunctionKind.WINDOW,
+            true,
+            
FunctionNullability.getAggregationFunctionNullability(argumentTypes.size()));
+
+    windowFunctions.put(
+        rowNumberSymbol,
+        new WindowNode.Function(rowNumberFunction, ImmutableList.of(), 
windowFunctionFrame, false));
+
+    // add the windowNode above the group node
+    return new WindowNode(
+        idAllocator.genPlanNodeId(),
+        groupNode,
+        new DataOrganizationSpecification(originOutputSymbols, 
Optional.empty()),
+        windowFunctions.buildOrThrow(),
+        Optional.empty(),
+        ImmutableSet.of(),
+        0);
+  }
+
+  /** allocate a list of new symbols (e.g. markers or counts), used for the new columns */
+  private List<Symbol> allocateSymbols(int count, String nameHint, Type type) {
+    ImmutableList.Builder<Symbol> symbolsBuilder = ImmutableList.builder();
+    for (int i = 0; i < count; i++) {
+      symbolsBuilder.add(symbolAllocator.newSymbol(nameHint, type));
+    }
+    return symbolsBuilder.build();
+  }
+
+  /**
+   * Builds projection nodes with marker columns for each child of the set 
operation. Each child
+   * gets TRUE for its own marker and NULL (cast to BOOLEAN) for others. This 
is used in the
+   * implementation of INTERSECT and EXCEPT set operations.
+   */
+  private List<PlanNode> appendMarkers(
+      List<Symbol> markers, List<PlanNode> children, SetOperationNode node) {
+    ImmutableList.Builder<PlanNode> projectionsWithMarker = 
ImmutableList.builder();
+
+    Map<Symbol, Collection<Symbol>> symbolMapping = 
node.getSymbolMapping().asMap();
+    for (int childIndex = 0; childIndex < children.size(); childIndex++) {
+
+      // add the original symbols to projection node
+      Assignments.Builder assignments = Assignments.builder();
+      for (Symbol outputSymbol : node.getOutputSymbols()) {
+        Collection<Symbol> inputSymbols = symbolMapping.get(outputSymbol);
+        Symbol sourceSymbol = Iterables.get(inputSymbols, childIndex);
+
+        Symbol newProjectedSymbol = symbolAllocator.newSymbol(outputSymbol);
+        assignments.put(newProjectedSymbol, sourceSymbol.toSymbolReference());
+      }
+
+      // add the new marker symbol to the new projection node
+      for (int j = 0; j < markers.size(); j++) {
+        Expression expression =
+            j == childIndex ? TRUE_LITERAL : new Cast(new NullLiteral(), 
toSqlType(BOOLEAN));
+        assignments.put(symbolAllocator.newSymbol(markers.get(j).getName(), 
BOOLEAN), expression);
+      }
+
+      projectionsWithMarker.add(
+          new ProjectNode(
+              idAllocator.genPlanNodeId(), children.get(childIndex), 
assignments.build()));
+    }
+
+    return projectionsWithMarker.build();
+  }
+
+  private UnionNode union(List<PlanNode> projectNodesWithMarkers, List<Symbol> 
outputs) {
+
+    ImmutableListMultimap.Builder<Symbol, Symbol> outputsToInputs = 
ImmutableListMultimap.builder();
+
+    for (PlanNode projectionNode : projectNodesWithMarkers) {
+      List<Symbol> outputSymbols = projectionNode.getOutputSymbols();
+      for (int i = 0; i < outputSymbols.size(); i++) {
+        outputsToInputs.put(outputs.get(i), outputSymbols.get(i));
+      }
+    }
+
+    return new UnionNode(
+        idAllocator.genPlanNodeId(), projectNodesWithMarkers, 
outputsToInputs.build(), outputs);
+  }
+
+  /** add the aggregation node above the union node */
+  private AggregationNode computeCounts(
+      UnionNode unionNode,
+      List<Symbol> originalColumns,
+      List<Symbol> markers,
+      List<Symbol> aggregationOutputs) {
+
+    ImmutableMap.Builder<Symbol, AggregationNode.Aggregation> aggregations = 
ImmutableMap.builder();
+
+    ResolvedFunction resolvedFunction =
+        getResolvedBuiltInAggregateFunction(metadata, COUNT, 
Collections.singletonList(BOOLEAN));
+
+    for (int i = 0; i < markers.size(); i++) {
+      Symbol countMarker = aggregationOutputs.get(i);
+      aggregations.put(
+          countMarker,
+          new AggregationNode.Aggregation(
+              resolvedFunction,
+              ImmutableList.of(markers.get(i).toSymbolReference()),
+              false,
+              Optional.empty(),
+              Optional.empty(),
+              Optional.empty()));
+    }
+
+    return AggregationNode.singleAggregation(
+        idAllocator.genPlanNodeId(),
+        unionNode,
+        aggregations.buildOrThrow(),
+        singleGroupingSet(originalColumns));
+  }
+
+  public static class TranslationResult {
+
+    private final PlanNode planNode;
+    private final List<Symbol> countSymbols;
+    private final Optional<Symbol> rowNumberSymbol;
+
+    public TranslationResult(PlanNode planNode, List<Symbol> countSymbols) {
+      this(planNode, countSymbols, Optional.empty());
+    }
+
+    public TranslationResult(
+        PlanNode planNode, List<Symbol> countSymbols, Optional<Symbol> 
rowNumberSymbol) {
+      this.planNode = requireNonNull(planNode, "planNode is null");
+      this.countSymbols =
+          ImmutableList.copyOf(requireNonNull(countSymbols, "countSymbols is 
null"));
+      this.rowNumberSymbol = requireNonNull(rowNumberSymbol, "rowNumberSymbol 
is null");
+    }
+
+    public List<Symbol> getCountSymbols() {
+      return countSymbols;
+    }
+
+    public Symbol getRowNumberSymbol() {
+      checkState(rowNumberSymbol.isPresent(), "rowNumberSymbol is empty");
+      return rowNumberSymbol.get();
+    }
+
+    public PlanNode getPlanNode() {
+      return this.planNode;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntersectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntersectNode.java
new file mode 100644
index 00000000000..3f7a0864971
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntersectNode.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.collect.ListMultimap;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class IntersectNode extends SetOperationNode {
+
+  private final boolean distinct;
+
+  public IntersectNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      ListMultimap<Symbol, Symbol> outputToInputs,
+      List<Symbol> outputs,
+      boolean distinct) {
+    super(id, children, outputToInputs, outputs);
+    this.distinct = distinct;
+  }
+
+  private IntersectNode(
+      PlanNodeId id,
+      ListMultimap<Symbol, Symbol> outputToInputs,
+      List<Symbol> outputs,
+      boolean distinct) {
+    super(id, outputToInputs, outputs);
+    this.distinct = distinct;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitIntersect(this, context);
+  }
+
+  public boolean isDistinct() {
+    return distinct;
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new IntersectNode(getPlanNodeId(), getSymbolMapping(), 
getOutputSymbols(), distinct);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException(
+        "IntersectNode should never be serialized in current version");
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    throw new UnsupportedOperationException(
+        "IntersectNode should never be serialized in current version");
+  }
+
+  public static IntersectNode deserialize(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException(
+        "IntersectNode should never be deserialized in current version");
+  }
+
+  @Override
+  public PlanNode replaceChildren(List<PlanNode> newChildren) {
+    return new IntersectNode(
+        getPlanNodeId(), newChildren, getSymbolMapping(), getOutputSymbols(), 
isDistinct());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
index 3ebd933d28c..d08ac9ecf72 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/Patterns.java
@@ -189,6 +189,10 @@ public final class Patterns {
     return typeOf(UnionNode.class);
   }
 
+  public static Pattern<IntersectNode> intersect() {
+    return typeOf(IntersectNode.class);
+  }
+
   /*public static Pattern<TableWriterNode> tableWriterNode()
   {
       return typeOf(TableWriterNode.class);
@@ -249,10 +253,6 @@ public final class Patterns {
       return typeOf(DistinctLimitNode.class);
   }
 
-  public static Pattern<IntersectNode> intersect()
-  {
-      return typeOf(IntersectNode.class);
-  }
 
   public static Pattern<ExceptNode> except()
   {
@@ -359,6 +359,12 @@ public final class Patterns {
     }
   }
 
+  public static final class Intersect {
+    public static Property<IntersectNode, Lookup, Boolean> distinct() {
+      return property("distinct", IntersectNode::isDistinct);
+    }
+  }
+
   /*public static final class Sample
   {
       public static Property<SampleNode, Lookup, Double> sampleRatio()
@@ -415,13 +421,7 @@ public final class Patterns {
       }
   }*/
 
-  /*public static final class Intersect
-  {
-      public static Property<IntersectNode, Lookup, Boolean> distinct()
-      {
-          return property("distinct", IntersectNode::isDistinct);
-      }
-  }
+  /*
 
   public static final class Except
   {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index d70f67114b0..65015715945 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Iterati
 import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.RuleStatsRecorder;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.CanonicalizeExpressions;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementIntersectAll;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementIntersectDistinctAsUnion;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementPatternRecognition;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.ImplementTableFunctionSource;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.InlineProjections;
@@ -275,6 +277,17 @@ public class LogicalOptimizeFactory {
                         // new MergeExcept
                         new PruneDistinctAggregation()))
                 .build()),
+        new IterativeOptimizer(
+            plannerContext,
+            ruleStats,
+            ImmutableSet.<Rule<?>>builder()
+                .add(
+                    new ImplementIntersectDistinctAsUnion(metadata),
+                    // new ImplementExceptDistinctAsUnion(metadata)
+                    new ImplementIntersectAll(metadata)
+                    // new ImplementExceptAll(metadata))),
+                    )
+                .build()),
         columnPruningOptimizer,
         inlineProjectionLimitFiltersOptimizer,
         new IterativeOptimizer(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
index 95f0f6cb0f2..ff3a01be849 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java
@@ -41,6 +41,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntersectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.IntoNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
@@ -1015,6 +1016,36 @@ public class UnaliasSymbolReferences implements 
PlanOptimizer {
           mapping);
     }
 
+    @Override
+    public PlanAndMappings visitIntersect(IntersectNode node, UnaliasContext 
context) {
+
+      List<PlanAndMappings> rewrittenSources =
+          node.getChildren().stream()
+              .map(source -> source.accept(this, context))
+              .collect(toImmutableList());
+
+      List<SymbolMapper> inputMappers =
+          rewrittenSources.stream()
+              .map(source -> symbolMapper(new HashMap<>(source.getMappings())))
+              .collect(toImmutableList());
+
+      Map<Symbol, Symbol> mapping = new 
HashMap<>(context.getCorrelationMapping());
+      SymbolMapper outputMapper = symbolMapper(mapping);
+
+      ListMultimap<Symbol, Symbol> newOutputToInputs =
+          rewriteOutputToInputsMap(node.getSymbolMapping(), outputMapper, 
inputMappers);
+      List<Symbol> newOutputs = 
outputMapper.mapAndDistinct(node.getOutputSymbols());
+
+      return new PlanAndMappings(
+          new IntersectNode(
+              node.getPlanNodeId(),
+              
rewrittenSources.stream().map(PlanAndMappings::getRoot).collect(toImmutableList()),
+              newOutputToInputs,
+              newOutputs,
+              node.isDistinct()),
+          mapping);
+    }
+
     private ListMultimap<Symbol, Symbol> rewriteOutputToInputsMap(
         ListMultimap<Symbol, Symbol> oldMapping,
         SymbolMapper outputMapper,
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/IntersectTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/IntersectTest.java
new file mode 100644
index 00000000000..13d18c89ddf
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/IntersectTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester;
+
+import org.junit.Test;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.union;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.window;
+
+/** tests for intersect (distinct) and intersect all */
+public class IntersectTest {
+
+  @Test
+  public void intersectTest() {
+
+    PlanTester planTester = new PlanTester();
+    LogicalQueryPlan actualLogicalQueryPlan =
+        planTester.createPlan("select tag1 from t1 intersect select tag1 from 
t2");
+    // just verify the Logical plan:  `Output  - project - filter - 
aggregation - union - 2*(project
+    // - tableScan)`
+    assertPlan(
+        actualLogicalQueryPlan,
+        output(
+            project(
+                filter(
+                    aggregation(
+                        union(
+                            project(tableScan("testdb.t1")), 
project(tableScan("testdb.t2"))))))));
+  }
+
+  @Test
+  public void intersectAllTest() {
+
+    PlanTester planTester = new PlanTester();
+    LogicalQueryPlan actualLogicalQueryPlan =
+        planTester.createPlan("select tag1 from t1 intersect all select tag1 
from t2");
+    assertPlan(
+        actualLogicalQueryPlan,
+        output(
+            project(
+                filter(
+                    project(
+                        window(
+                            sort(
+                                union(
+                                    project(tableScan("testdb.t1")),
+                                    project(tableScan("testdb.t2"))))))))));
+  }
+
+  @Test
+  public void typeCompatibleTest() {
+    // use CAST if the types of the corresponding columns are not compatible
+    // s1 is INT64, s3 is DOUBLE
+
+    PlanTester planTester = new PlanTester();
+    LogicalQueryPlan actualLogicalQueryPlan =
+        planTester.createPlan("select s1, s3 from table2 intersect all select 
s1, s1 from table3 ");
+
+    assertPlan(
+        actualLogicalQueryPlan,
+        output(
+            project(
+                filter(
+                    project(
+                        window(
+                            sort(
+                                union(
+                                    project(tableScan("testdb.table2")),
+                                    
project(tableScan("testdb.table3"))))))))));
+  }
+
+  /** the priority of intersect is higher than that of the union */
+  @Test
+  public void setOperationPriority() {
+
+    PlanTester planTester = new PlanTester();
+    LogicalQueryPlan actualLogicalQueryPlan =
+        planTester.createPlan(
+            "select tag1 from t1 union select tag1 from t2 intersect select 
tag1 from t3");
+
+    assertPlan(
+        actualLogicalQueryPlan,
+        output(
+            aggregation(
+                union(
+                    tableScan("testdb.t1"),
+                    project(
+                        filter(
+                            aggregation(
+                                union(
+                                    project(tableScan("testdb.t2")),
+                                    project(tableScan("testdb.t3"))))))))));
+  }
+}
diff --git 
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
 
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
index 59231b6de2b..7357196219f 100644
--- 
a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
+++ 
b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4
@@ -930,8 +930,9 @@ rowCount
     ;
 
 queryTerm
-    : queryPrimary                                                             
                   #queryTermDefault
-    | left=queryTerm operator=(INTERSECT | UNION | EXCEPT) setQuantifier? 
right=queryTerm         #setOperation
+    : queryPrimary                                                             
    #queryTermDefault
+    | left=queryTerm operator=INTERSECT setQuantifier? right=queryTerm         
    #setOperation
+    | left=queryTerm operator=(UNION | EXCEPT) setQuantifier? right=queryTerm  
    #setOperation
     ;
 
 queryPrimary

Reply via email to