morrySnow commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3510485923
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2496,14 +2496,16 @@ public PlanFragment visitPhysicalSetOperation(
setOperationNode.setColocate(true);
}
- // TODO: open comment when support `enable_local_shuffle_planner`
- // for (Plan child : setOperation.children()) {
- // PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
- // if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
- // setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
- // break;
- // }
- // }
+ // A storage-bucketed child means set-op bucket shuffle was chosen (only under the FE
+ // local-shuffle planner); mark the node BUCKET_SHUFFLE so the set sink/probe align by
+ // bucket instead of execution-bucketed hash.
+ for (Plan child : setOperation.children()) {
+ PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
+ if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
+ setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
Review Comment:
**Colocate and BUCKET_SHUFFLE can both be set on the same SetOperationNode.**
The existing `setColocate(true)` at line 2496 and the new
`setDistributionMode(BUCKET_SHUFFLE)` here can both fire on the same node. The
basic bucket-shuffle child's OlapScan may be directly visible (no exchange
wrapper), so `findOlapScanNodesByPassExchangeNode` finds it and sets
colocate=true. Then this new check also finds storage-bucketed children and
sets BUCKET_SHUFFLE.
For hash joins, colocate and BUCKET_SHUFFLE are mutually exclusive via an
explicit if-else chain (lines 1718 vs 1728). For set operations, no such
exclusivity exists. The interaction between colocate scheduling
(`hasColocatePlanNode`) and bucket shuffle scheduling (`isBucketShuffle()`) on
the same set operation node is unclear and untested.
**Suggestion:** Follow the hash join pattern — make colocate and
BUCKET_SHUFFLE mutually exclusive for set operations too, or document the
expected interaction.
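
A minimal sketch of the mutually exclusive variant, in the if-else shape the comment describes for hash joins. `SetOpDistributionSketch`, `SetOpNodeSketch`, `decide`, and both boolean inputs are hypothetical stand-ins, not Doris classes. Letting colocate take precedence is an assumption; the PR author would need to confirm the intended order.

```java
// Hypothetical, simplified model; not the real SetOperationNode or translator code.
final class SetOpDistributionSketch {
    enum DistributionMode { NONE, BUCKET_SHUFFLE }

    static final class SetOpNodeSketch {
        boolean colocate;
        DistributionMode distributionMode = DistributionMode.NONE;
    }

    // Assumption: colocate wins; bucket shuffle is only considered otherwise.
    static SetOpNodeSketch decide(boolean colocateEligible, boolean hasStorageBucketedChild) {
        SetOpNodeSketch node = new SetOpNodeSketch();
        if (colocateEligible) {
            node.colocate = true;
        } else if (hasStorageBucketedChild) {
            node.distributionMode = DistributionMode.BUCKET_SHUFFLE;
        }
        return node;
    }

    public static void main(String[] args) {
        SetOpNodeSketch both = decide(true, true);
        System.out.println("colocate=" + both.colocate + ", mode=" + both.distributionMode);
    }
}
```

With this shape a node can never carry both flags, so the scheduler would only ever see one of the two strategies on a set operation.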
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2496,14 +2496,16 @@ public PlanFragment visitPhysicalSetOperation(
setOperationNode.setColocate(true);
}
- // TODO: open comment when support `enable_local_shuffle_planner`
- // for (Plan child : setOperation.children()) {
- // PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
- // if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
- // setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
- // break;
- // }
- // }
+ // A storage-bucketed child means set-op bucket shuffle was chosen (only under the FE
+ // local-shuffle planner); mark the node BUCKET_SHUFFLE so the set sink/probe align by
+ // bucket instead of execution-bucketed hash.
+ for (Plan child : setOperation.children()) {
+ PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
+ if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
Review Comment:
**Missing `enableLocalShufflePlanner` gate.**
The translator unconditionally sets `BUCKET_SHUFFLE` when storage-bucketed
children are detected. `ChildrenPropertiesRegulator` (line 664) and
`RequestPropertyDeriver` (line 348) both explicitly gate their new behavior on
`enableLocalShufflePlanner`, but the translator does not.
While the invariant holds today — storage-bucketed children only appear when
the planner gate is active — the dependency is implicit. A defensive guard (or
at minimum a comment) would make this explicit and prevent silent breakage if a
future planner change produces `STORAGE_BUCKETED` children without the gate.
```java
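// Suggestion: add guard (same session-variable check as ChildrenPropertiesRegulator)
ConnectContext ctx = ConnectContext.get();
if (ctx != null && ctx.getSessionVariable().isEnableLocalShufflePlanner()) {
    for (Plan child : setOperation.children()) { ... }
}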
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -651,83 +652,94 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
} else if (requiredDistributionSpec instanceof DistributionSpecHash) {
// TODO: should use the most common hash spec as basic
DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
- // TODO: open comment when support `enable_local_shuffle_planner`
- // int bucketShuffleBasicIndex = -1;
- // double basicRowCount = -1;
-
- // find the bucket shuffle basic index
- // try {
- // ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
- // ShuffleType.NATURAL,
- // ShuffleType.STORAGE_BUCKETED
- // );
- // for (int i = 0; i < originChildrenProperties.size(); i++) {
- // PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
- // DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
- // if (childDistribution instanceof DistributionSpecHash
- // && supportBucketShuffleTypes.contains(
- // ((DistributionSpecHash) childDistribution).getShuffleType())
- // && !(isBucketShuffleDownGrade(setOperation.child(i)))) {
- // Statistics stats = setOperation.child(i).getStats();
- // double rowCount = stats.getRowCount();
- // if (rowCount > basicRowCount) {
- // basicRowCount = rowCount;
- // bucketShuffleBasicIndex = i;
- // }
- // }
- // }
- // } catch (Throwable t) {
- // // catch stats exception
- // LOG.warn("Can not find the most (bucket num, rowCount): " + t, t);
- // bucketShuffleBasicIndex = -1;
- // }
-
- // use bucket shuffle
- // if (bucketShuffleBasicIndex >= 0) {
- // DistributionSpecHash notShuffleSideRequire
- // = (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex)
- // .getDistributionSpec();
- //
- // DistributionSpecHash notNeedShuffleOutput
- // = (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex)
- // .getDistributionSpec();
- //
- // for (int i = 0; i < originChildrenProperties.size(); i++) {
- // DistributionSpecHash current
- // = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
- // if (i == bucketShuffleBasicIndex) {
- // continue;
- // }
- //
- // DistributionSpecHash currentRequire
- // = (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec();
- //
- // PhysicalProperties target = calAnotherSideRequired(
- // ShuffleType.STORAGE_BUCKETED,
- // notNeedShuffleOutput, current,
- // notShuffleSideRequire,
- // currentRequire);
- // updateChildEnforceAndCost(i, target);
- // }
- // setOperation.setMutableState(
- // PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex);
- // use partitioned shuffle
- // } else {
- for (int i = 0; i < originChildrenProperties.size(); i++) {
- DistributionSpecHash current
- = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
- if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
- || !bothSideShuffleKeysAreSameOrder(basic, current,
- (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
- (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
+ int bucketShuffleBasicIndex = -1;
+ double basicRowCount = -1;
+
+ // Bucket shuffle for set operation is only valid when the FE plans the local
+ // shuffle: with the BE-side local-shuffle planner the backend cannot infer the
+ // correct local shuffle type for the set sink/probe and computes wrong results.
+ // When the planner is off, keep bucketShuffleBasicIndex = -1 and fall back to the
+ // execution-bucketed (partitioned) shuffle below.
+ ConnectContext setOperationContext = ConnectContext.get();
+ boolean enableLocalShufflePlanner = setOperationContext != null
+ && setOperationContext.getSessionVariable().isEnableLocalShufflePlanner();
+
+ // find the bucket shuffle basic index: the largest natural / storage-bucketed child
+ // keeps its bucket distribution, every other child is bucket-shuffled to it.
+ if (enableLocalShufflePlanner) {
+ try {
+ ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
+ ShuffleType.NATURAL,
+ ShuffleType.STORAGE_BUCKETED
+ );
+ for (int i = 0; i < originChildrenProperties.size(); i++) {
+ PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
+ DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
+ if (childDistribution instanceof DistributionSpecHash
+ && supportBucketShuffleTypes.contains(
+ ((DistributionSpecHash) childDistribution).getShuffleType())
+ && !(isBucketShuffleDownGrade(setOperation.child(i)))) {
Review Comment:
**`isEnableBucketShuffleJoin` controls set operation bucket shuffle.**
The `isBucketShuffleDownGrade` method (line 294) checks
`ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()`, a
join-specific session variable. A user who disables bucket shuffle for joins
(`enable_bucket_shuffle_join=false`) also silently loses the set operation
bucket shuffle optimization, with no independent control.
This coupling existed in the original commented-out PR #59006 code and is
now activated. Consider whether set operations warrant a separate session
variable, or at minimum document this coupling in the session variable
description.
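
To make the coupling concrete, here is a rough sketch that models only the session-variable part of the downgrade check. `SessionVarsSketch` and the `enableBucketShuffleSetOperation` switch are hypothetical. No such variable exists in the PR, and the real `isBucketShuffleDownGrade` may check more than this flag.

```java
// Hypothetical sketch; these names are illustrative, not Doris session variables.
final class SetOpDowngradeSketch {
    static final class SessionVarsSketch {
        boolean enableBucketShuffleJoin = true;
        // Assumed new switch dedicated to set operations.
        boolean enableBucketShuffleSetOperation = true;
    }

    // Coupled behavior described in the review: the join switch also decides for set operations.
    static boolean downgradeCoupled(SessionVarsSketch vars) {
        return !vars.enableBucketShuffleJoin;
    }

    // Decoupled variant: only the set-operation switch is consulted.
    static boolean downgradeDecoupled(SessionVarsSketch vars) {
        return !vars.enableBucketShuffleSetOperation;
    }

    public static void main(String[] args) {
        SessionVarsSketch vars = new SessionVarsSketch();
        vars.enableBucketShuffleJoin = false; // user turns off bucket shuffle for joins only
        System.out.println("coupled downgrade:   " + downgradeCoupled(vars));
        System.out.println("decoupled downgrade: " + downgradeDecoupled(vars));
    }
}
```

In the coupled form, disabling bucket shuffle for joins also downgrades set operations. The decoupled form keeps the set operation optimization unless its own switch is turned off.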
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -339,14 +339,20 @@ public Void visitPhysicalSetOperation(PhysicalSetOperation setOperation, PlanCon
// shuffle all column
// TODO: for wide table, may be we should add a upper limit of shuffle columns
- // TODO: open comment when support `enable_local_shuffle_planner` and change to REQUIRE
- // intersect/except always need hash distribution, we use REQUIRE to auto select
- // bucket shuffle or execution shuffle
+ // intersect/except always need hash distribution. Auto-selecting bucket shuffle
+ // (ShuffleType.REQUIRE) for set operation is only valid when the FE plans the local
+ // shuffle: with the BE-side local-shuffle planner the backend cannot infer the
+ // correct local shuffle type for the set sink/probe and computes wrong results, so
+ // fall back to EXECUTION_BUCKETED there.
+ ConnectContext setOperationContext = ConnectContext.get();
Review Comment:
**Redundant `ConnectContext.get()` — use `this.connectContext` instead.**
The class already holds a `private final ConnectContext connectContext`
field (line 102), set in both constructors (lines 107, 112) and used throughout
the class (lines 150, 160, 171, 182). The new code calls `ConnectContext.get()`
(thread-local lookup) unnecessarily. Using the existing field would be both
simpler and consistent with the rest of the class.
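
A simplified sketch of reading the flag from the injected field instead of a thread-local lookup. `ContextSketch` and `DeriverSketch` are hypothetical stand-ins for `ConnectContext` and `RequestPropertyDeriver`. Keeping the null check assumes the field may be null, which the PR does not state.

```java
// Hypothetical stand-ins; not the real ConnectContext or RequestPropertyDeriver.
final class ContextFieldSketch {
    static final class ContextSketch {
        final boolean enableLocalShufflePlanner;

        ContextSketch(boolean enableLocalShufflePlanner) {
            this.enableLocalShufflePlanner = enableLocalShufflePlanner;
        }
    }

    static final class DeriverSketch {
        // Set once in the constructor, mirroring the existing field described in the review.
        private final ContextSketch connectContext;

        DeriverSketch(ContextSketch connectContext) {
            this.connectContext = connectContext;
        }

        // Reads the flag from the field; no thread-local lookup is needed.
        boolean autoSelectBucketShuffle() {
            return connectContext != null && connectContext.enableLocalShufflePlanner;
        }
    }

    public static void main(String[] args) {
        DeriverSketch deriver = new DeriverSketch(new ContextSketch(true));
        System.out.println("auto-select bucket shuffle: " + deriver.autoSelectBucketShuffle());
    }
}
```

Reading from the field also makes the class easier to test in isolation, since a test can pass in a context directly rather than relying on thread-local state.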