This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 38cae5b4923 Push aggregates through UNION ALL in MSE via
AggregateUnionTranspose (#18513)
38cae5b4923 is described below
commit 38cae5b4923ee29e3baeee895c288d41c662ed00
Author: Yash Mayya <[email protected]>
AuthorDate: Thu May 28 12:55:27 2026 -0700
Push aggregates through UNION ALL in MSE via AggregateUnionTranspose
(#18513)
---
.../rules/PinotAggregateUnionTransposeRule.java | 191 +++++++++++++++++++++
.../calcite/rel/rules/PinotQueryRuleSets.java | 14 +-
.../pinot/query/QueryPlannerRuleOptionsTest.java | 21 ++-
.../resources/queries/PhysicalOptimizerPlans.json | 4 +-
.../src/test/resources/queries/SetOpPlans.json | 103 ++++++++++-
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
6 files changed, 320 insertions(+), 14 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateUnionTransposeRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateUnionTransposeRule.java
new file mode 100644
index 00000000000..bd4f13d5a84
--- /dev/null
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateUnionTransposeRule.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.metadata.RelMdUtil;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.MappingType;
+import org.apache.calcite.util.mapping.Mappings;
+
+
+/// Pinot variant of Calcite's `AggregateUnionTransposeRule` that pushes an
[Aggregate] past a non-distinct [Union].
+///
+/// Calcite's rule decides which aggregate functions are splittable by looking
up the function's _class_ in a small
+/// allow-list. Pinot ships its own subclasses of [SqlAggFunction] (e.g.
`PinotSumFunction`, `PinotMinMaxFunction`)
+/// that do not extend Calcite's `SqlSumAggFunction` or
`SqlMinMaxAggFunction`, so the upstream rule never fires on
+/// Pinot's standard SUM/MIN/MAX. This variant matches on [SqlKind] instead so
the rule also fires for Pinot's custom
+/// aggregate functions that share the same semantics.
+///
+/// Must run in the logical-planning phase (alongside
`AggregateUnionAggregateRule`) before `LogicalAggregate` is
+/// rewritten to `PinotLogicalAggregate`; the operand pattern is fixed to
[LogicalAggregate] / [LogicalUnion]. The
+/// emitted plain [Aggregate] nodes are subsequently split into LEAF / FINAL
pairs by
+/// `PinotAggregateExchangeNodeInsertRule`.
+///
+/// This class is stateless and is safe to share across planners.
+public final class PinotAggregateUnionTransposeRule extends RelOptRule
implements TransformationRule {
+
+ private static final EnumSet<SqlKind> SUPPORTED_KINDS = EnumSet.of(
+ SqlKind.SUM, SqlKind.SUM0, SqlKind.COUNT, SqlKind.MIN, SqlKind.MAX,
+ SqlKind.ANY_VALUE, SqlKind.BIT_AND, SqlKind.BIT_OR, SqlKind.BIT_XOR);
+
+ public static PinotAggregateUnionTransposeRule
instanceWithDescription(String description) {
+ return new PinotAggregateUnionTransposeRule(
+ operand(LogicalAggregate.class, operand(LogicalUnion.class, any())),
+ PinotRuleUtils.PINOT_REL_FACTORY, description);
+ }
+
+ private PinotAggregateUnionTransposeRule(RelOptRuleOperand operand,
RelBuilderFactory relBuilderFactory,
+ String description) {
+ super(operand, relBuilderFactory, description);
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ Aggregate aggRel = call.rel(0);
+ Union union = call.rel(1);
+
+ if (!union.all) {
+ // Only valid for UNION ALL: pushing aggregation past UNION DISTINCT
would change semantics.
+ return;
+ }
+
+ int groupCount = aggRel.getGroupSet().cardinality();
+
+ List<AggregateCall> transformedAggCalls = transformAggCalls(aggRel,
groupCount);
+ if (transformedAggCalls == null) {
+ // At least one aggregate function is not splittable (e.g. DISTINCT or
AVG).
+ return;
+ }
+
+ RelMetadataQuery mq = call.getMetadataQuery();
+ boolean hasUniqueKeyInAllInputs = true;
+ for (RelNode input : union.getInputs()) {
+ if (!RelMdUtil.areColumnsDefinitelyUnique(mq, input,
aggRel.getGroupSet())) {
+ hasUniqueKeyInAllInputs = false;
+ break;
+ }
+ }
+ if (hasUniqueKeyInAllInputs) {
+ // Every union branch is already unique on the group key, so pushing
down the aggregate would be a no-op and
+ // could loop forever inside the planner.
+ return;
+ }
+
+ // Preserve the original aggregate's hints on every replacement aggregate;
Pinot reads hints like
+ // is_skip_leaf_stage_group_by, is_partitioned_by_group_by_keys,
is_leaf_return_final_result, and group-trim
+ // options from the Aggregate node in later planning stages.
+ List<RelHint> hints = aggRel.getHints();
+ RelBuilder relBuilder = call.builder();
+ RelDataType origUnionType = union.getRowType();
+ for (RelNode input : union.getInputs()) {
+ List<AggregateCall> childAggCalls = new
ArrayList<>(aggRel.getAggCallList());
+ RelDataType inputRowType = input.getRowType();
+ for (int i = 0; i < childAggCalls.size(); i += 1) {
+ AggregateCall origCall = aggRel.getAggCallList().get(i);
+ if (origCall.getAggregation().getKind() == SqlKind.COUNT) {
+ // COUNT has no argument-nullability issue and is handled below by
rolling up via SUM0.
+ continue;
+ }
+ if (origCall.getArgList().size() != 1) {
+ continue;
+ }
+ int field = origCall.getArgList().get(0);
+ if (origUnionType.getFieldList().get(field).getType().isNullable()
+ != inputRowType.getFieldList().get(field).getType().isNullable()) {
+ // Calcite re-creates the call so the inferred return type matches
this branch's input nullability.
+ AggregateCall newCall =
+ AggregateCall.create(origCall.getParserPosition(),
origCall.getAggregation(), origCall.isDistinct(),
+ origCall.isApproximate(), origCall.ignoreNulls(),
origCall.rexList, origCall.getArgList(), -1,
+ origCall.distinctKeys, origCall.collation, groupCount,
input, null, origCall.getName());
+ childAggCalls.set(i, newCall);
+ }
+ }
+ relBuilder.push(LogicalAggregate.create(input, hints,
aggRel.getGroupSet(), null, childAggCalls));
+ }
+
+ // Build the new top-level Union over the branch aggregates, then wrap
with the rolled-up top aggregate.
+ relBuilder.union(true, union.getInputs().size());
+
+ ImmutableBitSet groupSet = aggRel.getGroupSet();
+ Mapping topGroupMapping =
+ Mappings.create(MappingType.INVERSE_SURJECTION,
union.getRowType().getFieldCount(), aggRel.getGroupCount());
+ for (int i = 0; i < groupSet.cardinality(); i += 1) {
+ topGroupMapping.set(groupSet.nth(i), i);
+ }
+ ImmutableBitSet topGroupSet = Mappings.apply(topGroupMapping, groupSet);
+ ImmutableList<ImmutableBitSet> topGroupSets =
Mappings.apply2(topGroupMapping, aggRel.getGroupSets());
+
+ call.transformTo(
+ LogicalAggregate.create(relBuilder.build(), hints, topGroupSet,
topGroupSets, transformedAggCalls));
+ }
+
+ /// Builds the rolled-up aggregate calls for the top aggregate, with arg
indexes shifted into the new union output
+ /// and COUNT replaced by SUM0 (counts roll up by summation). Returns `null`
if any call is not splittable.
+ private static @Nullable List<AggregateCall> transformAggCalls(Aggregate
aggRel, int groupCount) {
+ List<AggregateCall> origCalls = aggRel.getAggCallList();
+ List<AggregateCall> newCalls = new ArrayList<>(origCalls.size());
+ for (int i = 0; i < origCalls.size(); i += 1) {
+ AggregateCall origCall = origCalls.get(i);
+ if (origCall.isDistinct() ||
!SUPPORTED_KINDS.contains(origCall.getAggregation().getKind())) {
+ return null;
+ }
+ SqlAggFunction aggFun;
+ RelDataType aggType;
+ if (origCall.getAggregation().getKind() == SqlKind.COUNT) {
+ // Per-branch COUNT becomes a top-level SUM0 to sum the partial counts.
+ aggFun = SqlStdOperatorTable.SUM0;
+ aggType = null;
+ } else {
+ aggFun = origCall.getAggregation();
+ aggType = origCall.getType();
+ }
+ // Pass aggRel itself so AggregateCall.create can infer return types
from the (groupCols + aggCols) row type that
+ // the new top-of-union union will expose, not from the original union's
pre-aggregation row type.
+ AggregateCall newCall =
AggregateCall.create(origCall.getParserPosition(), aggFun,
origCall.isDistinct(),
+ origCall.isApproximate(), origCall.ignoreNulls(), origCall.rexList,
ImmutableList.of(groupCount + i), -1,
+ origCall.distinctKeys, origCall.collation, groupCount, aggRel,
aggType, origCall.getName());
+ newCalls.add(newCall);
+ }
+ return newCalls;
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
index b0c22f672df..8d05d3b782b 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
@@ -153,9 +153,21 @@ public class PinotQueryRuleSets {
// push aggregate functions through join, disabled by default
AggregateJoinTransposeRule.Config.EXTENDED
.withDescription(PlannerRuleNames.AGGREGATE_JOIN_TRANSPOSE_EXTENDED).toRule(),
- // aggregate union rule
+ // AggregateUnionAggregate and AggregateUnionTranspose are inverses of
each other and the defaults are chosen to
+ // optimize for distributed-OLAP cost (shuffle dominates compute):
+ // - AggregateUnionTranspose (default-on) pushes the aggregate into
every UNION ALL branch, so each branch
+ // ships pre-aggregated rows across the next exchange instead of raw
rows. With low-cardinality group keys
+ // (the common analytics case) this trades a cheap extra per-branch
aggregate for a much smaller shuffle.
+ // - AggregateUnionAggregate (default-off) does the opposite: it
collapses Agg(Union(Agg(A), B)) into
+ // Agg(Union(A, B)), which loses the pre-aggregation on A and forces
raw rows through the union's exchange.
+ // We keep it available behind `usePlannerRules` for
embedded/in-memory deployments where shuffle is free,
+ // but enabling it alongside the default-on Transpose just undoes
Transpose's work (see the order below).
AggregateUnionAggregateRule.Config.DEFAULT
.withDescription(PlannerRuleNames.AGGREGATE_UNION_AGGREGATE).toRule(),
+ // Registered after AggregateUnionAggregate on purpose: if both are
enabled, this phase runs last so Transpose
+ // wins and any merge AggregateUnionAggregate did first gets pushed back
into the branches.
+ PinotAggregateUnionTransposeRule
+ .instanceWithDescription(PlannerRuleNames.AGGREGATE_UNION_TRANSPOSE),
// reduce SUM and AVG
// TODO: Consider not reduce at all. This can now be controlled by
specifying
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
index 432f1ee3d00..934eef30200 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
@@ -51,10 +51,18 @@ public class QueryPlannerRuleOptionsTest extends
QueryEnvironmentTestBase {
}
private String explainQueryWithRuleEnabled(String query, String
ruleToEnable) {
+ return explainQueryWithRules(query, ruleToEnable, null); // null rulesToDisable: no SKIP_PLANNER_RULES option set
+ }
+
+ private String explainQueryWithRules(String query, String rulesToEnable,
String rulesToDisable) {
SqlNode sqlNode =
CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode();
Map<String, String> options = new HashMap<>();
- // disable rule
-
options.put(CommonConstants.Broker.Request.QueryOptionKey.USE_PLANNER_RULES,
ruleToEnable);
+ if (rulesToEnable != null) {
+
options.put(CommonConstants.Broker.Request.QueryOptionKey.USE_PLANNER_RULES,
rulesToEnable);
+ }
+ if (rulesToDisable != null) {
+
options.put(CommonConstants.Broker.Request.QueryOptionKey.SKIP_PLANNER_RULES,
rulesToDisable);
+ }
SqlNodeAndOptions sqlNodeAndOptions =
new SqlNodeAndOptions(
sqlNode,
@@ -169,8 +177,10 @@ public class QueryPlannerRuleOptionsTest extends
QueryEnvironmentTestBase {
+ "UNION\n"
+ "SELECT col1 FROM b;\n";
+ // Skip AggregateUnionTranspose too so the focus stays on PruneEmptyUnion;
otherwise the surviving non-empty
+ // branch ends up with a partial aggregate, which is correct but unrelated
to what this test is asserting.
String explain = explainQueryWithRuleDisabled(query,
- PlannerRuleNames.PRUNE_EMPTY_UNION);
+ PlannerRuleNames.PRUNE_EMPTY_UNION + "," +
PlannerRuleNames.AGGREGATE_UNION_TRANSPOSE);
//@formatter:off
assertEquals(explain,
"Execution Plan\n"
@@ -645,7 +655,10 @@ public class QueryPlannerRuleOptionsTest extends
QueryEnvironmentTestBase {
+ "(SELECT DISTINCT col1 FROM b)";
//@formatter:on
- String explain = explainQueryWithRuleEnabled(query,
PlannerRuleNames.AGGREGATE_UNION_AGGREGATE);
+ // Disable AggregateUnionTranspose for this test so the
AggregateUnionAggregate merge behavior is isolated;
+ // otherwise transpose pushes aggregates back into each branch and undoes
the merge.
+ String explain = explainQueryWithRules(query,
PlannerRuleNames.AGGREGATE_UNION_AGGREGATE,
+ PlannerRuleNames.AGGREGATE_UNION_TRANSPOSE);
// There shouldn't be aggregates above the table scans since they should
be merged into the one above the UNION ALL
assertEquals(explain,
diff --git
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index f4bfeff1c2a..e2d5a7de71c 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -692,11 +692,11 @@
"\n PhysicalAggregate(group=[{0}], aggType=[DIRECT])",
"\n PhysicalUnion(all=[true])",
"\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
- "\n PhysicalProject(col2=[$1])",
+ "\n PhysicalAggregate(group=[{1}],
aggType=[DIRECT])",
"\n PhysicalFilter(condition=[=($0,
_UTF-8'foo')])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
- "\n PhysicalProject(col2=[$1])",
+ "\n PhysicalAggregate(group=[{1}],
aggType=[DIRECT])",
"\n PhysicalFilter(condition=[=($2, 1)])",
"\n PhysicalTableScan(table=[[default, a]])",
"\n"
diff --git a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
index 01d05a5ea90..1aa229eb9af 100644
--- a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
@@ -35,7 +35,7 @@
]
},
{
- "description": "UNION from three tables",
+ "description": "UNION from three tables (dedup aggregates are pushed
into each branch by AggregateUnionTranspose)",
"sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a UNION SELECT col1,
col2 FROM b UNION SELECT col1, col2 FROM c",
"output": [
"Execution Plan",
@@ -49,14 +49,20 @@
"\n PinotLogicalAggregate(group=[{0, 1}],
aggType=[LEAF])",
"\n LogicalUnion(all=[true])",
"\n PinotLogicalExchange(distribution=[hash[0,
1]])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n PinotLogicalTableScan(table=[[default,
a]])",
+ "\n PinotLogicalAggregate(group=[{0, 1}],
aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0,
1]])",
+ "\n PinotLogicalAggregate(group=[{0, 1}],
aggType=[LEAF])",
+ "\n PinotLogicalTableScan(table=[[default,
a]])",
"\n PinotLogicalExchange(distribution=[hash[0,
1]])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n PinotLogicalTableScan(table=[[default,
b]])",
+ "\n PinotLogicalAggregate(group=[{0, 1}],
aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0,
1]])",
+ "\n PinotLogicalAggregate(group=[{0, 1}],
aggType=[LEAF])",
+ "\n PinotLogicalTableScan(table=[[default,
b]])",
"\n PinotLogicalExchange(distribution=[hash[0, 1]])",
- "\n LogicalProject(col1=[$0], col2=[$1])",
- "\n PinotLogicalTableScan(table=[[default, c]])",
+ "\n PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalAggregate(group=[{0, 1}],
aggType=[LEAF])",
+ "\n PinotLogicalTableScan(table=[[default, c]])",
"\n"
]
},
@@ -80,6 +86,89 @@
"\n"
]
},
+ {
+ "description": "GROUP BY with SUM aggregate pushed into both branches
of UNION ALL by AggregateUnionTranspose",
+ "sql": "EXPLAIN PLAN FOR SELECT col2, SUM(col3) FROM (SELECT col2,
col3 FROM a UNION ALL SELECT col2, col3 FROM b) GROUP BY col2",
+ "output": [
+ "Execution Plan",
+ "\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[LEAF])",
+ "\n LogicalUnion(all=[true])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{1}],
agg#0=[$SUM0($2)], aggType=[LEAF])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{1}],
agg#0=[$SUM0($2)], aggType=[LEAF])",
+ "\n PinotLogicalTableScan(table=[[default, b]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "GROUP BY with COUNT(*) pushed into UNION ALL branches:
per-branch COUNT rolls up via top-level SUM0",
+ "sql": "EXPLAIN PLAN FOR SELECT col2, COUNT(*) FROM (SELECT col2 FROM
a UNION ALL SELECT col2 FROM b) GROUP BY col2",
+ "output": [
+ "Execution Plan",
+ "\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[LEAF])",
+ "\n LogicalUnion(all=[true])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)],
aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{1}], agg#0=[COUNT()],
aggType=[LEAF])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)],
aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{1}], agg#0=[COUNT()],
aggType=[LEAF])",
+ "\n PinotLogicalTableScan(table=[[default, b]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Aggregate hints (e.g. is_skip_leaf_stage_group_by) are
preserved on both the per-branch and rolled-up aggregates produced by
AggregateUnionTranspose",
+ "sql": "EXPLAIN PLAN FOR SELECT /*+
aggOptions(is_skip_leaf_stage_group_by='true') */ col2, SUM(col3) FROM (SELECT
col2, col3 FROM a UNION ALL SELECT col2, col3 FROM b) GROUP BY col2",
+ "output": [
+ "Execution Plan",
+ "\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[DIRECT])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalUnion(all=[true])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[DIRECT])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col2=[$1], col3=[$2])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[DIRECT])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n LogicalProject(col2=[$1], col3=[$2])",
+ "\n PinotLogicalTableScan(table=[[default, b]])",
+ "\n"
+ ]
+ },
+ {
+ "description": "Disabling AggregateUnionTranspose via skipPlannerRules
keeps the aggregate above the UNION ALL",
+ "sql": "SET skipPlannerRules='AggregateUnionTranspose'; EXPLAIN PLAN
FOR SELECT col2, SUM(col3) FROM (SELECT col2, col3 FROM a UNION ALL SELECT
col2, col3 FROM b) GROUP BY col2",
+ "output": [
+ "Execution Plan",
+ "\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[FINAL])",
+ "\n PinotLogicalExchange(distribution=[hash[0]])",
+ "\n PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)],
aggType=[LEAF])",
+ "\n LogicalUnion(all=[true])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col2=[$1], col3=[$2])",
+ "\n PinotLogicalTableScan(table=[[default, a]])",
+ "\n PinotLogicalExchange(distribution=[hash[0, 1]])",
+ "\n LogicalProject(col2=[$1], col3=[$2])",
+ "\n PinotLogicalTableScan(table=[[default, b]])",
+ "\n"
+ ]
+ },
{
"description": "EXCEPT from three tables",
"sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a EXCEPT SELECT col1,
col2 FROM b EXCEPT SELECT col1, col2 FROM c",
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fcfac94f4bf..694f64cc4e3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1025,6 +1025,7 @@ public class CommonConstants {
public static final String AGGREGATE_REMOVE = "AggregateRemove";
public static final String AGGREGATE_JOIN_TRANSPOSE =
"AggregateJoinTranspose";
public static final String AGGREGATE_UNION_AGGREGATE =
"AggregateUnionAggregate";
+ public static final String AGGREGATE_UNION_TRANSPOSE =
"AggregateUnionTranspose";
public static final String AGGREGATE_REDUCE_FUNCTIONS =
"AggregateReduceFunctions";
public static final String AGGREGATE_FUNCTION_REWRITE =
"AggregateFunctionRewrite";
public static final String AGGREGATE_CASE_TO_FILTER =
"AggregateCaseToFilter";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]