This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 37388a1d494 implement MergeUnion rule to flatten nested UnionNode
(#16657)
37388a1d494 is described below
commit 37388a1d49445a0412b286db5893088ebd804bb0
Author: alpass163 <[email protected]>
AuthorDate: Tue Oct 28 18:46:29 2025 +0800
implement MergeUnion rule to flatten nested UnionNode (#16657)
---
.../planner/iterative/rule/MergeUnion.java | 48 +++++++
.../planner/iterative/rule/SetOperationMerge.java | 144 +++++++++++++++++++++
.../optimizations/LogicalOptimizeFactory.java | 15 ++-
.../plan/relational/analyzer/MergeUnionTest.java | 98 ++++++++++++++
4 files changed, 304 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeUnion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeUnion.java
new file mode 100644
index 00000000000..2c378b78723
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/MergeUnion.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Pattern;
+
+import java.util.Optional;
+
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.union;
+
+/**
+ * Flattens directly nested {@link UnionNode}s: a union whose children include other unions is
+ * rewritten into a single union over all of their sources.
+ *
+ * <pre>
+ * Union                Union
+ *   Union        =>      A
+ *     A                  B
+ *     B                  C
+ *   C
+ * </pre>
+ *
+ * <p>The actual rewrite (sources and symbol mappings) is delegated to {@link SetOperationMerge}.
+ */
+public class MergeUnion implements Rule<UnionNode> {
+
+  // Shared by all instances of the rule.
+  private static final Pattern<UnionNode> PATTERN = union();
+
+  @Override
+  public Pattern<UnionNode> getPattern() {
+    return PATTERN;
+  }
+
+  /**
+   * Tries to merge the matched union with any child unions.
+   *
+   * @return the flattened union, or an empty result if no child could be merged
+   */
+  @Override
+  public Result apply(UnionNode node, Captures captures, Context context) {
+    Optional<SetOperationNode> merged = new SetOperationMerge(node, context).merge();
+    return merged.map(Result::ofPlanNode).orElseGet(Result::empty);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationMerge.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationMerge.java
new file mode 100644
index 00000000000..563a3883cbb
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationMerge.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Lookup;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+
+/**
+ * Flattens a set operation node whose resolved children include nodes of the same set operation
+ * type. Each mergeable child is replaced by its own children, and the parent's symbol mapping is
+ * rewritten to reference the grandchildren's symbols.
+ *
+ * <p>Only {@link UnionNode} is currently supported; merging INTERSECT and EXCEPT is not
+ * implemented yet.
+ */
+public class SetOperationMerge {
+
+  private final Rule.Context context;
+  private final SetOperationNode node;
+
+  public SetOperationMerge(SetOperationNode node, Rule.Context context) {
+    this.node = node;
+    this.context = context;
+  }
+
+  /**
+   * Merges directly nested unions into a single union.
+   *
+   * <p>Children that are mergeable with the node are spliced in place by their own children;
+   * all other children are kept as they are, in their original order.
+   *
+   * @return the flattened node, or {@link Optional#empty()} if no child could be merged
+   * @throws IllegalStateException if the node is not a {@link UnionNode}
+   */
+  public Optional<SetOperationNode> merge() {
+    checkState(
+        node instanceof UnionNode, "unexpected node type: %s", node.getClass().getSimpleName());
+
+    Lookup lookup = context.getLookup();
+    // Resolve every child through the Lookup once and reuse the result below.
+    List<PlanNode> resolvedChildren =
+        node.getChildren().stream().map(lookup::resolve).collect(toImmutableList());
+
+    // Cheap pre-check: bail out before building anything if no child has the same node type.
+    if (resolvedChildren.stream().noneMatch(child -> node.getClass().equals(child.getClass()))) {
+      return Optional.empty();
+    }
+
+    // Built per call (not stored in a field) so that repeated merge() calls on the same instance
+    // never see sources accumulated by an earlier call.
+    List<PlanNode> newSources = new ArrayList<>();
+    ImmutableListMultimap.Builder<Symbol, Symbol> newMappingsBuilder =
+        ImmutableListMultimap.builder();
+    boolean rewritten = false;
+
+    for (int i = 0; i < resolvedChildren.size(); i++) {
+      PlanNode child = resolvedChildren.get(i);
+
+      // Determine whether the child can be merged into the parent and, if so, whether the
+      // resulting set operation is quantified DISTINCT (true) or ALL (false).
+      Optional<Boolean> mergedQuantifier = mergedQuantifierIsDistinct(node, child);
+      if (mergedQuantifier.isPresent()) {
+        addMergedMappings((SetOperationNode) child, i, newSources, newMappingsBuilder);
+        rewritten = true;
+      } else {
+        // Keep the child and its mapping unchanged.
+        addOriginalMappings(child, i, newSources, newMappingsBuilder);
+      }
+    }
+
+    if (!rewritten) {
+      return Optional.empty();
+    }
+
+    // At least one nested union was merged into this one.
+    return Optional.of(
+        new UnionNode(
+            node.getPlanNodeId(), newSources, newMappingsBuilder.build(), node.getOutputSymbols()));
+  }
+
+  /**
+   * Splices the children of a mergeable {@code child} into {@code newSources} and maps every
+   * parent output symbol to the child's corresponding input symbols.
+   */
+  private void addMergedMappings(
+      SetOperationNode child,
+      int childIndex,
+      List<PlanNode> newSources,
+      ImmutableListMultimap.Builder<Symbol, Symbol> newMappingsBuilder) {
+
+    newSources.addAll(child.getChildren());
+    Map<Symbol, Collection<Symbol>> symbolMappings = node.getSymbolMapping().asMap();
+    for (Map.Entry<Symbol, Collection<Symbol>> mapping : symbolMappings.entrySet()) {
+      // The childIndex-th input of this parent output is a symbol produced by the child; replace
+      // it with the child's own inputs for that symbol.
+      Symbol input = Iterables.get(mapping.getValue(), childIndex);
+      newMappingsBuilder.putAll(mapping.getKey(), child.getSymbolMapping().get(input));
+    }
+  }
+
+  /** Keeps {@code child} as a source and copies the parent's mapping entries for it unchanged. */
+  private void addOriginalMappings(
+      PlanNode child,
+      int childIndex,
+      List<PlanNode> newSources,
+      ImmutableListMultimap.Builder<Symbol, Symbol> newMappingsBuilder) {
+
+    newSources.add(child);
+    Map<Symbol, Collection<Symbol>> symbolMappings = node.getSymbolMapping().asMap();
+    for (Map.Entry<Symbol, Collection<Symbol>> mapping : symbolMappings.entrySet()) {
+      newMappingsBuilder.put(mapping.getKey(), Iterables.get(mapping.getValue(), childIndex));
+    }
+  }
+
+  /**
+   * Checks whether {@code node} and {@code child} can be merged, based on their set operation
+   * type and quantifier.
+   *
+   * @return {@link Optional#empty()} if they cannot be merged; otherwise whether the merged set
+   *     operation is DISTINCT ({@code true}) or ALL ({@code false})
+   */
+  private Optional<Boolean> mergedQuantifierIsDistinct(SetOperationNode node, PlanNode child) {
+
+    if (!node.getClass().equals(child.getClass())) {
+      return Optional.empty();
+    }
+
+    if (node instanceof UnionNode) {
+      // Merged unions are reported as ALL (not DISTINCT).
+      return Optional.of(false);
+    }
+
+    // Merge rules for INTERSECT and EXCEPT are not implemented yet.
+    return Optional.empty();
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
index 3af300db979..cdf99e55434 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Me
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitOverProjectWithSort;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimitWithSort;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeLimits;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MergeUnion;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.MultipleDistinctAggregationToMarkDistinct;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.OptimizeRowPattern;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.PruneAggregationColumns;
@@ -207,6 +208,7 @@ public class LogicalOptimizeFactory {
new MergeFilters(),
new InlineProjections(plannerContext),
new RemoveRedundantIdentityProjections(),
+ new MergeUnion(),
new MergeLimits(),
new RemoveTrivialFilters(),
// new RemoveRedundantLimit(),
@@ -245,7 +247,7 @@ public class LogicalOptimizeFactory {
.addAll(limitPushdownRules)
.addAll(
ImmutableSet.of(
- // new MergeUnion(),
+ new MergeUnion(),
// new RemoveEmptyUnionBranches(),
new MergeFilters(),
new RemoveTrivialFilters(),
@@ -255,6 +257,17 @@ public class LogicalOptimizeFactory {
.build()),
simplifyOptimizer,
new UnaliasSymbolReferences(plannerContext.getMetadata()),
+ new IterativeOptimizer(
+ plannerContext,
+ ruleStats,
+ ImmutableSet.<Rule<?>>builder()
+ .addAll(
+ ImmutableSet.of(
+ new MergeUnion(),
+ // new MergeIntersect
+ // new MergeExcept
+ new PruneDistinctAggregation()))
+ .build()),
columnPruningOptimizer,
inlineProjectionLimitFiltersOptimizer,
new IterativeOptimizer(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MergeUnionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MergeUnionTest.java
new file mode 100644
index 00000000000..d74e237082c
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MergeUnionTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.TableLogicalPlanner;
+
+import org.junit.Test;
+
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.DEFAULT_WARNING;
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.QUERY_CONTEXT;
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.SESSION_INFO;
+import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.TEST_MATADATA;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.union;
+
+/** Logical-plan tests checking that nested UNION ALL queries are flattened into one union. */
+public class MergeUnionTest {
+
+  /** Analyzes {@code sql} against the shared test metadata and returns its logical plan. */
+  private static LogicalQueryPlan planOf(String sql) {
+    Analysis analyzed = analyzeSQL(sql, TEST_MATADATA, QUERY_CONTEXT);
+    return new TableLogicalPlanner(
+            QUERY_CONTEXT, TEST_MATADATA, SESSION_INFO, new SymbolAllocator(), DEFAULT_WARNING)
+        .plan(analyzed);
+  }
+
+  @Test
+  public void simpleLeftDeepMerge() {
+    String query =
+        "(select tag1 from t1 union all select tag1 from t2) union all select tag1 from t3 ";
+
+    // Expected logical plan: Output -> Union -> 3 x TableScan
+    assertPlan(
+        planOf(query),
+        output(union(tableScan("testdb.t1"), tableScan("testdb.t2"), tableScan("testdb.t3"))));
+  }
+
+  @Test
+  public void simpleRightDeepMerge() {
+    String query =
+        "select tag1 from t1 union all (select tag1 from t2 union all select tag1 from t3) ";
+
+    // Expected logical plan: Output -> Union -> 3 x TableScan
+    assertPlan(
+        planOf(query),
+        output(union(tableScan("testdb.t1"), tableScan("testdb.t2"), tableScan("testdb.t3"))));
+  }
+
+  @Test
+  public void bushyTreeMerge() {
+    String query =
+        "(select tag1 from t1 union all select tag1 from t2) union all (select tag1 from t3 union all select tag1 from t4) ";
+
+    // Expected logical plan: Output -> Union -> 4 x TableScan
+    assertPlan(
+        planOf(query),
+        output(
+            union(
+                tableScan("testdb.t1"),
+                tableScan("testdb.t2"),
+                tableScan("testdb.t3"),
+                tableScan("testdb.t4"))));
+  }
+}