morrySnow commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3510485923


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2496,14 +2496,16 @@ public PlanFragment visitPhysicalSetOperation(
             setOperationNode.setColocate(true);
         }
 
-        // TODO: open comment when support `enable_local_shuffle_planner`
-        // for (Plan child : setOperation.children()) {
-        //     PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
-        //     if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
-        //         setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
-        //         break;
-        //     }
-        // }
+        // A storage-bucketed child means set-op bucket shuffle was chosen (only under the FE
+        // local-shuffle planner); mark the node BUCKET_SHUFFLE so the set sink/probe align by
+        // bucket instead of execution-bucketed hash.
+        for (Plan child : setOperation.children()) {
+            PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
+            if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
+                setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);

Review Comment:
   **Colocate and BUCKET_SHUFFLE can both be set on the same SetOperationNode.**
   
   The existing `setColocate(true)` at line 2496 and the new `setDistributionMode(BUCKET_SHUFFLE)` here can both fire on the same node. The basic bucket-shuffle child's OlapScan may be directly visible (not wrapped in an exchange), so `findOlapScanNodesByPassExchangeNode` finds it and sets colocate=true. This new check then also finds the storage-bucketed children and sets BUCKET_SHUFFLE.
   
   For hash joins, colocate and BUCKET_SHUFFLE are mutually exclusive through an explicit if/else chain (lines 1718 vs 1728). Set operations have no such exclusivity, so the interaction between colocate scheduling (`hasColocatePlanNode`) and bucket-shuffle scheduling (`isBucketShuffle()`) on the same set-operation node is unclear and untested.
   
   **Suggestion:** Follow the hash-join pattern and make colocate and BUCKET_SHUFFLE mutually exclusive for set operations too, or document the expected interaction.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -2496,14 +2496,16 @@ public PlanFragment visitPhysicalSetOperation(
             setOperationNode.setColocate(true);
         }
 
-        // TODO: open comment when support `enable_local_shuffle_planner`
-        // for (Plan child : setOperation.children()) {
-        //     PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
-        //     if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
-        //         setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
-        //         break;
-        //     }
-        // }
+        // A storage-bucketed child means set-op bucket shuffle was chosen (only under the FE
+        // local-shuffle planner); mark the node BUCKET_SHUFFLE so the set sink/probe align by
+        // bucket instead of execution-bucketed hash.
+        for (Plan child : setOperation.children()) {
+            PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
+            if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {

Review Comment:
   **Missing `enableLocalShufflePlanner` gate.**
   
   The translator unconditionally sets `BUCKET_SHUFFLE` when storage-bucketed children are detected. `ChildrenPropertiesRegulator` (line 664) and `RequestPropertyDeriver` (line 348) both explicitly gate their new behavior on `enableLocalShufflePlanner`, but the translator does not.
   
   The invariant holds today (storage-bucketed children only appear when the planner gate is active), but the dependency is implicit. A defensive guard, or at minimum a comment, would make it explicit and prevent silent breakage if a future planner change produces `STORAGE_BUCKETED` children without the gate.
   
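   ```java
   ConnectContext ctx = ConnectContext.get(); // same gate as ChildrenPropertiesRegulator
   if (ctx != null && ctx.getSessionVariable().isEnableLocalShufflePlanner()) {
       for (Plan child : setOperation.children()) {
           PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
           if (JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
               setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
               break;
           }
       }
   }
   ```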
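
   To make the invariant explicit rather than silently relied upon, the guard could also fail loudly when a storage-bucketed child shows up with the gate off. A minimal sketch, assuming the check is reduced to the flag plus the children's shuffle types; `ChildShuffleType` and `BucketShuffleGate` are hypothetical, and throwing versus quietly falling back to the partitioned shuffle is a design choice left to the author.

   ```java
   import java.util.List;

   // Hypothetical stand-in for a child's shuffle type.
   enum ChildShuffleType { NATURAL, STORAGE_BUCKETED, EXECUTION_BUCKETED }

   final class BucketShuffleGate {
       private BucketShuffleGate() {}

       // Returns true when the set-operation node should be marked BUCKET_SHUFFLE.
       // With the planner gate off, a STORAGE_BUCKETED child breaks the invariant
       // the translator relies on, so surface it instead of mis-planning silently.
       static boolean markBucketShuffle(boolean enableLocalShufflePlanner,
               List<ChildShuffleType> children) {
           boolean anyStorageBucketed = children.contains(ChildShuffleType.STORAGE_BUCKETED);
           if (!enableLocalShufflePlanner) {
               if (anyStorageBucketed) {
                   throw new IllegalStateException(
                           "STORAGE_BUCKETED set-operation child without enable_local_shuffle_planner");
               }
               return false;
           }
           return anyStorageBucketed;
       }

       public static void main(String[] args) {
           System.out.println(markBucketShuffle(true, List.of(ChildShuffleType.STORAGE_BUCKETED)));
           System.out.println(markBucketShuffle(false, List.of(ChildShuffleType.NATURAL)));
       }
   }
   ```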



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -651,83 +652,94 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
         } else if (requiredDistributionSpec instanceof DistributionSpecHash) {
             // TODO: should use the most common hash spec as basic
             DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
-            // TODO: open comment when support `enable_local_shuffle_planner`
-            // int bucketShuffleBasicIndex = -1;
-            // double basicRowCount = -1;
-
-            // find the bucket shuffle basic index
-            // try {
-            //     ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
-            //             ShuffleType.NATURAL,
-            //             ShuffleType.STORAGE_BUCKETED
-            //     );
-            //     for (int i = 0; i < originChildrenProperties.size(); i++) {
-            //         PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
-            //         DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
-            //         if (childDistribution instanceof DistributionSpecHash
-            //                 && supportBucketShuffleTypes.contains(
-            //                         ((DistributionSpecHash) childDistribution).getShuffleType())
-            //                 && !(isBucketShuffleDownGrade(setOperation.child(i)))) {
-            //             Statistics stats = setOperation.child(i).getStats();
-            //             double rowCount = stats.getRowCount();
-            //             if (rowCount > basicRowCount) {
-            //                 basicRowCount = rowCount;
-            //                 bucketShuffleBasicIndex = i;
-            //             }
-            //         }
-            //     }
-            // } catch (Throwable t) {
-            //     // catch stats exception
-            //     LOG.warn("Can not find the most (bucket num, rowCount): " + t, t);
-            //     bucketShuffleBasicIndex = -1;
-            // }
-
-            // use bucket shuffle
-            // if (bucketShuffleBasicIndex >= 0) {
-            //     DistributionSpecHash notShuffleSideRequire
-            //             = (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex)
-            //                   .getDistributionSpec();
-            //
-            //     DistributionSpecHash notNeedShuffleOutput
-            //             = (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex)
-            //                 .getDistributionSpec();
-            //
-            //     for (int i = 0; i < originChildrenProperties.size(); i++) {
-            //         DistributionSpecHash current
-            //                 = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
-            //         if (i == bucketShuffleBasicIndex) {
-            //             continue;
-            //         }
-            //
-            //         DistributionSpecHash currentRequire
-            //                 = (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec();
-            //
-            //         PhysicalProperties target = calAnotherSideRequired(
-            //                 ShuffleType.STORAGE_BUCKETED,
-            //                 notNeedShuffleOutput, current,
-            //                 notShuffleSideRequire,
-            //                 currentRequire);
-            //         updateChildEnforceAndCost(i, target);
-            //     }
-            //     setOperation.setMutableState(
-            //         PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex);
-            // use partitioned shuffle
-            // } else {
-            for (int i = 0; i < originChildrenProperties.size(); i++) {
-                DistributionSpecHash current
-                        = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
-                if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
-                        || !bothSideShuffleKeysAreSameOrder(basic, current,
-                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                        (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
+            int bucketShuffleBasicIndex = -1;
+            double basicRowCount = -1;
+
+            // Bucket shuffle for set operation is only valid when the FE plans the local
+            // shuffle: with the BE-side local-shuffle planner the backend cannot infer the
+            // correct local shuffle type for the set sink/probe and computes wrong results.
+            // When the planner is off, keep bucketShuffleBasicIndex = -1 and fall back to the
+            // execution-bucketed (partitioned) shuffle below.
+            ConnectContext setOperationContext = ConnectContext.get();
+            boolean enableLocalShufflePlanner = setOperationContext != null
+                    && setOperationContext.getSessionVariable().isEnableLocalShufflePlanner();
+
+            // find the bucket shuffle basic index: the largest natural / storage-bucketed child
+            // keeps its bucket distribution, every other child is bucket-shuffled to it.
+            if (enableLocalShufflePlanner) {
+                try {
+                    ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
+                            ShuffleType.NATURAL,
+                            ShuffleType.STORAGE_BUCKETED
+                    );
+                    for (int i = 0; i < originChildrenProperties.size(); i++) {
+                        PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
+                        DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
+                        if (childDistribution instanceof DistributionSpecHash
+                                && supportBucketShuffleTypes.contains(
+                                        ((DistributionSpecHash) childDistribution).getShuffleType())
+                                && !(isBucketShuffleDownGrade(setOperation.child(i)))) {

Review Comment:
   **`isEnableBucketShuffleJoin` controls set operation bucket shuffle.**
   
   The `isBucketShuffleDownGrade` method (line 294) checks `ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()`, a join-specific session variable. A user who disables bucket shuffle for joins (`enable_bucket_shuffle_join=false`) therefore also silently loses the set-operation bucket-shuffle optimization, with no independent control.
   
   This coupling existed in the original commented-out code from PR #59006 and is now activated. Consider whether set operations warrant a separate session variable, or at least document this coupling in the session variable's description.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -339,14 +339,20 @@ public Void visitPhysicalSetOperation(PhysicalSetOperation setOperation, PlanCon
             // shuffle all column
             // TODO: for wide table, may be we should add a upper limit of shuffle columns
 
-            // TODO: open comment when support `enable_local_shuffle_planner` and change to REQUIRE
-            // intersect/except always need hash distribution, we use REQUIRE to auto select
-            // bucket shuffle or execution shuffle
+            // intersect/except always need hash distribution. Auto-selecting bucket shuffle
+            // (ShuffleType.REQUIRE) for set operation is only valid when the FE plans the local
+            // shuffle: with the BE-side local-shuffle planner the backend cannot infer the
+            // correct local shuffle type for the set sink/probe and computes wrong results, so
+            // fall back to EXECUTION_BUCKETED there.
+            ConnectContext setOperationContext = ConnectContext.get();

Review Comment:
   **Redundant `ConnectContext.get()`: use `this.connectContext` instead.**
   
   The class already holds a `private final ConnectContext connectContext` field (line 102), set in both constructors (lines 107, 112) and used throughout the class (lines 150, 160, 171, 182). The new code performs an unnecessary thread-local lookup through `ConnectContext.get()`. Using the existing field would be simpler and consistent with the rest of the class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
