924060929 commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3523146573


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -651,83 +652,109 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
         } else if (requiredDistributionSpec instanceof DistributionSpecHash) {
             // TODO: should use the most common hash spec as basic
             DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
-            // TODO: open comment when support `enable_local_shuffle_planner`
-            // int bucketShuffleBasicIndex = -1;
-            // double basicRowCount = -1;
-
-            // find the bucket shuffle basic index
-            // try {
-            //     ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
-            //             ShuffleType.NATURAL,
-            //             ShuffleType.STORAGE_BUCKETED
-            //     );
-            //     for (int i = 0; i < originChildrenProperties.size(); i++) {
-            //         PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
-            //         DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
-            //         if (childDistribution instanceof DistributionSpecHash
-            //                 && supportBucketShuffleTypes.contains(
-            //                         ((DistributionSpecHash) childDistribution).getShuffleType())
-            //                 && !(isBucketShuffleDownGrade(setOperation.child(i)))) {
-            //             Statistics stats = setOperation.child(i).getStats();
-            //             double rowCount = stats.getRowCount();
-            //             if (rowCount > basicRowCount) {
-            //                 basicRowCount = rowCount;
-            //                 bucketShuffleBasicIndex = i;
-            //             }
-            //         }
-            //     }
-            // } catch (Throwable t) {
-            //     // catch stats exception
-            //     LOG.warn("Can not find the most (bucket num, rowCount): " + t, t);
-            //     bucketShuffleBasicIndex = -1;
-            // }
-
-            // use bucket shuffle
-            // if (bucketShuffleBasicIndex >= 0) {
-            //     DistributionSpecHash notShuffleSideRequire
-            //             = (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex)
-            //                   .getDistributionSpec();
-            //
-            //     DistributionSpecHash notNeedShuffleOutput
-            //             = (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex)
-            //                 .getDistributionSpec();
-            //
-            //     for (int i = 0; i < originChildrenProperties.size(); i++) {
-            //         DistributionSpecHash current
-            //                 = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
-            //         if (i == bucketShuffleBasicIndex) {
-            //             continue;
-            //         }
-            //
-            //         DistributionSpecHash currentRequire
-            //                 = (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec();
-            //
-            //         PhysicalProperties target = calAnotherSideRequired(
-            //                 ShuffleType.STORAGE_BUCKETED,
-            //                 notNeedShuffleOutput, current,
-            //                 notShuffleSideRequire,
-            //                 currentRequire);
-            //         updateChildEnforceAndCost(i, target);
-            //     }
-            //     setOperation.setMutableState(
-            //         PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex);
-            // use partitioned shuffle
-            // } else {
-            for (int i = 0; i < originChildrenProperties.size(); i++) {
-                DistributionSpecHash current
-                        = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
-                if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
-                        || !bothSideShuffleKeysAreSameOrder(basic, current,
-                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                        (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
-                    PhysicalProperties target = calAnotherSideRequired(
-                            ShuffleType.EXECUTION_BUCKETED, basic, current,
-                            (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                            (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec());
+            int bucketShuffleBasicIndex = -1;
+            double basicRowCount = -1;
+
+            // Bucket shuffle for set operation is only valid when the FE plans the local
+            // shuffle: with the BE-side local-shuffle planner the backend cannot infer the
+            // correct local shuffle type for the set sink/probe and computes wrong results.
+            // It also requires the nereids distribute planner: the legacy coordinator only
+            // supports bucket-shuffle-partitioned sinks whose dest fragment contains a bucket
+            // shuffle join, so a bucket-shuffle set operation fragment cannot be scheduled there.
+            // Otherwise, keep bucketShuffleBasicIndex = -1 and fall back to the
+            // execution-bucketed (partitioned) shuffle below.
+            ConnectContext setOperationContext = ConnectContext.get();
+            boolean enableLocalShufflePlanner = setOperationContext != null
+                    && setOperationContext.getSessionVariable().isEnableLocalShuffle()
+                    && setOperationContext.getSessionVariable().isEnableLocalShufflePlanner()
+                    && SessionVariable.canUseNereidsDistributePlanner(setOperationContext);
+
+            // find the bucket shuffle basic index: the largest natural / storage-bucketed child
+            // keeps its bucket distribution, every other child is bucket-shuffled to it.
+            // isBucketShuffleDownGrade reuses the join-side heuristics on purpose, including
+            // the enable_bucket_shuffle_join switch and bucket_shuffle_downgrade_ratio: bucket
+            // shuffle for set operation belongs to the same optimization family as bucket
+            // shuffle join, so the join switches govern both instead of introducing a separate
+            // session variable.
+            if (enableLocalShufflePlanner) {
+                try {
+                    ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
+                            ShuffleType.NATURAL,
+                            ShuffleType.STORAGE_BUCKETED
+                    );
+                    for (int i = 0; i < originChildrenProperties.size(); i++) {
+                        PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
+                        DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
+                        if (childDistribution instanceof DistributionSpecHash
+                                && supportBucketShuffleTypes.contains(

Review Comment:
   Addressed in both places. The basic-child selection now requires a known
storage layout (`tableId >= 0`): a STORAGE_BUCKETED child whose layout was
cleared (e.g. a hash join output produced through
`withShuffleTypeAndForbidColocateJoin`) cannot serve as the alignment anchor,
so the set operation falls back to the execution-bucketed shuffle. The
translator applies the same check before marking the node BUCKET_SHUFFLE, so an
unverified layout never reaches the bucket-hash local exchange or the
scheduling path.
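The selection rule described in the reply can be sketched as below. This is a hypothetical, simplified Java model, not Doris/Nereids code: `ChildInfo`, `canAnchorBucketShuffle` and `findBasicIndex` are illustrative names, the `downgraded` flag stands in for `isBucketShuffleDownGrade(...)`, and `tableId < 0` is assumed to mean "storage layout unknown or cleared". The point it shows is that selection picks the largest eligible NATURAL / STORAGE_BUCKETED child and that one shared predicate gates eligibility.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of picking the "basic" child of a bucket-shuffle
// set operation. None of these types are the real Doris/Nereids classes.
class BucketShuffleBasicSelector {

    enum ShuffleType { NATURAL, STORAGE_BUCKETED, EXECUTION_BUCKETED }

    // Stand-in for a child's hash distribution spec plus its estimated row count.
    // tableId < 0 is assumed to mean "storage layout unknown or cleared";
    // downgraded stands in for isBucketShuffleDownGrade(child).
    record ChildInfo(ShuffleType shuffleType, long tableId, double rowCount, boolean downgraded) {}

    private static final Set<ShuffleType> SUPPORTED =
            EnumSet.of(ShuffleType.NATURAL, ShuffleType.STORAGE_BUCKETED);

    // Shared predicate: the same check would guard both basic-child selection
    // and the translator's decision to mark the node BUCKET_SHUFFLE.
    static boolean canAnchorBucketShuffle(ChildInfo child) {
        return SUPPORTED.contains(child.shuffleType())
                && child.tableId() >= 0
                && !child.downgraded();
    }

    // Returns the index of the largest eligible child, or -1 to fall back to
    // the execution-bucketed (partitioned) shuffle.
    static int findBasicIndex(boolean enableLocalShufflePlanner, List<ChildInfo> children) {
        if (!enableLocalShufflePlanner) {
            return -1;
        }
        int basicIndex = -1;
        double basicRowCount = -1;
        for (int i = 0; i < children.size(); i++) {
            ChildInfo child = children.get(i);
            if (canAnchorBucketShuffle(child) && child.rowCount() > basicRowCount) {
                basicRowCount = child.rowCount();
                basicIndex = i;
            }
        }
        return basicIndex;
    }

    public static void main(String[] args) {
        List<ChildInfo> children = List.of(
                // Largest child, but its layout was cleared (e.g. a hash join output).
                new ChildInfo(ShuffleType.STORAGE_BUCKETED, -1, 1_000_000, false),
                // Child with a known bucketed layout: becomes the anchor.
                new ChildInfo(ShuffleType.NATURAL, 42, 5_000, false),
                // Execution-bucketed children can never be the anchor.
                new ChildInfo(ShuffleType.EXECUTION_BUCKETED, -1, 9_000_000, false));
        System.out.println(findBasicIndex(true, children));  // prints 1
        System.out.println(findBasicIndex(false, children)); // prints -1
    }
}
```

Keeping the eligibility check in a single predicate is one way to make sure selection and translation cannot disagree about whether a child's layout is trustworthy.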



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

