924060929 commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3518332106
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java:
##########
@@ -339,14 +340,22 @@ public Void visitPhysicalSetOperation(PhysicalSetOperation setOperation, PlanCon
// shuffle all column
// TODO: for wide table, may be we should add a upper limit of shuffle columns
- // TODO: open comment when support `enable_local_shuffle_planner` and change to REQUIRE
- // intersect/except always need hash distribution, we use REQUIRE to auto select
- // bucket shuffle or execution shuffle
+ // intersect/except always need hash distribution. Auto-selecting bucket shuffle
+ // (ShuffleType.REQUIRE) for set operation is only valid when the FE plans the local
+ // shuffle: with the BE-side local-shuffle planner the backend cannot infer the
+ // correct local shuffle type for the set sink/probe and computes wrong results.
+ // It also requires the nereids distribute planner: the legacy coordinator only
+ // supports bucket-shuffle-partitioned sinks whose dest fragment contains a bucket
+ // shuffle join. Fall back to EXECUTION_BUCKETED otherwise.
+ ShuffleType setOperationShuffleType = connectContext != null
Review Comment:
Good catch. Gated all three sites on the same effective condition as the
local-exchange pass: `enable_local_shuffle && enable_local_shuffle_planner &&
canUseNereidsDistributePlanner(...)`, and added a regression case that verifies
no bucket shuffle is chosen and the result stays correct with
`enable_local_shuffle=false`.
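
For readers skimming the thread, the gate described above can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: the class name, the method `chooseShuffleType`, and the two-value `ShuffleType` enum are hypothetical stand-ins, and the three booleans are assumed to carry the values of `enable_local_shuffle`, `enable_local_shuffle_planner`, and the result of `canUseNereidsDistributePlanner(...)`.

```java
// Hypothetical sketch of the gating condition; names are illustrative only.
final class SetOperationShuffleGateSketch {

    // Stand-in for the two shuffle types discussed in this thread.
    enum ShuffleType { REQUIRE, EXECUTION_BUCKETED }

    // REQUIRE (auto-select bucket shuffle) is allowed only when the FE plans
    // the local shuffle and the nereids distribute planner is usable;
    // otherwise fall back to EXECUTION_BUCKETED.
    static ShuffleType chooseShuffleType(boolean enableLocalShuffle,
                                         boolean enableLocalShufflePlanner,
                                         boolean canUseNereidsDistributePlanner) {
        boolean feLocalShufflePlanning = enableLocalShuffle
                && enableLocalShufflePlanner
                && canUseNereidsDistributePlanner;
        return feLocalShufflePlanning ? ShuffleType.REQUIRE : ShuffleType.EXECUTION_BUCKETED;
    }

    public static void main(String[] args) {
        // All three conditions hold: bucket shuffle may be auto-selected.
        System.out.println(chooseShuffleType(true, true, true));
        // enable_local_shuffle=false: fall back, as in the regression case.
        System.out.println(chooseShuffleType(false, true, true));
    }
}
```

Keeping the three flags in one predicate is what lets all three call sites agree with the local-exchange pass; a site that checked only a subset could still request bucket shuffle when the BE-side planner is active.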
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -440,53 +443,56 @@ public PhysicalProperties visitPhysicalSetOperation(PhysicalSetOperation setOper
return PhysicalProperties.GATHER;
}
- // TODO: open comment when support `enable_local_shuffle_planner`
- // int distributeToChildIndex
- // = setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
- // if (distributeToChildIndex >= 0
- // && childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) {
- // DistributionSpecHash childDistribution
- // = (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex);
- // List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
- // Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
- // for (int j = 0; j < childToIndex.size(); j++) {
- // idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
- // }
- //
- // List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns();
- // List<ExprId> setOperationDistributeColumnIds = new ArrayList<>();
- // for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
- // Integer index = idToOutputIndex.get(tableDistributeColumnId);
- // if (index == null) {
- // break;
- // }
- // setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
- // }
- // // check whether the set operation output all distribution columns of the child
- // if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) {
- // boolean isUnion = setOperation instanceof Union;
- // boolean shuffleToRight = distributeToChildIndex > 0;
- // if (!isUnion && shuffleToRight) {
- // return new PhysicalProperties(
- // new DistributionSpecHash(
- // setOperationDistributeColumnIds,
- // ShuffleType.EXECUTION_BUCKETED
- // )
- // );
- // } else {
- // // keep the distribution as the child
- // return new PhysicalProperties(
- // new DistributionSpecHash(
- // setOperationDistributeColumnIds,
- // childDistribution.getShuffleType(),
- // childDistribution.getTableId(),
- // childDistribution.getSelectedIndexId(),
- // childDistribution.getPartitionIds()
- // )
- // );
- // }
- // }
- // }
+ // When set-op bucket shuffle is chosen (DISTRIBUTE_TO_CHILD_INDEX is set by
+ // ChildrenPropertiesRegulator, which only happens under the FE local-shuffle planner),
+ // the set operation keeps the basic child's bucket distribution as its own output so the
+ // bucket distribution propagates upward instead of being flattened to execution-bucketed.
+ int distributeToChildIndex
Review Comment:
Good catch: the mutable state indeed does not survive `chooseBestPlan()` /
`RecomputePhysicalPropertiesPostProcessor`. Removed `DISTRIBUTE_TO_CHILD_INDEX`
entirely. `ChildOutputPropertyDeriver` now recomputes the basic child
structurally from the children distributions:
- Every child must be hash-distributed by NATURAL or STORAGE_BUCKETED.
- At least one child must be STORAGE_BUCKETED (an enforced bucket-shuffle child
  always reports STORAGE_BUCKETED). This rejects the unaligned all-NATURAL case,
  such as union all without a distribution requirement.
- The first NATURAL child is preferred as the basic child, falling back to the
  first STORAGE_BUCKETED one.

The derivation is deterministic on any copy of the plan, so costing and the
post-processor re-derivation agree.
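
The selection rule described above can be sketched as a small pure function. This is only an illustration under stated assumptions, not the PR's implementation: the class `BasicChildSketch`, the method `findBasicChild`, and the simplified `Dist` enum are hypothetical, with `NOT_HASH` standing in for any child that is not hash-distributed.

```java
import java.util.List;
import java.util.OptionalInt;

// Hypothetical sketch of the structural basic-child derivation.
final class BasicChildSketch {

    // Simplified stand-in for a child's distribution; NOT_HASH covers every
    // non-hash distribution (an assumption made for this sketch).
    enum Dist { NATURAL, STORAGE_BUCKETED, EXECUTION_BUCKETED, NOT_HASH }

    // Returns the index of the basic child, or empty when the set operation
    // cannot keep a child's bucket distribution.
    static OptionalInt findBasicChild(List<Dist> children) {
        int firstNatural = -1;
        int firstStorageBucketed = -1;
        for (int i = 0; i < children.size(); i++) {
            Dist dist = children.get(i);
            if (dist == Dist.NATURAL) {
                if (firstNatural < 0) {
                    firstNatural = i;
                }
            } else if (dist == Dist.STORAGE_BUCKETED) {
                if (firstStorageBucketed < 0) {
                    firstStorageBucketed = i;
                }
            } else {
                // Any other distribution disqualifies the whole set operation.
                return OptionalInt.empty();
            }
        }
        // Without a STORAGE_BUCKETED child the NATURAL children are not known
        // to be aligned (e.g. union all with no distribution requirement).
        if (firstStorageBucketed < 0) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(firstNatural >= 0 ? firstNatural : firstStorageBucketed);
    }

    public static void main(String[] args) {
        System.out.println(findBasicChild(List.of(Dist.STORAGE_BUCKETED, Dist.NATURAL)));
        System.out.println(findBasicChild(List.of(Dist.NATURAL, Dist.NATURAL)));
    }
}
```

Because the function reads only the children distributions and no per-plan state, calling it on any copy of the plan yields the same index, which is the property the reply relies on.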
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]