This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6737acd386e09918a04b86dd3660c80023e39966
Author: 924060929 <[email protected]>
AuthorDate: Tue Jun 30 10:59:19 2026 +0800

    [opt](local shuffle) support bucket shuffle for set operation
    
    Re-enable bucket shuffle for set operations (including union): the largest
    natural or storage-bucketed child keeps its bucket distribution and every
    other child is bucket-shuffled to it, avoiding a full reshuffle of the
    largest input (the same idea as a bucket-shuffle join, applied to set
    operations).
    
    This is only valid under the FE local-shuffle planner
    (enable_local_shuffle_planner): only then can the frontend plan the correct
    local shuffle type for the set sink/probe. With the BE-side local-shuffle
    planner the backend cannot infer that type and would compute wrong results,
    so in that mode the plan falls back to an execution-bucketed shuffle and
    behavior is unchanged.
---
 .../glue/translator/PhysicalPlanTranslator.java    |  18 +--
 .../properties/ChildOutputPropertyDeriver.java     | 100 +++++++------
 .../properties/ChildrenPropertiesRegulator.java    | 158 +++++++++++----------
 .../nereids/properties/RequestPropertyDeriver.java |  14 +-
 .../bucket_shuffle_set_operation.groovy            |   3 -
 5 files changed, 158 insertions(+), 135 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 14e063ac7f2..86c9f6c4157 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -2496,14 +2496,16 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             setOperationNode.setColocate(true);
         }
 
-        // TODO: open comment when support `enable_local_shuffle_planner`
-        // for (Plan child : setOperation.children()) {
-        //     PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
-        //     if 
(JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
-        //         
setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
-        //         break;
-        //     }
-        // }
+        // A storage-bucketed child means set-op bucket shuffle was chosen 
(only under the FE
+        // local-shuffle planner); mark the node BUCKET_SHUFFLE so the set 
sink/probe align by
+        // bucket instead of execution-bucketed hash.
+        for (Plan child : setOperation.children()) {
+            PhysicalPlan childPhysicalPlan = (PhysicalPlan) child;
+            if 
(JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())) {
+                
setOperationNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+                break;
+            }
+        }
 
         return setOperationFragment;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 8a71581ca97..d5858af29d8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.Slot;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import 
org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
 import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Union;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
 import 
org.apache.doris.nereids.trees.plans.physical.PhysicalBucketedHashAggregate;
@@ -71,9 +72,11 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -440,53 +443,56 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
             return PhysicalProperties.GATHER;
         }
 
-        // TODO: open comment when support `enable_local_shuffle_planner`
-        // int distributeToChildIndex
-        //         = 
setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
-        // if (distributeToChildIndex >= 0
-        //         && childrenDistribution.get(distributeToChildIndex) 
instanceof DistributionSpecHash) {
-        //     DistributionSpecHash childDistribution
-        //             = (DistributionSpecHash) 
childrenDistribution.get(distributeToChildIndex);
-        //     List<SlotReference> childToIndex = 
setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
-        //     Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
-        //     for (int j = 0; j < childToIndex.size(); j++) {
-        //         idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
-        //     }
-        //
-        //     List<ExprId> orderedShuffledColumns = 
childDistribution.getOrderedShuffledColumns();
-        //     List<ExprId> setOperationDistributeColumnIds = new 
ArrayList<>();
-        //     for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
-        //         Integer index = 
idToOutputIndex.get(tableDistributeColumnId);
-        //         if (index == null) {
-        //             break;
-        //         }
-        //         
setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
-        //     }
-        //     // check whether the set operation output all distribution 
columns of the child
-        //     if (setOperationDistributeColumnIds.size() == 
orderedShuffledColumns.size()) {
-        //         boolean isUnion = setOperation instanceof Union;
-        //         boolean shuffleToRight = distributeToChildIndex > 0;
-        //         if (!isUnion && shuffleToRight) {
-        //             return new PhysicalProperties(
-        //                     new DistributionSpecHash(
-        //                             setOperationDistributeColumnIds,
-        //                             ShuffleType.EXECUTION_BUCKETED
-        //                     )
-        //             );
-        //         } else {
-        //             // keep the distribution as the child
-        //             return new PhysicalProperties(
-        //                     new DistributionSpecHash(
-        //                             setOperationDistributeColumnIds,
-        //                             childDistribution.getShuffleType(),
-        //                             childDistribution.getTableId(),
-        //                             childDistribution.getSelectedIndexId(),
-        //                             childDistribution.getPartitionIds()
-        //                     )
-        //             );
-        //         }
-        //     }
-        // }
+        // When set-op bucket shuffle is chosen (DISTRIBUTE_TO_CHILD_INDEX is 
set by
+        // ChildrenPropertiesRegulator, which only happens under the FE 
local-shuffle planner),
+        // the set operation keeps the basic child's bucket distribution as 
its own output so the
+        // bucket distribution propagates upward instead of being flattened to 
execution-bucketed.
+        int distributeToChildIndex
+                = 
setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
+        if (distributeToChildIndex >= 0
+                && childrenDistribution.get(distributeToChildIndex) instanceof 
DistributionSpecHash) {
+            DistributionSpecHash childDistribution
+                    = (DistributionSpecHash) 
childrenDistribution.get(distributeToChildIndex);
+            List<SlotReference> childToIndex = 
setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
+            Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
+            for (int j = 0; j < childToIndex.size(); j++) {
+                idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
+            }
+
+            List<ExprId> orderedShuffledColumns = 
childDistribution.getOrderedShuffledColumns();
+            List<ExprId> setOperationDistributeColumnIds = new ArrayList<>();
+            for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
+                Integer index = idToOutputIndex.get(tableDistributeColumnId);
+                if (index == null) {
+                    break;
+                }
+                
setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
+            }
+            // check whether the set operation output all distribution columns 
of the child
+            if (setOperationDistributeColumnIds.size() == 
orderedShuffledColumns.size()) {
+                boolean isUnion = setOperation instanceof Union;
+                boolean shuffleToRight = distributeToChildIndex > 0;
+                if (!isUnion && shuffleToRight) {
+                    return new PhysicalProperties(
+                            new DistributionSpecHash(
+                                    setOperationDistributeColumnIds,
+                                    ShuffleType.EXECUTION_BUCKETED
+                            )
+                    );
+                } else {
+                    // keep the distribution as the child
+                    return new PhysicalProperties(
+                            new DistributionSpecHash(
+                                    setOperationDistributeColumnIds,
+                                    childDistribution.getShuffleType(),
+                                    childDistribution.getTableId(),
+                                    childDistribution.getSelectedIndexId(),
+                                    childDistribution.getPartitionIds()
+                            )
+                    );
+                }
+            }
+        }
 
         for (int i = 0; i < childrenDistribution.size(); i++) {
             DistributionSpec childDistribution = childrenDistribution.get(i);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index e5e0d9d1bd0..ca1691463a2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -57,6 +57,7 @@ import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -651,83 +652,94 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<List<List<PhysicalP
         } else if (requiredDistributionSpec instanceof DistributionSpecHash) {
             // TODO: should use the most common hash spec as basic
             DistributionSpecHash basic = (DistributionSpecHash) 
requiredDistributionSpec;
-            // TODO: open comment when support `enable_local_shuffle_planner`
-            // int bucketShuffleBasicIndex = -1;
-            // double basicRowCount = -1;
-
-            // find the bucket shuffle basic index
-            // try {
-            //     ImmutableSet<ShuffleType> supportBucketShuffleTypes = 
ImmutableSet.of(
-            //             ShuffleType.NATURAL,
-            //             ShuffleType.STORAGE_BUCKETED
-            //     );
-            //     for (int i = 0; i < originChildrenProperties.size(); i++) {
-            //         PhysicalProperties originChildrenProperty = 
originChildrenProperties.get(i);
-            //         DistributionSpec childDistribution = 
originChildrenProperty.getDistributionSpec();
-            //         if (childDistribution instanceof DistributionSpecHash
-            //                 && supportBucketShuffleTypes.contains(
-            //                         ((DistributionSpecHash) 
childDistribution).getShuffleType())
-            //                 && 
!(isBucketShuffleDownGrade(setOperation.child(i)))) {
-            //             Statistics stats = setOperation.child(i).getStats();
-            //             double rowCount = stats.getRowCount();
-            //             if (rowCount > basicRowCount) {
-            //                 basicRowCount = rowCount;
-            //                 bucketShuffleBasicIndex = i;
-            //             }
-            //         }
-            //     }
-            // } catch (Throwable t) {
-            //     // catch stats exception
-            //     LOG.warn("Can not find the most (bucket num, rowCount): " + 
t, t);
-            //     bucketShuffleBasicIndex = -1;
-            // }
-
-            // use bucket shuffle
-            // if (bucketShuffleBasicIndex >= 0) {
-            //     DistributionSpecHash notShuffleSideRequire
-            //             = (DistributionSpecHash) 
requiredProperties.get(bucketShuffleBasicIndex)
-            //                   .getDistributionSpec();
-            //
-            //     DistributionSpecHash notNeedShuffleOutput
-            //             = (DistributionSpecHash) 
originChildrenProperties.get(bucketShuffleBasicIndex)
-            //                 .getDistributionSpec();
-            //
-            //     for (int i = 0; i < originChildrenProperties.size(); i++) {
-            //         DistributionSpecHash current
-            //                 = (DistributionSpecHash) 
originChildrenProperties.get(i).getDistributionSpec();
-            //         if (i == bucketShuffleBasicIndex) {
-            //             continue;
-            //         }
-            //
-            //         DistributionSpecHash currentRequire
-            //                 = (DistributionSpecHash) 
requiredProperties.get(i).getDistributionSpec();
-            //
-            //         PhysicalProperties target = calAnotherSideRequired(
-            //                 ShuffleType.STORAGE_BUCKETED,
-            //                 notNeedShuffleOutput, current,
-            //                 notShuffleSideRequire,
-            //                 currentRequire);
-            //         updateChildEnforceAndCost(i, target);
-            //     }
-            //     setOperation.setMutableState(
-            //         PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, 
bucketShuffleBasicIndex);
-            // use partitioned shuffle
-            // } else {
-            for (int i = 0; i < originChildrenProperties.size(); i++) {
-                DistributionSpecHash current
-                        = (DistributionSpecHash) 
originChildrenProperties.get(i).getDistributionSpec();
-                if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
-                        || !bothSideShuffleKeysAreSameOrder(basic, current,
-                        (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
-                        (DistributionSpecHash) 
requiredProperties.get(i).getDistributionSpec())) {
+            int bucketShuffleBasicIndex = -1;
+            double basicRowCount = -1;
+
+            // Bucket shuffle for set operation is only valid when the FE 
plans the local
+            // shuffle: with the BE-side local-shuffle planner the backend 
cannot infer the
+            // correct local shuffle type for the set sink/probe and computes 
wrong results.
+            // When the planner is off, keep bucketShuffleBasicIndex = -1 and 
fall back to the
+            // execution-bucketed (partitioned) shuffle below.
+            ConnectContext setOperationContext = ConnectContext.get();
+            boolean enableLocalShufflePlanner = setOperationContext != null
+                    && 
setOperationContext.getSessionVariable().isEnableLocalShufflePlanner();
+
+            // find the bucket shuffle basic index: the largest natural / 
storage-bucketed child
+            // keeps its bucket distribution, every other child is 
bucket-shuffled to it.
+            if (enableLocalShufflePlanner) {
+                try {
+                    ImmutableSet<ShuffleType> supportBucketShuffleTypes = 
ImmutableSet.of(
+                            ShuffleType.NATURAL,
+                            ShuffleType.STORAGE_BUCKETED
+                    );
+                    for (int i = 0; i < originChildrenProperties.size(); i++) {
+                        PhysicalProperties originChildrenProperty = 
originChildrenProperties.get(i);
+                        DistributionSpec childDistribution = 
originChildrenProperty.getDistributionSpec();
+                        if (childDistribution instanceof DistributionSpecHash
+                                && supportBucketShuffleTypes.contains(
+                                        ((DistributionSpecHash) 
childDistribution).getShuffleType())
+                                && 
!(isBucketShuffleDownGrade(setOperation.child(i)))) {
+                            Statistics stats = 
setOperation.child(i).getStats();
+                            double rowCount = stats.getRowCount();
+                            if (rowCount > basicRowCount) {
+                                basicRowCount = rowCount;
+                                bucketShuffleBasicIndex = i;
+                            }
+                        }
+                    }
+                } catch (Throwable t) {
+                    // catch stats exception
+                    LOG.warn("Can not find the most (bucket num, rowCount): " 
+ t, t);
+                    bucketShuffleBasicIndex = -1;
+                }
+            }
+
+            if (bucketShuffleBasicIndex >= 0) {
+                // use bucket shuffle
+                DistributionSpecHash notShuffleSideRequire
+                        = (DistributionSpecHash) 
requiredProperties.get(bucketShuffleBasicIndex)
+                              .getDistributionSpec();
+
+                DistributionSpecHash notNeedShuffleOutput
+                        = (DistributionSpecHash) 
originChildrenProperties.get(bucketShuffleBasicIndex)
+                            .getDistributionSpec();
+
+                for (int i = 0; i < originChildrenProperties.size(); i++) {
+                    DistributionSpecHash current
+                            = (DistributionSpecHash) 
originChildrenProperties.get(i).getDistributionSpec();
+                    if (i == bucketShuffleBasicIndex) {
+                        continue;
+                    }
+
+                    DistributionSpecHash currentRequire
+                            = (DistributionSpecHash) 
requiredProperties.get(i).getDistributionSpec();
+
                     PhysicalProperties target = calAnotherSideRequired(
-                            ShuffleType.EXECUTION_BUCKETED, basic, current,
-                            (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
-                            (DistributionSpecHash) 
requiredProperties.get(i).getDistributionSpec());
+                            ShuffleType.STORAGE_BUCKETED,
+                            notNeedShuffleOutput, current,
+                            notShuffleSideRequire,
+                            currentRequire);
                     updateChildEnforceAndCost(i, target);
                 }
+                setOperation.setMutableState(
+                        PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, 
bucketShuffleBasicIndex);
+            } else {
+                // use partitioned shuffle
+                for (int i = 0; i < originChildrenProperties.size(); i++) {
+                    DistributionSpecHash current
+                            = (DistributionSpecHash) 
originChildrenProperties.get(i).getDistributionSpec();
+                    if (current.getShuffleType() != 
ShuffleType.EXECUTION_BUCKETED
+                            || !bothSideShuffleKeysAreSameOrder(basic, current,
+                            (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
+                            (DistributionSpecHash) 
requiredProperties.get(i).getDistributionSpec())) {
+                        PhysicalProperties target = calAnotherSideRequired(
+                                ShuffleType.EXECUTION_BUCKETED, basic, current,
+                                (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
+                                (DistributionSpecHash) 
requiredProperties.get(i).getDistributionSpec());
+                        updateChildEnforceAndCost(i, target);
+                    }
+                }
             }
-            // }
         }
         return ImmutableList.of(originChildrenProperties);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 70f2b51665b..e0f4778b754 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -339,14 +339,20 @@ public class RequestPropertyDeriver extends 
PlanVisitor<Void, PlanContext> {
             // shuffle all column
             // TODO: for wide table, may be we should add a upper limit of 
shuffle columns
 
-            // TODO: open comment when support `enable_local_shuffle_planner` 
and change to REQUIRE
-            // intersect/except always need hash distribution, we use REQUIRE 
to auto select
-            // bucket shuffle or execution shuffle
+            // intersect/except always need hash distribution. Auto-selecting 
bucket shuffle
+            // (ShuffleType.REQUIRE) for set operation is only valid when the 
FE plans the local
+            // shuffle: with the BE-side local-shuffle planner the backend 
cannot infer the
+            // correct local shuffle type for the set sink/probe and computes 
wrong results, so
+            // fall back to EXECUTION_BUCKETED there.
+            ConnectContext setOperationContext = ConnectContext.get();
+            ShuffleType setOperationShuffleType = setOperationContext != null
+                    && 
setOperationContext.getSessionVariable().isEnableLocalShufflePlanner()
+                    ? ShuffleType.REQUIRE : ShuffleType.EXECUTION_BUCKETED;
             
addRequestPropertyToChildren(setOperation.getRegularChildrenOutputs().stream()
                     .map(childOutputs -> childOutputs.stream()
                             .map(SlotReference::getExprId)
                             .collect(ImmutableList.toImmutableList()))
-                    .map(l -> PhysicalProperties.createHash(l, 
ShuffleType.EXECUTION_BUCKETED))
+                    .map(l -> PhysicalProperties.createHash(l, 
setOperationShuffleType))
                     .collect(Collectors.toList()));
         }
         return null;
diff --git 
a/regression-test/suites/query_p0/set_operations/bucket_shuffle_set_operation.groovy
 
b/regression-test/suites/query_p0/set_operations/bucket_shuffle_set_operation.groovy
index 5533853eaa9..81ac60baca7 100644
--- 
a/regression-test/suites/query_p0/set_operations/bucket_shuffle_set_operation.groovy
+++ 
b/regression-test/suites/query_p0/set_operations/bucket_shuffle_set_operation.groovy
@@ -16,9 +16,6 @@
 // under the License.
 
 suite("bucket_shuffle_set_operation") {
-    // TODO: open comment when support `enable_local_shuffle_planner` and 
change to REQUIRE
-    return
-
     multi_sql """
         drop table if exists bucket_shuffle_set_operation1;
         create table bucket_shuffle_set_operation1(id int, value int) 
distributed by hash(id) buckets 10 properties('replication_num'='1');


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to