924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3452056959
##########
fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java:
##########
@@ -86,11 +90,30 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve
// Determined by its child.
@Override
- public boolean isSerialOperator() {
- return children.get(0).isSerialOperator();
+ public boolean isSerialNode() {
+ return children.get(0).isSerialNode();
}
public GroupingInfo getGroupingInfo() {
return groupingInfo;
}
+
+ @Override
+ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+ PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+ // REPEAT (rollup/grouping sets) is NOT distribution-preserving: it NULLs grouping
+ // columns per set and produces GROUPING_ID, which is part of the downstream agg hash
+ // key but does not exist below the repeat. Forwarding the parent HASH require down
+ // would push the local exchange before the row expansion AND hash by the child
+ // distribution (a single upstream shuffle key) instead of the agg grouping_exprs,
+ // collapsing rows onto one instance (tpcds q67, +73%). Recurse with noRequire so the
+ // parent inserts its hash local exchange ABOVE the repeat using its own grouping_exprs
+ // (mirrors BE, whose RepeatOperatorX has a NOOP required_data_distribution).
+ Pair<PlanNode, LocalExchangeType> enforceResult
+ = enforceRequire(translatorContext, children.get(0), 0,
+ LocalExchangeTypeRequire.noRequire());
+ children = new java.util.ArrayList<>();
+ children.add(enforceResult.first);
Review Comment:
Thanks for the detailed analysis. I checked this against the BE native local-shuffle path, and I don't believe it's a correctness issue: forwarding the child's distribution here deliberately mirrors BE, and the local exchange above `Repeat` only affects parallelism.
**The grouping-id correctness shuffle is a cross-fragment network exchange, not a local exchange.** `EXPLAIN` of `GROUP BY GROUPING SETS` over a bucketed table:
```
Fragment 1: VOlapScanNode -> VREPEAT_NODE (output: ..., GROUPING_ID)
            -> VAGGREGATE (update serialize, STREAMING)                 // partial pre-agg
            -> STREAM DATA SINK HASH_PARTITIONED: <keys>, GROUPING_ID   // network shuffle
Fragment 0: VEXCHANGE -> VAGGREGATE (merge finalize)                    // final agg
```
The aggregate directly above `Repeat` is a **streaming partial pre-aggregation**; it does not require its input to be partitioned by the grouping keys. Global correctness (grouping by `GROUPING_ID`) is guaranteed by the network shuffle (`HASH_PARTITIONED` by the grouping exprs + `GROUPING_ID`) that feeds the final merge aggregate in Fragment 0. A `LocalExchangeNode` above `Repeat` can therefore only change the partial agg's within-backend parallelism, never the result.
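To illustrate why the partitioning seen by the partial agg cannot change the result, here is a minimal Java sketch under a toy model. It assumes ROLLUP(a, b) as three grouping sets, `GROUPING_ID` encoded as the set's index (not Doris's actual encoding), `SUM` as the aggregate, and in-memory lists standing in for local instances and for the network shuffle. `RepeatTwoPhaseAggSketch` and all of its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.ToIntFunction;

// Hypothetical toy model: REPEAT -> streaming partial SUM -> hash shuffle -> merge SUM.
final class RepeatTwoPhaseAggSketch {
    // One scanned row: two grouping columns and a value to SUM.
    record Row(String a, String b, long v) {}

    // Key after REPEAT: grouping columns (null when rolled up) plus the grouping id.
    record Key(String a, String b, int gid) {}

    // Grouping sets of ROLLUP(a, b): {a, b}, {a}, {}. Assumption: gid is the set's index.
    static final boolean[][] SETS = {{true, true}, {true, false}, {false, false}};

    // REPEAT: one copy of each row per grouping set, NULLing columns outside the set.
    static List<Map.Entry<Key, Long>> repeat(List<Row> rows) {
        List<Map.Entry<Key, Long>> out = new ArrayList<>();
        for (Row r : rows) {
            for (int gid = 0; gid < SETS.length; gid++) {
                Key k = new Key(SETS[gid][0] ? r.a() : null, SETS[gid][1] ? r.b() : null, gid);
                out.add(Map.entry(k, r.v()));
            }
        }
        return out;
    }

    // Streaming partial pre-agg on one instance: SUM per full key, GROUPING_ID included.
    static Map<Key, Long> partialAgg(List<Map.Entry<Key, Long>> input) {
        Map<Key, Long> acc = new HashMap<>();
        for (Map.Entry<Key, Long> e : input) {
            acc.merge(e.getKey(), e.getValue(), Long::sum);
        }
        return acc;
    }

    // `partitioner` plays the role of a local exchange above REPEAT: it only decides which
    // partial-agg instance sees each expanded row. The network shuffle hashes the full key.
    static Map<Key, Long> run(List<Row> rows, int instances, int receivers, ToIntFunction<Key> partitioner) {
        List<List<Map.Entry<Key, Long>>> local = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            local.add(new ArrayList<>());
        }
        for (Map.Entry<Key, Long> e : repeat(rows)) {
            local.get(partitioner.applyAsInt(e.getKey())).add(e);
        }
        List<Map<Key, Long>> received = new ArrayList<>();
        for (int i = 0; i < receivers; i++) {
            received.add(new HashMap<>());
        }
        for (List<Map.Entry<Key, Long>> part : local) {
            for (Map.Entry<Key, Long> e : partialAgg(part).entrySet()) {
                int dest = Math.floorMod(e.getKey().hashCode(), receivers); // network shuffle
                received.get(dest).merge(e.getKey(), e.getValue(), Long::sum); // merge finalize
            }
        }
        Map<Key, Long> result = new HashMap<>();
        for (Map<Key, Long> m : received) {
            result.putAll(m); // each key lives on exactly one receiver
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row("x", "p", 1), new Row("x", "q", 2), new Row("y", "p", 4));
        Map<Key, Long> byKey = run(rows, 4, 3, k -> Math.floorMod(k.hashCode(), 4));
        Map<Key, Long> byColumnA = run(rows, 4, 3, k -> Math.floorMod(Objects.hashCode(k.a()), 4));
        Map<Key, Long> collapsed = run(rows, 4, 3, k -> 0); // every row on instance 0
        System.out.println(byKey.equals(byColumnA) && byKey.equals(collapsed)); // true
        System.out.println(byKey.get(new Key(null, null, 2))); // 7 (grand total)
    }
}
```

All three partitionings produce the same final map; they differ only in how the partial-agg work is spread across instances, which is the parallelism-only effect described above.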
**BE's native path skips the exchange here too.** `Pipeline::need_to_local_exchange` (`be/src/exec/pipeline/pipeline.cpp`):
```cpp
return _data_distribution.distribution_type != target_data_distribution.distribution_type &&
       !(is_hash_exchange(_data_distribution.distribution_type) &&
         is_hash_exchange(target_data_distribution.distribution_type)); // both hash -> skip
```
For `Scan -> Repeat -> AggSink`, the pipeline `_data_distribution` is the scan's `required_data_distribution` = `BUCKET_HASH_SHUFFLE` (`scan_operator.h`), `RepeatOperatorX` requires `NOOP` (no LE), and the AggSink requires `GLOBAL_EXECUTION_HASH_SHUFFLE`. Both are hash, so BE skips the local exchange above `Repeat`.
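The rule can be restated as a small Java sketch to make the `Scan -> Repeat -> AggSink` walk-through concrete. This is a hypothetical mirror, not BE code: the enum lists only the distribution types named in this comment plus `PASSTHROUGH` for contrast, treating both `*_HASH_SHUFFLE` values as hash exchanges is an assumption, and the `NOOP` early return models the statement that `RepeatOperatorX` requires `NOOP` (no LE) rather than quoting `pipeline.cpp`.

```java
// Hypothetical Java mirror of Pipeline::need_to_local_exchange; names and enum are assumptions.
final class LocalExchangeRule {
    enum ExchangeType { NOOP, PASSTHROUGH, BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE }

    // Assumption: both hash-shuffle variants named in this thread count as hash exchanges.
    static boolean isHashExchange(ExchangeType t) {
        return t == ExchangeType.BUCKET_HASH_SHUFFLE || t == ExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
    }

    // `current` models the pipeline's _data_distribution; `target` is the operator's requirement.
    static boolean needToLocalExchange(ExchangeType current, ExchangeType target) {
        if (target == ExchangeType.NOOP) {
            return false; // assumed early-out: the operator states no requirement (RepeatOperatorX)
        }
        // The quoted return expression: differ, and not "both hash".
        return current != target && !(isHashExchange(current) && isHashExchange(target));
    }

    public static void main(String[] args) {
        // Scan -> Repeat: Repeat requires NOOP, so no LE below it.
        System.out.println(needToLocalExchange(ExchangeType.BUCKET_HASH_SHUFFLE, ExchangeType.NOOP)); // false
        // Repeat -> AggSink: both hash, so the LE above Repeat is skipped.
        System.out.println(needToLocalExchange(
                ExchangeType.BUCKET_HASH_SHUFFLE, ExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE)); // false
        // A non-hash pipeline feeding a hash-requiring sink would still get an LE.
        System.out.println(needToLocalExchange(
                ExchangeType.PASSTHROUGH, ExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE)); // true
    }
}
```

The "both hash" clause is what makes the bucket-hash scan distribution acceptable to the agg sink even though the hash keys differ, which is why no exchange appears above `Repeat`.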
`RequireHash.satisfy(<any hash>) == true` on the FE side is the exact mirror of that BE rule, and `RepeatNode` forwarding the child's distribution is what lets the FE satisfy-check observe the same distribution that BE's `_data_distribution` holds. Returning `NOOP` would force a local exchange that BE never inserts, diverging from the native path.
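A similarly hedged FE-side sketch of that satisfy-check: `RequireHash` and `RepeatNode` are the names used above, but the enum, the `Require` interface, and the `repeatOutput` / `parentInsertsLocalExchange` helpers are hypothetical simplifications, not the planner's actual API. It contrasts forwarding the child's distribution with reporting `NOOP`.

```java
// Hypothetical FE-side model; none of these types or signatures are Doris's real planner API.
final class RepeatForwardingSketch {
    enum LocalExchangeType { NOOP, BUCKET_HASH_SHUFFLE, GLOBAL_EXECUTION_HASH_SHUFFLE }

    // Treats both hash variants as hash, like BE's is_hash_exchange (assumed).
    static boolean isHash(LocalExchangeType t) {
        return t == LocalExchangeType.BUCKET_HASH_SHUFFLE || t == LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
    }

    interface Require {
        boolean satisfy(LocalExchangeType provided);
    }

    // Models RequireHash.satisfy(<any hash>) == true, the FE counterpart of "both hash -> skip".
    static final Require REQUIRE_HASH = RepeatForwardingSketch::isHash;

    // What the repeat reports upward: the child's type when forwarding, NOOP otherwise.
    static LocalExchangeType repeatOutput(LocalExchangeType child, boolean forwardChild) {
        return forwardChild ? child : LocalExchangeType.NOOP;
    }

    // The parent aggregate inserts a local exchange only when its requirement is unsatisfied.
    static boolean parentInsertsLocalExchange(LocalExchangeType child, boolean forwardChild) {
        return !REQUIRE_HASH.satisfy(repeatOutput(child, forwardChild));
    }

    public static void main(String[] args) {
        // Forwarding the scan's bucket-hash distribution: no LE above the repeat (matches BE).
        System.out.println(parentInsertsLocalExchange(LocalExchangeType.BUCKET_HASH_SHUFFLE, true)); // false
        // Reporting NOOP instead: the FE would add an LE that the BE path never creates.
        System.out.println(parentInsertsLocalExchange(LocalExchangeType.BUCKET_HASH_SHUFFLE, false)); // true
    }
}
```

With forwarding, the FE decision lines up with the BE rule; the `NOOP` variant is the divergence the comment warns against.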
So I'll keep the current behavior. I'm happy to add a coverage case asserting that a hash-distributed child is forwarded and no LE is inserted above `Repeat`, to lock in the parity.
--
This is an automated message from the Apache Git Service.