924060929 commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3413410484
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java:
##########
@@ -634,7 +636,8 @@ protected void splitFragments(PhysicalPlan resultPlan) {
return;
}
- PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext(cascadesContext);
+ this.planTranslatorContext = new PlanTranslatorContext(cascadesContext);
+ PlanTranslatorContext planTranslatorContext = this.planTranslatorContext;
Review Comment:
Done in `fe9a3316be0`: removed the local alias; subsequent references resolve
to the field `this.planTranslatorContext`.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java:
##########
@@ -657,7 +660,8 @@ protected void splitFragments(PhysicalPlan resultPlan) {
scanNodeList.addAll(planTranslatorContext.getScanNodes());
physicalRelations.addAll(planTranslatorContext.getPhysicalRelations());
descTable = planTranslatorContext.getDescTable();
- fragments = new ArrayList<>(planTranslatorContext.getPlanFragments());
+ List<PlanFragment> planFragments = planTranslatorContext.getPlanFragments();
Review Comment:
Done in `fe9a3316be0`: inlined into
`fragments = new ArrayList<>(planTranslatorContext.getPlanFragments())`.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java:
##########
@@ -740,6 +744,18 @@ protected void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel)
splitFragments(physicalPlan);
doDistribute(canUseNereidsDistributePlanner, explainLevel);
+
+ SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
+ if (sessionVariable.isEnableLocalShufflePlanner() && sessionVariable.isEnableLocalShuffle()) {
+ addLocalExchangeAfterDistribute();
+ }
Review Comment:
Done in `fe9a3316be0`: moved the
`enable_local_shuffle_planner && enable_local_shuffle` gate into
`addLocalExchangeAfterDistribute()`; the caller now invokes it unconditionally.
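
A minimal sketch of what moving the gate into the callee looks like. `SessionVarsSketch` and `PlannerSketch` are hypothetical stand-ins, not the real `SessionVariable` or `NereidsPlanner`, and the `steps` list exists only to make the control flow observable:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SessionVariable: only the two flags matter here.
final class SessionVarsSketch {
    final boolean enableLocalShufflePlanner;
    final boolean enableLocalShuffle;

    SessionVarsSketch(boolean enableLocalShufflePlanner, boolean enableLocalShuffle) {
        this.enableLocalShufflePlanner = enableLocalShufflePlanner;
        this.enableLocalShuffle = enableLocalShuffle;
    }
}

// Hypothetical stand-in for the planner; steps records which phases ran.
final class PlannerSketch {
    final List<String> steps = new ArrayList<>();
    private final SessionVarsSketch vars;

    PlannerSketch(SessionVarsSketch vars) {
        this.vars = vars;
    }

    void distribute() {
        steps.add("splitFragments");
        steps.add("doDistribute");
        // The caller no longer checks any flag; the callee owns the gate.
        addLocalExchangeAfterDistribute();
    }

    private void addLocalExchangeAfterDistribute() {
        // Both flags must be on, mirroring the condition quoted in the diff.
        if (!vars.enableLocalShufflePlanner || !vars.enableLocalShuffle) {
            return;
        }
        steps.add("addLocalExchange");
    }
}
```

Keeping the check in the callee means any future call site gets the same gating without having to repeat the condition.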
##########
fe/fe-core/src/main/java/org/apache/doris/planner/BucketedAggregationNode.java:
##########
@@ -97,7 +98,7 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve
}
@Override
- public boolean isSerialOperator() {
+ public boolean isSerialOperatorOnBe(ConnectContext context) {
Review Comment:
Changed in `fe9a3316be0` to override `isSerialNode()` (returning `false`)
instead of `isSerialOperatorOnBe()`, matching every other node; the behavior
is equivalent.
One correction on the premise: `BucketedAggregationNode extends PlanNode`,
not `AggregationNode`, so it never inherited `AggregationNode.isSerialNode()`.
Its `isSerialNode()` was already the base `false`, which made the old
`isSerialOperatorOnBe()` override (also returning `false`) effectively
redundant. So there was no latent inconsistency, but overriding
`isSerialNode()` is the cleaner and more consistent form regardless.
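
The inheritance point can be illustrated with a trimmed-down sketch. All three classes below are hypothetical stand-ins, and the body of `AggregationNodeSketch.isSerialNode()` is an assumption made only to show that a sibling class does not pick it up:

```java
// Hypothetical stand-in for PlanNode: the base default is "not serial".
abstract class PlanNodeSketch {
    public boolean isSerialNode() {
        return false;
    }
}

// Hypothetical stand-in for AggregationNode with its own override.
// The real condition is not shown in this thread; a plain flag is assumed.
class AggregationNodeSketch extends PlanNodeSketch {
    private final boolean serial;

    AggregationNodeSketch(boolean serial) {
        this.serial = serial;
    }

    @Override
    public boolean isSerialNode() {
        return serial;
    }
}

// Extends the base directly, so the override above is never inherited.
class BucketedAggregationNodeSketch extends PlanNodeSketch {
    @Override
    public boolean isSerialNode() {
        // Explicit for consistency with other nodes; same value as the base.
        return false;
    }
}
```

Removing the override from `BucketedAggregationNodeSketch` would not change the result, which is the sense in which the change is behavior-equivalent.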
##########
fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java:
##########
@@ -258,4 +268,155 @@ public boolean isQueryCacheCandidate() {
public void setQueryCacheCandidate(boolean queryCacheCandidate) {
this.queryCacheCandidate = queryCacheCandidate;
}
+
+ @Override
+ public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+ PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+
+ ConnectContext connectContext = translatorContext.getConnectContext();
+ SessionVariable sessionVariable = connectContext.getSessionVariable();
+ // PR #62438: when false, non-finalize agg falls back to BE base class.
+ boolean enableLeBeforeAgg = sessionVariable.enableLocalExchangeBeforeAgg;
+ boolean hasKeys = !aggInfo.getGroupingExprs().isEmpty();
+
+ // Each branch mirrors the corresponding BE operator's required_data_distribution()
+ // check order 1:1. The helper baseClassRequire() expands BE's base class behavior.
+ LocalExchangeTypeRequire requireChild;
+ if (canUseDistinctStreamingAgg(sessionVariable)) {
+ // DistinctStreamingAggOperatorX. Two flavors share this operator class:
+ // - streaming preagg (useStreamingPreagg=true): performance-only,
+ // flag controls
+ // - non-streaming dedup (useStreamingPreagg=false): correctness-required,
+ // always HASH regardless of flag
+ // Diverges from BE: BE's `!_needs_finalize && !enable_local_exchange_before_agg`
+ // early return catches non-streaming dedup too, causing the same family of
+ // wrong-result bug as AggSink (DORIS-25413).
+ if (needsFinalize && !hasKeys) {
+ requireChild = LocalExchangeTypeRequire.noRequire();
+ } else if (!needsFinalize && useStreamingPreagg && !enableLeBeforeAgg) {
+ requireChild = baseClassRequire(connectContext);
+ } else if (needsFinalize || (hasKeys && !useStreamingPreagg)) {
+ requireChild = AddLocalExchange.isColocated(this)
+ ? LocalExchangeTypeRequire.requireHash()
+ : parentRequire.autoRequireHash();
+ } else if (sessionVariable.enableDistinctStreamingAggForcePassthrough) {
+ requireChild = LocalExchangeTypeRequire.requirePassthrough();
+ } else {
+ requireChild = baseClassRequire(connectContext);
+ }
+ } else if (useStreamingPreagg) {
+ // StreamingAggOperatorX
+ if (children.get(0) instanceof HashJoinNode
+ && sessionVariable.enableStreamingAggHashJoinForcePassthrough) {
+ requireChild = LocalExchangeTypeRequire.requirePassthrough();
+ } else if (!needsFinalize && !enableLeBeforeAgg) {
+ requireChild = baseClassRequire(connectContext);
+ } else if (!hasKeys) {
+ requireChild = needsFinalize
+ ? LocalExchangeTypeRequire.noRequire()
+ : baseClassRequire(connectContext);
+ } else {
+ requireChild = LocalExchangeTypeRequire.requireHash();
+ }
+ } else {
+ // AggSinkOperatorX — covers finalize phase AND non-finalize phases (LOCAL
+ // preagg / FIRST_MERGE dedup). Streaming preagg goes through the StreamingAgg
+ // branch above, not here.
+ //
+ // Phase semantics for !needsFinalize:
+ // - FIRST / SECOND (LOCAL phase, !isMerge): performance-only, flag controls
+ // - FIRST_MERGE (correctness-required): always HASH regardless of flag
+ //
+ // Diverges from BE here: BE's `!_needs_finalize && !enable_local_exchange_before_agg`
+ // early return also catches FIRST_MERGE, dropping the HASH requirement and
+ // causing wrong-result (e.g. PASSTHROUGH over serial child breaks the
+ // group-by-key invariant — DORIS-25413).
+ if (!hasKeys) {
+ requireChild = needsFinalize
+ ? LocalExchangeTypeRequire.noRequire()
+ : baseClassRequire(connectContext);
+ } else if (!needsFinalize && !aggInfo.isMerge() && !enableLeBeforeAgg) {
+ // LOCAL phase (FIRST preagg / SECOND distinct local) + user opted out
+ // of pre-agg LE → base class decides: serial child → PASSTHROUGH
+ // (parallelism), non-serial child → NOOP (no LE).
+ requireChild = baseClassRequire(connectContext);
+ } else if (!needsFinalize || AddLocalExchange.isColocated(this)) {
+ // FIRST_MERGE (correctness) or finalize+colocate → HASH.
+ requireChild = parentRequire.autoRequireHash();
+ } else if (hasPartitionExprs(parentRequire)) {
+ // FE-only heuristic: finalize non-colocate with parent hash requirement
+ // → inherit parent's specific hash type.
+ requireChild = parentRequire.autoRequireHash();
+ } else {
+ // FE-only heuristic: finalize non-colocate without parent hash → skip
+ // LE (child Exchange already provides hash distribution).
+ requireChild = LocalExchangeTypeRequire.noRequire();
+ }
+ }
+
+ Pair<PlanNode, LocalExchangeType> enforceResult
+ = enforceRequire(translatorContext, children.get(0), 0, requireChild);
+ children = Lists.newArrayList(enforceResult.first);
+ return Pair.of(this, enforceResult.second);
+ }
+
+ /** BE base class required_data_distribution: serial child → PASSTHROUGH, else → NOOP. */
+ private LocalExchangeTypeRequire baseClassRequire(ConnectContext connectContext) {
+ return children.get(0).isSerialOperatorOnBe(connectContext)
+ ? LocalExchangeTypeRequire.requirePassthrough()
+ : LocalExchangeTypeRequire.noRequire();
+ }
+
+ @Override
+ protected List<Expr> getSemanticPartitionExprs() {
+ return aggInfo.getGroupingExprs();
+ }
+
+ @Override
+ protected List<Expr> getLocalExchangeDistributeExprs(int childIndex, boolean followedByShuffled) {
+ // Mirror BE AggSinkOperatorX::update_operator / StreamingAggOperatorX::update_operator:
+ // _partition_exprs = (distribute_expr_lists set && (followed_by_shuffled || has_distinct))
+ // ? distribute_expr_lists[0] : grouping_exprs
+ // The HASH LocalExchange must partition by _partition_exprs so a streaming partial preagg
+ // locally collapses same-key rows. Using child distribution (default) for a non-shuffled
+ // chain scatters same-group rows across N instances, leaving partial_preagg essentially a
+ // no-op and breaking row-arrival order at downstream merge-finalize (e.g. group_concat).
+ List<Expr> childDist = getChildDistributeExprList(childIndex);
+ boolean hasDistinct = aggInfo.getAggregateExprs().stream()
+ .map(FunctionCallExpr::getFnName)
+ .filter(name -> name != null)
+ .map(name -> name.getFunction())
+ .filter(name -> name != null)
+ .anyMatch(name -> name.startsWith("multi_distinct_"));
Review Comment:
There is no structural signal available at this layer, so the name match is
intentional. Nereids rewrites `count/sum(distinct ...)` into dedicated
`MultiDistinct*` functions constructed with `distinct=false` and a
`multi_distinct_` name; by the time it reaches this legacy `FunctionCallExpr` /
`AggregateInfo` layer, `isDistinct()` is already `false` and the function name
is the only remaining signal. An `AggInfo.isMultiDistinct()` helper would still
have to match the name internally — it couldn't be more "structural". (The BE
`__isset.aggregate_functions` check isn't a distinct test either.) Added a
comment in `fe9a3316be0` explaining this so the prefix match isn't mistaken for
laziness.
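
For illustration, the prefix match reduces to the following sketch. `MultiDistinctSketch` and its `List<String>` input are hypothetical simplifications; the real code walks `FunctionCallExpr` objects and pulls the name from each:

```java
import java.util.List;
import java.util.Objects;

// Hypothetical helper: works on plain function names instead of FunctionCallExpr.
final class MultiDistinctSketch {
    private static final String PREFIX = "multi_distinct_";

    private MultiDistinctSketch() {
    }

    // True if any non-null name carries the multi_distinct_ prefix.
    static boolean hasMultiDistinct(List<String> functionNames) {
        return functionNames.stream()
                .filter(Objects::nonNull)
                .anyMatch(name -> name.startsWith(PREFIX));
    }
}
```

Because the rewritten functions are built with `distinct=false`, a check on a distinct flag would return `false` for exactly the cases this match is meant to catch.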
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]