github-actions[bot] commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3518177866


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -339,14 +340,22 @@ public Void visitPhysicalSetOperation(PhysicalSetOperation setOperation, PlanCon
             // shuffle all column
             // TODO: for wide table, may be we should add a upper limit of shuffle columns
 
-            // TODO: open comment when support `enable_local_shuffle_planner` and change to REQUIRE
-            // intersect/except always need hash distribution, we use REQUIRE to auto select
-            // bucket shuffle or execution shuffle
+            // intersect/except always need hash distribution. Auto-selecting bucket shuffle
+            // (ShuffleType.REQUIRE) for set operation is only valid when the FE plans the local
+            // shuffle: with the BE-side local-shuffle planner the backend cannot infer the
+            // correct local shuffle type for the set sink/probe and computes wrong results.
+            // It also requires the nereids distribute planner: the legacy coordinator only
+            // supports bucket-shuffle-partitioned sinks whose dest fragment contains a bucket
+            // shuffle join. Fall back to EXECUTION_BUCKETED otherwise.

Review Comment:
   This still enables bucket-shuffle set operations when 
`enable_local_shuffle=false`. The predicate here only checks 
`enable_local_shuffle_planner`, and the same predicate is used later when 
choosing `DISTRIBUTE_TO_CHILD_INDEX` and marking the translated 
`SetOperationNode` as `BUCKET_SHUFFLE`. However 
`NereidsPlanner.addLocalExchangeAfterDistribute()` skips the FE local-exchange 
pass unless both `enable_local_shuffle_planner` and `enable_local_shuffle` are 
true, and BE only plans local shuffle when `enable_local_shuffle` is true and 
FE planning is off. So a legal session like `set enable_local_shuffle=false;` 
with the planner variable left at its default can still choose a 
storage-bucketed set operation, but neither side inserts the bucket/local hash 
exchanges that this PR relies on for set sink/probe alignment. Please gate this 
path on the same effective condition as the local-exchange pass 
(`enable_local_shuffle && enable_local_shuffle_planner && 
canUseNereidsDistributePlanner(...)`) in request derivation/regulation/translation, 
and ideally cover the disabled-local-shuffle case in the regression.
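
A minimal sketch of the requested gate, not the actual Doris code. `SessionFlags`, `SketchShuffleType`, and `SetOpShuffleGate` are hypothetical stand-ins, and `canUseNereidsDistributePlanner` is reduced to a precomputed boolean because its real arguments are not shown in this comment. The idea is that request derivation, regulation, and translation all consult one shared predicate, so they cannot disagree about whether bucket shuffle is allowed.

```java
/** Hypothetical snapshot of the session settings named in the review comment. */
final class SessionFlags {
    final boolean enableLocalShuffle;
    final boolean enableLocalShufflePlanner;
    // Assumption: the result of canUseNereidsDistributePlanner(...) is computed elsewhere.
    final boolean canUseNereidsDistributePlanner;

    SessionFlags(boolean enableLocalShuffle,
                 boolean enableLocalShufflePlanner,
                 boolean canUseNereidsDistributePlanner) {
        this.enableLocalShuffle = enableLocalShuffle;
        this.enableLocalShufflePlanner = enableLocalShufflePlanner;
        this.canUseNereidsDistributePlanner = canUseNereidsDistributePlanner;
    }
}

/** Hypothetical two-value enum; only the two choices discussed above are modeled. */
enum SketchShuffleType { REQUIRE, EXECUTION_BUCKETED }

final class SetOpShuffleGate {
    private SetOpShuffleGate() {
    }

    /** Single predicate shared by every phase that may pick bucket shuffle. */
    static boolean feInsertsLocalExchange(SessionFlags flags) {
        return flags != null
                && flags.enableLocalShuffle
                && flags.enableLocalShufflePlanner
                && flags.canUseNereidsDistributePlanner;
    }

    /** Auto-select (REQUIRE) only when the FE local-exchange pass will really run. */
    static SketchShuffleType chooseShuffleType(SessionFlags flags) {
        return feInsertsLocalExchange(flags)
                ? SketchShuffleType.REQUIRE
                : SketchShuffleType.EXECUTION_BUCKETED;
    }

    public static void main(String[] args) {
        // The session from the comment: local shuffle off, planner flag left on.
        SessionFlags localShuffleOff = new SessionFlags(false, true, true);
        System.out.println(chooseShuffleType(localShuffleOff));
    }
}
```

With this shape, the `enable_local_shuffle=false` session falls back to the execution-bucketed choice instead of selecting a storage-bucketed set operation that neither side aligns.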



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -440,53 +443,56 @@ public PhysicalProperties visitPhysicalSetOperation(PhysicalSetOperation setOper
             return PhysicalProperties.GATHER;
         }
 
-        // TODO: open comment when support `enable_local_shuffle_planner`
-        // int distributeToChildIndex
-        //         = setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
-        // if (distributeToChildIndex >= 0
-        //         && childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) {
-        //     DistributionSpecHash childDistribution
-        //             = (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex);
-        //     List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
-        //     Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
-        //     for (int j = 0; j < childToIndex.size(); j++) {
-        //         idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
-        //     }
-        //
-        //     List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns();
-        //     List<ExprId> setOperationDistributeColumnIds = new ArrayList<>();
-        //     for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
-        //         Integer index = idToOutputIndex.get(tableDistributeColumnId);
-        //         if (index == null) {
-        //             break;
-        //         }
-        //         setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
-        //     }
-        //     // check whether the set operation output all distribution columns of the child
-        //     if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) {
-        //         boolean isUnion = setOperation instanceof Union;
-        //         boolean shuffleToRight = distributeToChildIndex > 0;
-        //         if (!isUnion && shuffleToRight) {
-        //             return new PhysicalProperties(
-        //                     new DistributionSpecHash(
-        //                             setOperationDistributeColumnIds,
-        //                             ShuffleType.EXECUTION_BUCKETED
-        //                     )
-        //             );
-        //         } else {
-        //             // keep the distribution as the child
-        //             return new PhysicalProperties(
-        //                     new DistributionSpecHash(
-        //                             setOperationDistributeColumnIds,
-        //                             childDistribution.getShuffleType(),
-        //                             childDistribution.getTableId(),
-        //                             childDistribution.getSelectedIndexId(),
-        //                             childDistribution.getPartitionIds()
-        //                     )
-        //             );
-        //         }
-        //     }
-        // }
+        // When set-op bucket shuffle is chosen (DISTRIBUTE_TO_CHILD_INDEX is set by
+        // ChildrenPropertiesRegulator, which only happens under the FE local-shuffle planner),
+        // the set operation keeps the basic child's bucket distribution as its own output so the

Review Comment:
   This branch depends on `DISTRIBUTE_TO_CHILD_INDEX` being present on the 
final `PhysicalSetOperation`, but that value is only mutable planner state. 
`chooseBestPlan()` rebuilds the selected node with `withChildren()`, 
`withGroupExpression()`, and `withPhysicalPropertiesAndStats()`, and the set-op 
`with...` methods only use `copyWithSameId()`, so the selected basic-child 
index is not carried onto the final plan. If a later post-processor rewrites 
anything, the default-enabled `ShuffleKeyPruner` runs 
`RecomputePhysicalPropertiesPostProcessor`, which calls this deriver again and 
copies only `KEY_GROUP`; now this branch is skipped and the code falls back to 
the old all-children derivation. For a bucket-shuffle set op with a 
NATURAL/STORAGE_BUCKETED basic child and STORAGE_BUCKETED enforced children, 
that can downgrade the output from the costed bucket hash to 
`STORAGE_ANY`/`EXECUTION_BUCKETED`, so an upstream join or aggregate can see a 
different distribution contract from the one selected by CBO. Please make the 
basic-child choice part of the immutable `PhysicalSetOperation` state copied by 
all `with...` methods, or recompute the same choice from child properties, and 
cover a recompute case with shuffle-key pruning enabled.
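
A minimal sketch of the difference between the two designs, under stated assumptions: `MutableStateSetOp`, `ImmutableStateSetOp`, and `SetOpStateDemo` are hypothetical toy classes, children are modeled as a plain string, and nothing here is the real `PhysicalSetOperation` API. It only illustrates why a value held in per-instance mutable state disappears when a `with...` method builds a fresh node, while a constructor field survives the copy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical node that keeps the chosen child index in mutable side state. */
final class MutableStateSetOp {
    static final String DISTRIBUTE_TO_CHILD_INDEX = "distributeToChildIndex";

    private final Map<String, Object> mutableState = new HashMap<>();
    final String children;

    MutableStateSetOp(String children) {
        this.children = children;
    }

    void setMutableState(String key, Object value) {
        mutableState.put(key, value);
    }

    /** Returns -1 when no basic child was recorded on this instance. */
    int distributeToChildIndex() {
        return Optional.ofNullable((Integer) mutableState.get(DISTRIBUTE_TO_CHILD_INDEX))
                .orElse(-1);
    }

    /** Builds a new node without copying the side state, so the index is lost. */
    MutableStateSetOp withChildren(String newChildren) {
        return new MutableStateSetOp(newChildren);
    }
}

/** Hypothetical node that carries the chosen child index as an immutable field. */
final class ImmutableStateSetOp {
    final String children;
    final int distributeToChildIndex;

    ImmutableStateSetOp(String children, int distributeToChildIndex) {
        this.children = children;
        this.distributeToChildIndex = distributeToChildIndex;
    }

    /** Every copy passes the index through the constructor, so it survives. */
    ImmutableStateSetOp withChildren(String newChildren) {
        return new ImmutableStateSetOp(newChildren, distributeToChildIndex);
    }
}

final class SetOpStateDemo {
    public static void main(String[] args) {
        MutableStateSetOp mutable = new MutableStateSetOp("left,right");
        mutable.setMutableState(MutableStateSetOp.DISTRIBUTE_TO_CHILD_INDEX, 1);
        System.out.println(mutable.withChildren("left2,right2").distributeToChildIndex());

        ImmutableStateSetOp immutable = new ImmutableStateSetOp("left,right", 1);
        System.out.println(immutable.withChildren("left2,right2").distributeToChildIndex);
    }
}
```

A re-derivation that reads the index from the rebuilt node sees the sentinel in the first design and the original choice in the second, which is the property the recompute path needs.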



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -440,53 +443,56 @@ public PhysicalProperties visitPhysicalSetOperation(PhysicalSetOperation setOper
             return PhysicalProperties.GATHER;
         }
 
-        // TODO: open comment when support `enable_local_shuffle_planner`
-        // int distributeToChildIndex
-        //         = setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
-        // if (distributeToChildIndex >= 0
-        //         && childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) {
-        //     DistributionSpecHash childDistribution
-        //             = (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex);
-        //     List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
-        //     Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
-        //     for (int j = 0; j < childToIndex.size(); j++) {
-        //         idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
-        //     }
-        //
-        //     List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns();
-        //     List<ExprId> setOperationDistributeColumnIds = new ArrayList<>();
-        //     for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
-        //         Integer index = idToOutputIndex.get(tableDistributeColumnId);
-        //         if (index == null) {
-        //             break;
-        //         }
-        //         setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
-        //     }
-        //     // check whether the set operation output all distribution columns of the child
-        //     if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) {
-        //         boolean isUnion = setOperation instanceof Union;
-        //         boolean shuffleToRight = distributeToChildIndex > 0;
-        //         if (!isUnion && shuffleToRight) {
-        //             return new PhysicalProperties(
-        //                     new DistributionSpecHash(
-        //                             setOperationDistributeColumnIds,
-        //                             ShuffleType.EXECUTION_BUCKETED
-        //                     )
-        //             );
-        //         } else {
-        //             // keep the distribution as the child
-        //             return new PhysicalProperties(
-        //                     new DistributionSpecHash(
-        //                             setOperationDistributeColumnIds,
-        //                             childDistribution.getShuffleType(),
-        //                             childDistribution.getTableId(),
-        //                             childDistribution.getSelectedIndexId(),
-        //                             childDistribution.getPartitionIds()
-        //                     )
-        //             );
-        //         }
-        //     }
-        // }
+        // When set-op bucket shuffle is chosen (DISTRIBUTE_TO_CHILD_INDEX is set by
+        // ChildrenPropertiesRegulator, which only happens under the FE local-shuffle planner),
+        // the set operation keeps the basic child's bucket distribution as its own output so the
+        // bucket distribution propagates upward instead of being flattened to execution-bucketed.
+        int distributeToChildIndex

Review Comment:
   This branch depends on `DISTRIBUTE_TO_CHILD_INDEX` being present on the 
final `PhysicalSetOperation`, but that value is only mutable planner state. 
`chooseBestPlan()` rebuilds the selected node with `withChildren()`, 
`withGroupExpression()`, and `withPhysicalPropertiesAndStats()`, and the set-op 
`with...` methods only use `copyWithSameId()`, so the selected basic-child 
index is not carried onto the final plan. If a later post-processor rewrites 
anything, the default-enabled `ShuffleKeyPruner` runs 
`RecomputePhysicalPropertiesPostProcessor`, which calls this deriver again and 
copies only `KEY_GROUP`; now this branch is skipped and the code falls back to 
the old all-children derivation. For a bucket-shuffle set op with a 
NATURAL/STORAGE_BUCKETED basic child and STORAGE_BUCKETED enforced children, 
that can downgrade the output from the costed bucket hash to 
`STORAGE_ANY`/`EXECUTION_BUCKETED`, so an upstream join or aggregate can see a 
different distribution contract from the one selected by CBO. Please make the 
basic-child choice part of the immutable `PhysicalSetOperation` state copied by 
all `with...` methods, or recompute the same choice from child properties, and 
cover a recompute case with shuffle-key pruning enabled.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -339,14 +340,22 @@ public Void visitPhysicalSetOperation(PhysicalSetOperation setOperation, PlanCon
             // shuffle all column
             // TODO: for wide table, may be we should add a upper limit of shuffle columns
 
-            // TODO: open comment when support `enable_local_shuffle_planner` and change to REQUIRE
-            // intersect/except always need hash distribution, we use REQUIRE to auto select
-            // bucket shuffle or execution shuffle
+            // intersect/except always need hash distribution. Auto-selecting bucket shuffle
+            // (ShuffleType.REQUIRE) for set operation is only valid when the FE plans the local
+            // shuffle: with the BE-side local-shuffle planner the backend cannot infer the
+            // correct local shuffle type for the set sink/probe and computes wrong results.
+            // It also requires the nereids distribute planner: the legacy coordinator only
+            // supports bucket-shuffle-partitioned sinks whose dest fragment contains a bucket
+            // shuffle join. Fall back to EXECUTION_BUCKETED otherwise.
+            ShuffleType setOperationShuffleType = connectContext != null

Review Comment:
   This still enables bucket-shuffle set operations when 
`enable_local_shuffle=false`. The predicate here only checks 
`enable_local_shuffle_planner`, and the same predicate is used later when 
choosing `DISTRIBUTE_TO_CHILD_INDEX` and marking the translated 
`SetOperationNode` as `BUCKET_SHUFFLE`. However 
`NereidsPlanner.addLocalExchangeAfterDistribute()` skips the FE local-exchange 
pass unless both `enable_local_shuffle_planner` and `enable_local_shuffle` are 
true, and BE only plans local shuffle when `enable_local_shuffle` is true and 
FE planning is off. So a legal session like `set enable_local_shuffle=false;` 
with the planner variable left at its default can still choose a 
storage-bucketed set operation, but neither side inserts the bucket/local hash 
exchanges that this PR relies on for set sink/probe alignment. Please gate this 
path on the same effective condition as the local-exchange pass 
(`enable_local_shuffle && enable_local_shuffle_planner && 
canUseNereidsDistributePlanner(...)`) in request derivation/regulation/translation, 
and ideally cover the disabled-local-shuffle case in the regression.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

