This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 38cae5b4923 Push aggregates through UNION ALL in MSE via 
AggregateUnionTranspose (#18513)
38cae5b4923 is described below

commit 38cae5b4923ee29e3baeee895c288d41c662ed00
Author: Yash Mayya <[email protected]>
AuthorDate: Thu May 28 12:55:27 2026 -0700

    Push aggregates through UNION ALL in MSE via AggregateUnionTranspose 
(#18513)
---
 .../rules/PinotAggregateUnionTransposeRule.java    | 191 +++++++++++++++++++++
 .../calcite/rel/rules/PinotQueryRuleSets.java      |  14 +-
 .../pinot/query/QueryPlannerRuleOptionsTest.java   |  21 ++-
 .../resources/queries/PhysicalOptimizerPlans.json  |   4 +-
 .../src/test/resources/queries/SetOpPlans.json     | 103 ++++++++++-
 .../apache/pinot/spi/utils/CommonConstants.java    |   1 +
 6 files changed, 320 insertions(+), 14 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateUnionTransposeRule.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateUnionTransposeRule.java
new file mode 100644
index 00000000000..bd4f13d5a84
--- /dev/null
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateUnionTransposeRule.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.calcite.rel.rules;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalUnion;
+import org.apache.calcite.rel.metadata.RelMdUtil;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.rules.TransformationRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.mapping.Mapping;
+import org.apache.calcite.util.mapping.MappingType;
+import org.apache.calcite.util.mapping.Mappings;
+
+
+/// Pinot variant of Calcite's `AggregateUnionTransposeRule` that pushes an 
[Aggregate] past a non-distinct [Union].
+///
+/// Calcite's rule decides which aggregate functions are splittable by looking 
up the function's _class_ in a small
+/// allow-list. Pinot ships its own subclasses of [SqlAggFunction] (e.g. 
`PinotSumFunction`, `PinotMinMaxFunction`)
+/// that do not extend Calcite's `SqlSumAggFunction` or 
`SqlMinMaxAggFunction`, so the upstream rule never fires on
+/// Pinot's standard SUM/MIN/MAX. This variant matches on [SqlKind] instead so 
the rule also fires for Pinot's custom
+/// aggregate functions that share the same semantics.
+///
+/// Must run in the logical-planning phase (alongside 
`AggregateUnionAggregateRule`) before `LogicalAggregate` is
+/// rewritten to `PinotLogicalAggregate`; the operand pattern is fixed to 
[LogicalAggregate] / [LogicalUnion]. The
+/// emitted plain [Aggregate] nodes are subsequently split into LEAF / FINAL 
pairs by
+/// `PinotAggregateExchangeNodeInsertRule`.
+///
+/// This class is stateless and is safe to share across planners.
+public final class PinotAggregateUnionTransposeRule extends RelOptRule 
implements TransformationRule {
+
+  private static final EnumSet<SqlKind> SUPPORTED_KINDS = EnumSet.of(
+      SqlKind.SUM, SqlKind.SUM0, SqlKind.COUNT, SqlKind.MIN, SqlKind.MAX,
+      SqlKind.ANY_VALUE, SqlKind.BIT_AND, SqlKind.BIT_OR, SqlKind.BIT_XOR);
+
+  public static PinotAggregateUnionTransposeRule 
instanceWithDescription(String description) {
+    return new PinotAggregateUnionTransposeRule(
+        operand(LogicalAggregate.class, operand(LogicalUnion.class, any())),
+        PinotRuleUtils.PINOT_REL_FACTORY, description);
+  }
+
+  private PinotAggregateUnionTransposeRule(RelOptRuleOperand operand, 
RelBuilderFactory relBuilderFactory,
+      String description) {
+    super(operand, relBuilderFactory, description);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    Aggregate aggRel = call.rel(0);
+    Union union = call.rel(1);
+
+    if (!union.all) {
+      // Only valid for UNION ALL: pushing aggregation past UNION DISTINCT 
would change semantics.
+      return;
+    }
+
+    int groupCount = aggRel.getGroupSet().cardinality();
+
+    List<AggregateCall> transformedAggCalls = transformAggCalls(aggRel, 
groupCount);
+    if (transformedAggCalls == null) {
+      // At least one aggregate function is not splittable (e.g. DISTINCT or 
AVG).
+      return;
+    }
+
+    RelMetadataQuery mq = call.getMetadataQuery();
+    boolean hasUniqueKeyInAllInputs = true;
+    for (RelNode input : union.getInputs()) {
+      if (!RelMdUtil.areColumnsDefinitelyUnique(mq, input, 
aggRel.getGroupSet())) {
+        hasUniqueKeyInAllInputs = false;
+        break;
+      }
+    }
+    if (hasUniqueKeyInAllInputs) {
+      // Every union branch is already unique on the group key, so pushing 
down the aggregate would be a no-op and
+      // could loop forever inside the planner.
+      return;
+    }
+
+    // Preserve the original aggregate's hints on every replacement aggregate; 
Pinot reads hints like
+    // is_skip_leaf_stage_group_by, is_partitioned_by_group_by_keys, 
is_leaf_return_final_result, and group-trim
+    // options from the Aggregate node in later planning stages.
+    List<RelHint> hints = aggRel.getHints();
+    RelBuilder relBuilder = call.builder();
+    RelDataType origUnionType = union.getRowType();
+    for (RelNode input : union.getInputs()) {
+      List<AggregateCall> childAggCalls = new 
ArrayList<>(aggRel.getAggCallList());
+      RelDataType inputRowType = input.getRowType();
+      for (int i = 0; i < childAggCalls.size(); i += 1) {
+        AggregateCall origCall = aggRel.getAggCallList().get(i);
+        if (origCall.getAggregation().getKind() == SqlKind.COUNT) {
+          // COUNT has no argument-nullability issue and is handled below by 
rolling up via SUM0.
+          continue;
+        }
+        if (origCall.getArgList().size() != 1) {
+          continue;
+        }
+        int field = origCall.getArgList().get(0);
+        if (origUnionType.getFieldList().get(field).getType().isNullable()
+            != inputRowType.getFieldList().get(field).getType().isNullable()) {
+          // Calcite re-creates the call so the inferred return type matches 
this branch's input nullability.
+          AggregateCall newCall =
+              AggregateCall.create(origCall.getParserPosition(), 
origCall.getAggregation(), origCall.isDistinct(),
+                  origCall.isApproximate(), origCall.ignoreNulls(), 
origCall.rexList, origCall.getArgList(), -1,
+                  origCall.distinctKeys, origCall.collation, groupCount, 
input, null, origCall.getName());
+          childAggCalls.set(i, newCall);
+        }
+      }
+      relBuilder.push(LogicalAggregate.create(input, hints, 
aggRel.getGroupSet(), null, childAggCalls));
+    }
+
+    // Build the new top-level Union over the branch aggregates, then wrap 
with the rolled-up top aggregate.
+    relBuilder.union(true, union.getInputs().size());
+
+    ImmutableBitSet groupSet = aggRel.getGroupSet();
+    Mapping topGroupMapping =
+        Mappings.create(MappingType.INVERSE_SURJECTION, 
union.getRowType().getFieldCount(), aggRel.getGroupCount());
+    for (int i = 0; i < groupSet.cardinality(); i += 1) {
+      topGroupMapping.set(groupSet.nth(i), i);
+    }
+    ImmutableBitSet topGroupSet = Mappings.apply(topGroupMapping, groupSet);
+    ImmutableList<ImmutableBitSet> topGroupSets = 
Mappings.apply2(topGroupMapping, aggRel.getGroupSets());
+
+    call.transformTo(
+        LogicalAggregate.create(relBuilder.build(), hints, topGroupSet, 
topGroupSets, transformedAggCalls));
+  }
+
+  /// Builds the rolled-up aggregate calls for the top aggregate, with arg 
indexes shifted into the new union output
+  /// and COUNT replaced by SUM0 (counts roll up by summation). Returns `null` 
if any call is not splittable.
+  private static @Nullable List<AggregateCall> transformAggCalls(Aggregate 
aggRel, int groupCount) {
+    List<AggregateCall> origCalls = aggRel.getAggCallList();
+    List<AggregateCall> newCalls = new ArrayList<>(origCalls.size());
+    for (int i = 0; i < origCalls.size(); i += 1) {
+      AggregateCall origCall = origCalls.get(i);
+      if (origCall.isDistinct() || 
!SUPPORTED_KINDS.contains(origCall.getAggregation().getKind())) {
+        return null;
+      }
+      SqlAggFunction aggFun;
+      RelDataType aggType;
+      if (origCall.getAggregation().getKind() == SqlKind.COUNT) {
+        // Per-branch COUNT becomes a top-level SUM0 to sum the partial counts.
+        aggFun = SqlStdOperatorTable.SUM0;
+        aggType = null;
+      } else {
+        aggFun = origCall.getAggregation();
+        aggType = origCall.getType();
+      }
+      // Pass aggRel itself so AggregateCall.create can infer return types 
from the (groupCols + aggCols) row type that
+      // the new top-of-union union will expose, not from the original union's 
pre-aggregation row type.
+      AggregateCall newCall = 
AggregateCall.create(origCall.getParserPosition(), aggFun, 
origCall.isDistinct(),
+          origCall.isApproximate(), origCall.ignoreNulls(), origCall.rexList, 
ImmutableList.of(groupCount + i), -1,
+          origCall.distinctKeys, origCall.collation, groupCount, aggRel, 
aggType, origCall.getName());
+      newCalls.add(newCall);
+    }
+    return newCalls;
+  }
+}
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
index b0c22f672df..8d05d3b782b 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
@@ -153,9 +153,21 @@ public class PinotQueryRuleSets {
       // push aggregate functions through join, disabled by default
       AggregateJoinTransposeRule.Config.EXTENDED
           
.withDescription(PlannerRuleNames.AGGREGATE_JOIN_TRANSPOSE_EXTENDED).toRule(),
-      // aggregate union rule
+      // AggregateUnionAggregate and AggregateUnionTranspose are inverses of 
each other and the defaults are chosen to
+      // optimize for distributed-OLAP cost (shuffle dominates compute):
+      //   - AggregateUnionTranspose (default-on) pushes the aggregate into 
every UNION ALL branch, so each branch
+      //     ships pre-aggregated rows across the next exchange instead of raw 
rows. With low-cardinality group keys
+      //     (the common analytics case) this trades a cheap extra per-branch 
aggregate for a much smaller shuffle.
+      //   - AggregateUnionAggregate (default-off) does the opposite: it 
collapses Agg(Union(Agg(A), B)) into
+      //     Agg(Union(A, B)), which loses the pre-aggregation on A and forces 
raw rows through the union's exchange.
+      //     We keep it available behind `usePlannerRules` for 
embedded/in-memory deployments where shuffle is free,
+      //     but enabling it alongside the default-on Transpose just undoes 
Transpose's work (see the order below).
       AggregateUnionAggregateRule.Config.DEFAULT
           
.withDescription(PlannerRuleNames.AGGREGATE_UNION_AGGREGATE).toRule(),
+      // Registered after AggregateUnionAggregate on purpose: if both are 
enabled, this phase runs last so Transpose
+      // wins and any merge AggregateUnionAggregate did first gets pushed back 
into the branches.
+      PinotAggregateUnionTransposeRule
+          .instanceWithDescription(PlannerRuleNames.AGGREGATE_UNION_TRANSPOSE),
 
       // reduce SUM and AVG
       // TODO: Consider not reduce at all. This can now be controlled by 
specifying
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
index 432f1ee3d00..934eef30200 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryPlannerRuleOptionsTest.java
@@ -51,10 +51,18 @@ public class QueryPlannerRuleOptionsTest extends 
QueryEnvironmentTestBase {
   }
 
   private String explainQueryWithRuleEnabled(String query, String 
ruleToEnable) {
+    return explainQueryWithRules(query, ruleToEnable, null);
+  }
+
+  private String explainQueryWithRules(String query, String rulesToEnable, 
String rulesToDisable) {
     SqlNode sqlNode = 
CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode();
     Map<String, String> options = new HashMap<>();
-    // disable rule
-    
options.put(CommonConstants.Broker.Request.QueryOptionKey.USE_PLANNER_RULES, 
ruleToEnable);
+    if (rulesToEnable != null) {
+      
options.put(CommonConstants.Broker.Request.QueryOptionKey.USE_PLANNER_RULES, 
rulesToEnable);
+    }
+    if (rulesToDisable != null) {
+      
options.put(CommonConstants.Broker.Request.QueryOptionKey.SKIP_PLANNER_RULES, 
rulesToDisable);
+    }
     SqlNodeAndOptions sqlNodeAndOptions =
         new SqlNodeAndOptions(
             sqlNode,
@@ -169,8 +177,10 @@ public class QueryPlannerRuleOptionsTest extends 
QueryEnvironmentTestBase {
         + "UNION\n"
         + "SELECT col1 FROM b;\n";
 
+    // Skip AggregateUnionTranspose too so the focus stays on PruneEmptyUnion; 
otherwise the surviving non-empty
+    // branch ends up with a partial aggregate, which is correct but unrelated 
to what this test is asserting.
     String explain = explainQueryWithRuleDisabled(query,
-        PlannerRuleNames.PRUNE_EMPTY_UNION);
+        PlannerRuleNames.PRUNE_EMPTY_UNION + "," + 
PlannerRuleNames.AGGREGATE_UNION_TRANSPOSE);
     //@formatter:off
     assertEquals(explain,
         "Execution Plan\n"
@@ -645,7 +655,10 @@ public class QueryPlannerRuleOptionsTest extends 
QueryEnvironmentTestBase {
         + "(SELECT DISTINCT col1 FROM b)";
     //@formatter:on
 
-    String explain = explainQueryWithRuleEnabled(query, 
PlannerRuleNames.AGGREGATE_UNION_AGGREGATE);
+    // Disable AggregateUnionTranspose for this test so the 
AggregateUnionAggregate merge behavior is isolated;
+    // otherwise transpose pushes aggregates back into each branch and undoes 
the merge.
+    String explain = explainQueryWithRules(query, 
PlannerRuleNames.AGGREGATE_UNION_AGGREGATE,
+        PlannerRuleNames.AGGREGATE_UNION_TRANSPOSE);
 
     // There shouldn't be aggregates above the table scans since they should 
be merged into the one above the UNION ALL
     assertEquals(explain,
diff --git 
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json 
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index f4bfeff1c2a..e2d5a7de71c 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -692,11 +692,11 @@
           "\n            PhysicalAggregate(group=[{0}], aggType=[DIRECT])",
           "\n              PhysicalUnion(all=[true])",
           "\n                
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n                  PhysicalProject(col2=[$1])",
+          "\n                  PhysicalAggregate(group=[{1}], 
aggType=[DIRECT])",
           "\n                    PhysicalFilter(condition=[=($0, 
_UTF-8'foo')])",
           "\n                      PhysicalTableScan(table=[[default, a]])",
           "\n                
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n                  PhysicalProject(col2=[$1])",
+          "\n                  PhysicalAggregate(group=[{1}], 
aggType=[DIRECT])",
           "\n                    PhysicalFilter(condition=[=($2, 1)])",
           "\n                      PhysicalTableScan(table=[[default, a]])",
           "\n"
diff --git a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json 
b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
index 01d05a5ea90..1aa229eb9af 100644
--- a/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/SetOpPlans.json
@@ -35,7 +35,7 @@
         ]
       },
       {
-        "description": "UNION from three tables",
+        "description": "UNION from three tables (dedup aggregates are pushed 
into each branch by AggregateUnionTranspose)",
         "sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a UNION SELECT col1, 
col2 FROM b UNION SELECT col1, col2 FROM c",
         "output": [
           "Execution Plan",
@@ -49,14 +49,20 @@
           "\n              PinotLogicalAggregate(group=[{0, 1}], 
aggType=[LEAF])",
           "\n                LogicalUnion(all=[true])",
           "\n                  PinotLogicalExchange(distribution=[hash[0, 
1]])",
-          "\n                    LogicalProject(col1=[$0], col2=[$1])",
-          "\n                      PinotLogicalTableScan(table=[[default, 
a]])",
+          "\n                    PinotLogicalAggregate(group=[{0, 1}], 
aggType=[FINAL])",
+          "\n                      PinotLogicalExchange(distribution=[hash[0, 
1]])",
+          "\n                        PinotLogicalAggregate(group=[{0, 1}], 
aggType=[LEAF])",
+          "\n                          PinotLogicalTableScan(table=[[default, 
a]])",
           "\n                  PinotLogicalExchange(distribution=[hash[0, 
1]])",
-          "\n                    LogicalProject(col1=[$0], col2=[$1])",
-          "\n                      PinotLogicalTableScan(table=[[default, 
b]])",
+          "\n                    PinotLogicalAggregate(group=[{0, 1}], 
aggType=[FINAL])",
+          "\n                      PinotLogicalExchange(distribution=[hash[0, 
1]])",
+          "\n                        PinotLogicalAggregate(group=[{0, 1}], 
aggType=[LEAF])",
+          "\n                          PinotLogicalTableScan(table=[[default, 
b]])",
           "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
-          "\n          LogicalProject(col1=[$0], col2=[$1])",
-          "\n            PinotLogicalTableScan(table=[[default, c]])",
+          "\n          PinotLogicalAggregate(group=[{0, 1}], aggType=[FINAL])",
+          "\n            PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n              PinotLogicalAggregate(group=[{0, 1}], 
aggType=[LEAF])",
+          "\n                PinotLogicalTableScan(table=[[default, c]])",
           "\n"
         ]
       },
@@ -80,6 +86,89 @@
           "\n"
         ]
       },
+      {
+        "description": "GROUP BY with SUM aggregate pushed into both branches 
of UNION ALL by AggregateUnionTranspose",
+        "sql": "EXPLAIN PLAN FOR SELECT col2, SUM(col3) FROM (SELECT col2, 
col3 FROM a UNION ALL SELECT col2, col3 FROM b) GROUP BY col2",
+        "output": [
+          "Execution Plan",
+          "\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[FINAL])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[LEAF])",
+          "\n      LogicalUnion(all=[true])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[FINAL])",
+          "\n            PinotLogicalExchange(distribution=[hash[0]])",
+          "\n              PinotLogicalAggregate(group=[{1}], 
agg#0=[$SUM0($2)], aggType=[LEAF])",
+          "\n                PinotLogicalTableScan(table=[[default, a]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[FINAL])",
+          "\n            PinotLogicalExchange(distribution=[hash[0]])",
+          "\n              PinotLogicalAggregate(group=[{1}], 
agg#0=[$SUM0($2)], aggType=[LEAF])",
+          "\n                PinotLogicalTableScan(table=[[default, b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "GROUP BY with COUNT(*) pushed into UNION ALL branches: 
per-branch COUNT rolls up via top-level SUM0",
+        "sql": "EXPLAIN PLAN FOR SELECT col2, COUNT(*) FROM (SELECT col2 FROM 
a UNION ALL SELECT col2 FROM b) GROUP BY col2",
+        "output": [
+          "Execution Plan",
+          "\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[FINAL])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[LEAF])",
+          "\n      LogicalUnion(all=[true])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])",
+          "\n            PinotLogicalExchange(distribution=[hash[0]])",
+          "\n              PinotLogicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[LEAF])",
+          "\n                PinotLogicalTableScan(table=[[default, a]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          PinotLogicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])",
+          "\n            PinotLogicalExchange(distribution=[hash[0]])",
+          "\n              PinotLogicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[LEAF])",
+          "\n                PinotLogicalTableScan(table=[[default, b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Aggregate hints (e.g. is_skip_leaf_stage_group_by) are 
preserved on both the per-branch and rolled-up aggregates produced by 
AggregateUnionTranspose",
+        "sql": "EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_skip_leaf_stage_group_by='true') */ col2, SUM(col3) FROM (SELECT 
col2, col3 FROM a UNION ALL SELECT col2, col3 FROM b) GROUP BY col2",
+        "output": [
+          "Execution Plan",
+          "\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[DIRECT])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalUnion(all=[true])",
+          "\n      PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[DIRECT])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col2=[$1], col3=[$2])",
+          "\n              PinotLogicalTableScan(table=[[default, a]])",
+          "\n      PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n        PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[DIRECT])",
+          "\n          PinotLogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col2=[$1], col3=[$2])",
+          "\n              PinotLogicalTableScan(table=[[default, b]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Disabling AggregateUnionTranspose via skipPlannerRules 
keeps the aggregate above the UNION ALL",
+        "sql": "SET skipPlannerRules='AggregateUnionTranspose'; EXPLAIN PLAN 
FOR SELECT col2, SUM(col3) FROM (SELECT col2, col3 FROM a UNION ALL SELECT 
col2, col3 FROM b) GROUP BY col2",
+        "output": [
+          "Execution Plan",
+          "\nPinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[FINAL])",
+          "\n  PinotLogicalExchange(distribution=[hash[0]])",
+          "\n    PinotLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
aggType=[LEAF])",
+          "\n      LogicalUnion(all=[true])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            PinotLogicalTableScan(table=[[default, a]])",
+          "\n        PinotLogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalProject(col2=[$1], col3=[$2])",
+          "\n            PinotLogicalTableScan(table=[[default, b]])",
+          "\n"
+        ]
+      },
       {
         "description": "EXCEPT from three tables",
         "sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a EXCEPT SELECT col1, 
col2 FROM b EXCEPT SELECT col1, col2 FROM c",
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fcfac94f4bf..694f64cc4e3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1025,6 +1025,7 @@ public class CommonConstants {
       public static final String AGGREGATE_REMOVE = "AggregateRemove";
       public static final String AGGREGATE_JOIN_TRANSPOSE = 
"AggregateJoinTranspose";
       public static final String AGGREGATE_UNION_AGGREGATE = 
"AggregateUnionAggregate";
+      public static final String AGGREGATE_UNION_TRANSPOSE = 
"AggregateUnionTranspose";
       public static final String AGGREGATE_REDUCE_FUNCTIONS = 
"AggregateReduceFunctions";
       public static final String AGGREGATE_FUNCTION_REWRITE = 
"AggregateFunctionRewrite";
       public static final String AGGREGATE_CASE_TO_FILTER = 
"AggregateCaseToFilter";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to