This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 37388a1d494 implement MergeUnion rule to flatten nested UnionNode 
(#16657)
37388a1d494 is described below

commit 37388a1d49445a0412b286db5893088ebd804bb0
Author: alpass163 <[email protected]>
AuthorDate: Tue Oct 28 18:46:29 2025 +0800

    implement MergeUnion rule to flatten nested UnionNode (#16657)
---
 .../planner/iterative/rule/MergeUnion.java         |  48 +++++++
 .../planner/iterative/rule/SetOperationMerge.java  | 144 +++++++++++++++++++++
 .../optimizations/LogicalOptimizeFactory.java      |  15 ++-
 .../plan/relational/analyzer/MergeUnionTest.java   |  98 ++++++++++++++
 4 files changed, 304 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeUnion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeUnion.java
new file mode 100644
index 00000000000..2c378b78723
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeUnion.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import java.util.Optional;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.union;
+
+public class MergeUnion implements Rule<UnionNode> {
+
+  private final Pattern<UnionNode> pattern = union();
+
+  @Override
+  public Pattern<UnionNode> getPattern() {
+    return pattern;
+  }
+
+  @Override
+  public Result apply(UnionNode node, Captures captures, Context context) {
+
+    SetOperationMerge mergeOperation = new SetOperationMerge(node, context);
+    Optional<SetOperationNode> result = mergeOperation.merge();
+    return result.map(Result::ofPlanNode).orElseGet(Result::empty);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationMerge.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationMerge.java
new file mode 100644
index 00000000000..563a3883cbb
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationMerge.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+
+public class SetOperationMerge {
+
+  private final Rule.Context context;
+  private final SetOperationNode node;
+  private final List<PlanNode> newSources;
+
+  public SetOperationMerge(SetOperationNode node, Rule.Context context) {
+    this.node = node;
+    this.context = context;
+    this.newSources = new ArrayList<>();
+  }
+
+  // Merge multiple union into one union
+  public Optional<SetOperationNode> merge() {
+
+    checkState(
+        node instanceof UnionNode, "unexpected node type: %s", 
node.getClass().getSimpleName());
+    Lookup lookup = context.getLookup();
+    // Pre-check
+    boolean anyMerge =
+        node.getChildren().stream()
+            .map(lookup::resolve)
+            .anyMatch(child -> node.getClass().equals(child.getClass()));
+    if (!anyMerge) {
+      return Optional.empty();
+    }
+
+    List<PlanNode> childrenOfUnion =
+        
node.getChildren().stream().map(lookup::resolve).collect(toImmutableList());
+
+    ImmutableListMultimap.Builder<Symbol, Symbol> newMappingsBuilder =
+        ImmutableListMultimap.builder();
+
+    boolean rewritten = false;
+
+    for (int i = 0; i < childrenOfUnion.size(); i++) {
+      PlanNode child = childrenOfUnion.get(i);
+
+      // Determine if set operations can be merged and whether the resulting 
set operation is
+      // quantified DISTINCT or ALL
+      Optional<Boolean> mergedQuantifier = mergedQuantifierIsDistinct(node, 
child);
+      if (mergedQuantifier.isPresent()) {
+        addMergedMappings((SetOperationNode) child, i, newMappingsBuilder);
+        rewritten = true;
+      } else {
+        // Keep mapping as it is
+        addOriginalMappings(child, i, newMappingsBuilder);
+      }
+    }
+
+    if (!rewritten) {
+      return Optional.empty();
+    }
+
+    // the union has merged
+    return Optional.of(
+        new UnionNode(
+            node.getPlanNodeId(), newSources, newMappingsBuilder.build(), 
node.getOutputSymbols()));
+  }
+
+  private void addMergedMappings(
+      SetOperationNode child,
+      int childIndex,
+      ImmutableListMultimap.Builder<Symbol, Symbol> newMappingsBuilder) {
+
+    newSources.addAll(child.getChildren());
+    Map<Symbol, Collection<Symbol>> symbolMappings = 
node.getSymbolMapping().asMap();
+    for (Map.Entry<Symbol, Collection<Symbol>> mapping : 
symbolMappings.entrySet()) {
+      Symbol input = Iterables.get(mapping.getValue(), childIndex);
+      newMappingsBuilder.putAll(mapping.getKey(), 
child.getSymbolMapping().get(input));
+    }
+  }
+
+  private void addOriginalMappings(
+      PlanNode child,
+      int childIndex,
+      ImmutableListMultimap.Builder<Symbol, Symbol> newMappingsBuilder) {
+
+    newSources.add(child);
+    Map<Symbol, Collection<Symbol>> symbolMappings = 
node.getSymbolMapping().asMap();
+    for (Map.Entry<Symbol, Collection<Symbol>> mapping : 
symbolMappings.entrySet()) {
+      newMappingsBuilder.put(mapping.getKey(), 
Iterables.get(mapping.getValue(), childIndex));
+    }
+  }
+
+  /**
+   * Check if node and child are mergeable based on their set operation type 
and quantifier.
+   *
+   * <p>Optional.empty() indicates that merge is not possible.
+   */
+  private Optional<Boolean> mergedQuantifierIsDistinct(SetOperationNode node, 
PlanNode child) {
+
+    if (!node.getClass().equals(child.getClass())) {
+      return Optional.empty();
+    }
+
+    if (node instanceof UnionNode) {
+      return Optional.of(false);
+    }
+
+    // the Judgment logic for intersect and except wait for supplying
+    return Optional.empty();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index 3af300db979..cdf99e55434 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Me
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithSort;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithSort;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimits;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeUnion;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MultipleDistinctAggregationToMarkDistinct;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.OptimizeRowPattern;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAggregationColumns;
@@ -207,6 +208,7 @@ public class LogicalOptimizeFactory {
                         new MergeFilters(),
                         new InlineProjections(plannerContext),
                         new RemoveRedundantIdentityProjections(),
+                        new MergeUnion(),
                         new MergeLimits(),
                         new RemoveTrivialFilters(),
                         //                        new RemoveRedundantLimit(),
@@ -245,7 +247,7 @@ public class LogicalOptimizeFactory {
                 .addAll(limitPushdownRules)
                 .addAll(
                     ImmutableSet.of(
-                        // new MergeUnion(),
+                        new MergeUnion(),
                         // new RemoveEmptyUnionBranches(),
                         new MergeFilters(),
                         new RemoveTrivialFilters(),
@@ -255,6 +257,17 @@ public class LogicalOptimizeFactory {
                 .build()),
         simplifyOptimizer,
         new UnaliasSymbolReferences(plannerContext.getMetadata()),
+        new IterativeOptimizer(
+            plannerContext,
+            ruleStats,
+            ImmutableSet.<Rule<?>>builder()
+                .addAll(
+                    ImmutableSet.of(
+                        new MergeUnion(),
+                        // new MergeIntersect
+                        // new MergeExcept
+                        new PruneDistinctAggregation()))
+                .build()),
         columnPruningOptimizer,
         inlineProjectionLimitFiltersOptimizer,
         new IterativeOptimizer(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MergeUnionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MergeUnionTest.java
new file mode 100644
index 00000000000..d74e237082c
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MergeUnionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.TableLogicalPlanner;
+
+import org.junit.Test;
+
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.DEFAULT_WARNING;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.QUERY_CONTEXT;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.SESSION_INFO;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.TEST_MATADATA;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.union;
+
+public class MergeUnionTest {
+
+  @Test
+  public void simpleLeftDeepMerge() {
+
+    String sql =
+        "(select tag1 from t1 union all select tag1 from t2) union all select 
tag1 from t3 ";
+    Analysis analysis = analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
+    SymbolAllocator symbolAllocator = new SymbolAllocator();
+    LogicalQueryPlan actualLogicalQueryPlan =
+        new TableLogicalPlanner(
+                QUERY_CONTEXT, TEST_MATADATA, SESSION_INFO, symbolAllocator, 
DEFAULT_WARNING)
+            .plan(analysis);
+
+    // just verify the Logical plan    `Output  - union - 3*tableScan`
+    assertPlan(
+        actualLogicalQueryPlan,
+        output((union(tableScan("testdb.t1"), tableScan("testdb.t2"), 
tableScan("testdb.t3")))));
+  }
+
+  @Test
+  public void simpleRightDeepMerge() {
+
+    String sql =
+        "select tag1 from t1 union all (select tag1 from t2 union all select 
tag1 from t3) ";
+    Analysis analysis = analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
+    SymbolAllocator symbolAllocator = new SymbolAllocator();
+    LogicalQueryPlan actualLogicalQueryPlan =
+        new TableLogicalPlanner(
+                QUERY_CONTEXT, TEST_MATADATA, SESSION_INFO, symbolAllocator, 
DEFAULT_WARNING)
+            .plan(analysis);
+
+    // just verify the Logical plan    `Output  - union - 3*tableScan`
+    assertPlan(
+        actualLogicalQueryPlan,
+        output((union(tableScan("testdb.t1"), tableScan("testdb.t2"), 
tableScan("testdb.t3")))));
+  }
+
+  @Test
+  public void bushyTreeMerge() {
+
+    String sql =
+        "(select tag1 from t1 union all select tag1 from t2) union all (select 
tag1 from t3 union all select tag1 from t4) ";
+    Analysis analysis = analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
+    SymbolAllocator symbolAllocator = new SymbolAllocator();
+    LogicalQueryPlan actualLogicalQueryPlan =
+        new TableLogicalPlanner(
+                QUERY_CONTEXT, TEST_MATADATA, SESSION_INFO, symbolAllocator, 
DEFAULT_WARNING)
+            .plan(analysis);
+
+    // just verify the Logical plan    `Output  - union - 4*tableScan`
+    assertPlan(
+        actualLogicalQueryPlan,
+        output(
+            (union(
+                tableScan("testdb.t1"),
+                tableScan("testdb.t2"),
+                tableScan("testdb.t3"),
+                tableScan("testdb.t4")))));
+  }
+}

Reply via email to