yashmayya opened a new pull request, #18513:
URL: https://github.com/apache/pinot/pull/18513
## Summary
Adds a planner rule to the multi-stage engine (MSE) that pushes an
`Aggregate` past a `UNION ALL` so that each branch aggregates locally before
any cross-stage shuffle. This is enabled by default.
For `SELECT g, SUM(x) FROM (a UNION ALL b) GROUP BY g`, the rule rewrites
```
Aggregate(group=g, SUM(x))
  Union ALL
    a
    b
```
into
```
Aggregate(group=g, SUM(x))
  Union ALL
    Aggregate(group=g, SUM(x))  -- a
    Aggregate(group=g, SUM(x))  -- b
```
After Pinot's exchange-insertion rules run, each branch then ships
*pre-aggregated* rows (one per group key per branch) through its exchange
instead of raw rows.
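
The rewrite is safe because `SUM` is decomposable: summing per-branch partial sums gives the same result as summing the unioned raw rows. The following is a minimal plain-Java sketch of that equivalence, not Pinot or Calcite code; the `Row` class and all method names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartialSumDemo {
  /** Hypothetical raw row: group key g and measure x. */
  static final class Row {
    final String g;
    final long x;

    Row(String g, long x) {
      this.g = g;
      this.x = x;
    }
  }

  /** SUM(x) GROUP BY g over a list of rows. */
  static Map<String, Long> sumByGroup(List<Row> rows) {
    Map<String, Long> out = new HashMap<>();
    for (Row r : rows) {
      out.merge(r.g, r.x, Long::sum);
    }
    return out;
  }

  /** Original plan shape: union the raw rows, then aggregate once. */
  static Map<String, Long> aggregateOverUnion(List<Row> a, List<Row> b) {
    List<Row> union = new ArrayList<>(a);
    union.addAll(b);
    return sumByGroup(union);
  }

  /** Rewritten plan shape: aggregate each branch, then combine the partial sums. */
  static Map<String, Long> aggregatePushedDown(List<Row> a, List<Row> b) {
    Map<String, Long> out = new HashMap<>(sumByGroup(a));
    sumByGroup(b).forEach((g, partial) -> out.merge(g, partial, Long::sum));
    return out;
  }

  public static void main(String[] args) {
    List<Row> a = List.of(new Row("k1", 1), new Row("k1", 2), new Row("k2", 5));
    List<Row> b = List.of(new Row("k1", 10), new Row("k3", 7));
    System.out.println(aggregateOverUnion(a, b).equals(aggregatePushedDown(a, b)));
  }
}
```

In this toy model only the per-branch maps would cross the exchange, which is the point of the rewrite.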
## Why this matters for Pinot specifically
In a distributed OLAP engine, cross-stage data shuffle is the dominant cost
— moving bytes across the network is far more expensive than running an extra
aggregate pass on a worker. With N rows per branch and C distinct group keys,
each branch ships at most min(N, C) partial-aggregate rows instead of N raw
rows, so the shuffle cost drops from O(N) to O(C) per branch.
For low-cardinality dimensions over large fact tables — the common analytics
shape — this is a big win.
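
To make the row counts concrete, here is a small self-contained sketch that counts how many rows one branch would ship with and without local aggregation. It is a toy model, not a measurement of Pinot: the synthetic key distribution (`i % c`) and all names are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;

public class ShuffleRowCount {
  /** Builds n synthetic group keys cycling through c distinct values. */
  static int[] syntheticBranch(int n, int c) {
    int[] keys = new int[n];
    for (int i = 0; i < n; i += 1) {
      keys[i] = i % c;
    }
    return keys;
  }

  /** Without the rule, every raw row crosses the exchange. */
  static long rawRowsShipped(int[] groupKeys) {
    return groupKeys.length;
  }

  /** With the rule, one partial-aggregate row per distinct group key crosses the exchange. */
  static long partialRowsShipped(int[] groupKeys) {
    Set<Integer> distinct = new HashSet<>();
    for (int k : groupKeys) {
      distinct.add(k);
    }
    return distinct.size();
  }

  public static void main(String[] args) {
    int[] lowCardinality = syntheticBranch(1_000_000, 50);
    System.out.println(rawRowsShipped(lowCardinality) + " raw vs "
        + partialRowsShipped(lowCardinality) + " partial");

    // Nearly unique keys: local aggregation compresses nothing.
    int[] uniqueKeys = syntheticBranch(1_000, 1_000);
    System.out.println(rawRowsShipped(uniqueKeys) + " raw vs "
        + partialRowsShipped(uniqueKeys) + " partial");
  }
}
```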
Trade-offs:
- **When it helps a lot:** group key cardinality ≪ row count, large
branches, or skewed branch sizes where the larger branch benefits
disproportionately.
- **When it doesn't help:** group keys are nearly unique (per-branch
aggregation compresses nothing), or branches are very small (fixed aggregator
overhead dominates).
For the edge cases, users can opt out with
`SET skipPlannerRules='AggregateUnionTranspose'`.
This matches what most distributed OLAP planners do by default (Spark,
BigQuery, Snowflake, Druid, ClickHouse all push aggregates through unions).
## Why a custom rule instead of Calcite's `AggregateUnionTransposeRule`
Calcite ships an `AggregateUnionTransposeRule` for exactly this
transformation. Adding it to `PinotQueryRuleSets` directly — which is the first
thing I tried — produces no effect: the rule fires but immediately bails out
without transforming the tree.
The cause is in how the upstream rule decides which aggregate functions are
splittable. It does a
`SUPPORTED_AGGREGATES.containsKey(call.getAggregation().getClass())` lookup
against a hard-coded allow-list of Calcite classes:
```
SqlSumAggFunction, SqlMinMaxAggFunction, SqlCountAggFunction,
SqlSumEmptyIsZeroAggFunction, SqlAnyValueAggFunction, SqlBitOpAggFunction
```
Pinot ships its own subclasses of `SqlAggFunction` — `PinotSumFunction`,
`PinotMinMaxFunction`, etc. — which intentionally do **not** extend Calcite's
`SqlSumAggFunction` / `SqlMinMaxAggFunction` (see
[PinotOperatorTable.java](https://github.com/apache/pinot/blob/master/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java#L489-L512);
the comment explains it needs to carry custom return-type inference and
operand checkers that Calcite's classes don't allow overriding cleanly). So the
class-equality check in upstream's rule never matches, `transformAggCalls`
returns null, and the rule no-ops on essentially every Pinot query.
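
The failure mode can be illustrated without Calcite on the classpath. The sketch below uses hypothetical stand-in classes (`AggFunction`, `StockSum`, `CustomSum`) and a hypothetical `Kind` enum in place of Calcite's `SqlAggFunction` hierarchy and `SqlKind`; it only shows why a lookup keyed on the exact class misses a function that a kind-based check accepts.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class AllowListDemo {
  /** Hypothetical stand-in for SqlKind. */
  enum Kind { SUM, SUM0, COUNT, MIN, MAX, AVG }

  /** Hypothetical stand-in for an aggregate function base class. */
  static class AggFunction {
    final Kind kind;

    AggFunction(Kind kind) {
      this.kind = kind;
    }
  }

  /** Stand-in for a stock function class that appears on the allow-list. */
  static class StockSum extends AggFunction {
    StockSum() {
      super(Kind.SUM);
    }
  }

  /** Stand-in for an engine-specific function: same kind, unrelated class. */
  static class CustomSum extends AggFunction {
    CustomSum() {
      super(Kind.SUM);
    }
  }

  static final Map<Class<?>, Boolean> SUPPORTED_CLASSES = Map.of(StockSum.class, true);
  static final Set<Kind> SUPPORTED_KINDS =
      EnumSet.of(Kind.SUM, Kind.SUM0, Kind.COUNT, Kind.MIN, Kind.MAX);

  /** Class-keyed lookup: only the exact listed classes match. */
  static boolean splittableByClass(AggFunction f) {
    return SUPPORTED_CLASSES.containsKey(f.getClass());
  }

  /** Kind-based check: any function reporting a supported kind matches. */
  static boolean splittableByKind(AggFunction f) {
    return SUPPORTED_KINDS.contains(f.kind);
  }

  public static void main(String[] args) {
    AggFunction custom = new CustomSum();
    System.out.println("by class: " + splittableByClass(custom));
    System.out.println("by kind:  " + splittableByKind(custom));
  }
}
```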
The Pinot variant in this PR matches on `SqlKind` (`SUM`, `SUM0`, `COUNT`,
`MIN`, `MAX`, `ANY_VALUE`, `BIT_AND`, `BIT_OR`, `BIT_XOR`) instead. The rest of
the logic — the `union.all` guard, the `hasUniqueKeyInAllInputs` short-circuit
(which is what prevents the rule from looping on its own output), the `COUNT` →
`SUM0` rollup, and the per-branch nullability re-typing — is a straight port of
upstream's behavior. A few line-by-line differences:
- `AggregateCall.create` is called with `aggRel` (the full Aggregate node,
so its `(groupCols, aggCols)` row type is what the new top-of-union exchange
exposes) instead of `aggRel.getInput()`. Upstream uses `aggRel.copy(...)` for
the same effect; passing `aggRel` directly is slightly cheaper.
- Pinot checkstyle bans `i++` / `++i` in `for`-loop iterators, so the loops
use `i += 1`.
If/when Pinot ever switches to using Calcite's stock `SqlSumAggFunction`
family, the upstream rule becomes a drop-in replacement and the Pinot variant
can be deleted.
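
The `COUNT` → `SUM0` rollup mentioned above deserves a concrete illustration: once each branch has produced a count, the top-level aggregate must add those counts rather than count them again. The sketch below is a plain-Java toy with hypothetical method names, not the rule's code; it models each branch as a list of group keys.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountRollupDemo {
  /** Per-branch COUNT(*) GROUP BY g. */
  static Map<String, Long> countByGroup(List<String> groupKeys) {
    Map<String, Long> out = new HashMap<>();
    for (String g : groupKeys) {
      out.merge(g, 1L, Long::sum);
    }
    return out;
  }

  /** Correct rollup: add the per-branch counts for each group. */
  static Map<String, Long> rollupWithSum(List<Map<String, Long>> partials) {
    Map<String, Long> out = new HashMap<>();
    for (Map<String, Long> partial : partials) {
      partial.forEach((g, count) -> out.merge(g, count, Long::sum));
    }
    return out;
  }

  /** Incorrect rollup: counting again counts partial rows, not raw rows. */
  static Map<String, Long> rollupWithCount(List<Map<String, Long>> partials) {
    Map<String, Long> out = new HashMap<>();
    for (Map<String, Long> partial : partials) {
      for (String g : partial.keySet()) {
        out.merge(g, 1L, Long::sum);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map<String, Long>> partials = List.of(
        countByGroup(List.of("x", "x", "y")),
        countByGroup(List.of("x")));
    System.out.println("sum rollup:   " + rollupWithSum(partials));
    System.out.println("count rollup: " + rollupWithCount(partials));
  }
}
```

The toy only covers the grouped case. The zero-on-empty behavior that distinguishes `SUM0` from `SUM` matters when there are no input rows at all, which this sketch does not model.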
## Why default-on, and how it interacts with `AggregateUnionAggregateRule`
`AggregateUnionTransposeRule` and `AggregateUnionAggregateRule` are roughly
inverse operations:
- **Transpose:** `Agg(Union(A, B))` → `Agg(Union(Agg(A), Agg(B)))` (push
aggregates into branches).
- **UnionAggregate:** `Agg(Union(Agg(A), B))` → `Agg(Union(A, B))` (collapse
pre-aggregation back up).
For Pinot's cost profile the right defaults are asymmetric:
- Transpose is **on** by default — shuffle-reducing.
- UnionAggregate stays **off** by default — shuffle-increasing, since
collapsing the branch-level aggregate means the union's exchange now carries
raw rows.
The two rules sit in separate `HepInstruction`s, with
`AggregateUnionAggregateRule` registered first and
`PinotAggregateUnionTransposeRule` second. Each instruction runs its rule to
fixpoint and then control moves on, so there is no oscillation. If a user opts
into `AggregateUnionAggregateRule`, Transpose still runs afterward and
effectively undoes the merge, so that combination is rarely useful.
`QueryPlannerRuleOptionsTest.testAggregateUnionAggregateEnabled` now skips
Transpose explicitly so that it can verify the merge behavior in isolation.
A rationale comment in `PinotQueryRuleSets` captures all of the above so
future maintainers don't flip the defaults.
## Self-recursion safety
Transpose's output `Agg(Union(Agg(A), Agg(B)))` still matches its own
pattern `Agg(Union(...))`. Without a guard the rule would loop within its own
`HepInstruction`. The guard is the `hasUniqueKeyInAllInputs` check: after one
application, every union input is an `Aggregate` and is therefore unique on the
group keys, so the next attempt bails. This is the same mechanism Calcite's
upstream rule uses.
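
The termination argument can be sketched with a toy plan tree. The code below is not the rule's implementation: the node classes are hypothetical, and the uniqueness check is approximated as "every union input is already an aggregate", whereas the real rule consults planner metadata. It only shows that the guard stops a fixpoint loop after one application.

```java
import java.util.ArrayList;
import java.util.List;

public class TransposeGuardDemo {
  interface Node {}

  static final class Scan implements Node {
    final String name;

    Scan(String name) {
      this.name = name;
    }
  }

  static final class Agg implements Node {
    final Node input;

    Agg(Node input) {
      this.input = input;
    }
  }

  static final class Union implements Node {
    final List<Node> inputs;

    Union(List<Node> inputs) {
      this.inputs = inputs;
    }
  }

  /** Toy stand-in for the uniqueness check: an Agg is unique on its group keys. */
  static boolean allInputsUniqueOnGroupKeys(Union union) {
    return union.inputs.stream().allMatch(n -> n instanceof Agg);
  }

  /** Applies the transpose once; returns null when the rule bails out. */
  static Agg transposeOnce(Agg top) {
    if (!(top.input instanceof Union)) {
      return null;
    }
    Union union = (Union) top.input;
    if (allInputsUniqueOnGroupKeys(union)) {
      return null; // guard: the output of a previous application lands here
    }
    List<Node> pushed = new ArrayList<>();
    for (Node input : union.inputs) {
      pushed.add(new Agg(input));
    }
    return new Agg(new Union(pushed));
  }

  /** Re-applies the rule until it bails out, returning the number of applications. */
  static int applyToFixpoint(Agg root, int maxIterations) {
    int applied = 0;
    Agg current = root;
    while (applied < maxIterations) {
      Agg next = transposeOnce(current);
      if (next == null) {
        break;
      }
      current = next;
      applied += 1;
    }
    return applied;
  }

  public static void main(String[] args) {
    Agg plan = new Agg(new Union(List.of(new Scan("a"), new Scan("b"))));
    System.out.println("applications: " + applyToFixpoint(plan, 10));
  }
}
```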
## Test plan
- [x] New EXPLAIN PLAN fixtures in `SetOpPlans.json`:
  - `SELECT col2, SUM(col3) FROM (a UNION ALL b) GROUP BY col2`: confirms
    the partial aggregate is pushed into each branch.
  - `SELECT col2, COUNT(*) FROM (a UNION ALL b) GROUP BY col2`: confirms
    per-branch `COUNT` rolls up via top-level `SUM0`.
  - `SET skipPlannerRules='AggregateUnionTranspose'` variant: confirms the
    opt-out keeps the un-pushed plan.
- [x] Updated existing fixtures whose plans now show the pushed-down
  aggregates:
  - `SetOpPlans.json`: "UNION from three tables" (UNION DISTINCT, because
    `UnionToDistinctRule` introduces an `Aggregate` that Transpose then pushes down
    recursively).
  - `PhysicalOptimizerPlans.json`: "Union, distinct, etc. but still
    maximally identity exchange".
- [x] Updated
`QueryPlannerRuleOptionsTest.testAggregateUnionAggregateEnabled` and
`testDisablePruneEmptyUnion` to also disable Transpose so they continue to
exercise their target rule in isolation. Added a small
`explainQueryWithRules(query, enable, disable)` helper to support
enable+disable in one call.
- [x] `./mvnw -pl pinot-query-planner test` — 1151 tests, 0 failures.
- [x] `./mvnw -pl pinot-query-runtime test` — 4230 tests, 0 failures (6
pre-existing skips).
- [x] Spotless / checkstyle / license-format / license-check all clean.