924060929 commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3522757931
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -440,53 +443,92 @@ public PhysicalProperties visitPhysicalSetOperation(PhysicalSetOperation setOper
return PhysicalProperties.GATHER;
}
- // TODO: open comment when support `enable_local_shuffle_planner`
- // int distributeToChildIndex
- // = setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
- // if (distributeToChildIndex >= 0
- // && childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) {
- // DistributionSpecHash childDistribution
- // = (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex);
- // List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
- // Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
- // for (int j = 0; j < childToIndex.size(); j++) {
- // idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
- // }
+ // When set-op bucket shuffle is chosen, the set operation keeps the basic child's bucket
+ // distribution as its own output so the bucket distribution propagates upward instead of
+ // being flattened to execution-bucketed. The basic child is recomputed from the children
+ // distributions instead of being carried as mutable planner state: mutable state does not
+ // survive the with-copies in chooseBestPlan() and the
+ // RecomputePhysicalPropertiesPostProcessor re-derivation, while the recomputation below is
+ // deterministic on any copy of the plan.
//
- // List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns();
- // List<ExprId> setOperationDistributeColumnIds = new ArrayList<>();
- // for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
- // Integer index = idToOutputIndex.get(tableDistributeColumnId);
- // if (index == null) {
- // break;
- // }
- // setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
- // }
- // // check whether the set operation output all distribution columns of the child
- // if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) {
- // boolean isUnion = setOperation instanceof Union;
- // boolean shuffleToRight = distributeToChildIndex > 0;
- // if (!isUnion && shuffleToRight) {
- // return new PhysicalProperties(
- // new DistributionSpecHash(
- // setOperationDistributeColumnIds,
- // ShuffleType.EXECUTION_BUCKETED
- // )
- // );
- // } else {
- // // keep the distribution as the child
- // return new PhysicalProperties(
- // new DistributionSpecHash(
- // setOperationDistributeColumnIds,
- // childDistribution.getShuffleType(),
- // childDistribution.getTableId(),
- // childDistribution.getSelectedIndexId(),
- // childDistribution.getPartitionIds()
- // )
- // );
- // }
- // }
- // }
+ // The bucket-shuffle signature is structural: every child is hash-distributed by NATURAL
+ // or STORAGE_BUCKETED, and at least one child is STORAGE_BUCKETED (an enforced
+ // bucket-shuffle child always reports STORAGE_BUCKETED and shares the basic child's
+ // bucket layout). Requiring one STORAGE_BUCKETED child rejects the unaligned case where
+ // several children merely keep their own NATURAL distributions (e.g. union all without a
+ // distribution requirement). The basic child keeps its NATURAL distribution, so prefer
+ // the first NATURAL child and fall back to the first STORAGE_BUCKETED one.
+ int distributeToChildIndex = -1;
+ int firstNaturalIndex = -1;
+ int firstStorageBucketedIndex = -1;
+ boolean allChildrenBucketAligned = true;
+ for (int i = 0; i < childrenDistribution.size(); i++) {
+ DistributionSpec childDistributionSpec = childrenDistribution.get(i);
+ if (!(childDistributionSpec instanceof DistributionSpecHash)) {
+ allChildrenBucketAligned = false;
+ break;
+ }
+ ShuffleType childShuffleType = ((DistributionSpecHash) childDistributionSpec).getShuffleType();
+ if (childShuffleType == ShuffleType.NATURAL) {
+ if (firstNaturalIndex < 0) {
+ firstNaturalIndex = i;
+ }
+ } else if (childShuffleType == ShuffleType.STORAGE_BUCKETED) {
+ if (firstStorageBucketedIndex < 0) {
+ firstStorageBucketedIndex = i;
+ }
+ } else {
+ allChildrenBucketAligned = false;
+ break;
+ }
+ }
+ if (allChildrenBucketAligned && firstStorageBucketedIndex >= 0) {
Review Comment:
Both points addressed. (1) The alignment is now provable from immutable plan
information: `ChildrenPropertiesRegulator` makes the enforced children carry
the basic child's storage layout (table / index / partitions) in their
`DistributionSpecHash`. This also matches the physical reality, since the
enforced child is re-distributed by the basic child's bucket function. The
deriver only claims a bucket property when every child is NATURAL /
STORAGE_BUCKETED **with the same storage layout** and at least one child is
STORAGE_BUCKETED. The union-ANY shape (a NATURAL scan plus a lower
bucket-shuffle plan on another table's buckets) is rejected by the layout
equality check. (2) For the all-STORAGE_BUCKETED case the picked index is
irrelevant for union: every child shares the same layout, so the claims are
identical. Intersect / except now conservatively downgrade to
`EXECUTION_BUCKETED`, like the shuffle-to-right case, since the regulator's
actual pick is unknown. Added a
nested set-operation regression case.
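
For readers following the thread, the rule described in (1) and (2) can be sketched roughly as below. This is only my reading of the comment, not the code in the PR: `Layout`, `ChildDist`, `SetOpKind` and `deriveShuffleType` are hypothetical stand-ins rather than Doris classes, and the local `ShuffleType` enum merely mirrors the three names used above. It assumes Java 16+ for records.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

class SetOpBucketAlignmentSketch {
    enum ShuffleType { NATURAL, STORAGE_BUCKETED, EXECUTION_BUCKETED }
    enum SetOpKind { UNION, INTERSECT, EXCEPT }

    /** Hypothetical storage layout carried by a hash distribution (table / index / partitions). */
    record Layout(long tableId, long indexId, Set<Long> partitionIds) {}

    /** Hypothetical child distribution; a null layout stands for "not hash-distributed". */
    record ChildDist(ShuffleType shuffleType, Layout layout) {}

    /** Returns the shuffle type the set operation may claim, or empty when no bucket claim is safe. */
    static Optional<ShuffleType> deriveShuffleType(SetOpKind kind, List<ChildDist> children) {
        if (children.isEmpty()) {
            return Optional.empty();
        }
        Layout sharedLayout = children.get(0).layout();
        int firstNaturalIndex = -1;
        boolean sawStorageBucketed = false;
        for (int i = 0; i < children.size(); i++) {
            ChildDist child = children.get(i);
            // Layout equality: rejects a child bucketed on another table's buckets.
            if (child.layout() == null || !child.layout().equals(sharedLayout)) {
                return Optional.empty();
            }
            if (child.shuffleType() == ShuffleType.NATURAL) {
                if (firstNaturalIndex < 0) {
                    firstNaturalIndex = i;
                }
            } else if (child.shuffleType() == ShuffleType.STORAGE_BUCKETED) {
                sawStorageBucketed = true;
            } else {
                return Optional.empty();
            }
        }
        // Children that merely keep their own NATURAL distributions are not a bucket shuffle.
        if (!sawStorageBucketed) {
            return Optional.empty();
        }
        boolean isUnion = kind == SetOpKind.UNION;
        if (firstNaturalIndex < 0) {
            // All STORAGE_BUCKETED: identical claims for union, unknown pick for intersect / except.
            return Optional.of(isUnion ? ShuffleType.STORAGE_BUCKETED : ShuffleType.EXECUTION_BUCKETED);
        }
        boolean shuffleToRight = firstNaturalIndex > 0;
        return Optional.of(!isUnion && shuffleToRight
                ? ShuffleType.EXECUTION_BUCKETED
                : ShuffleType.NATURAL);
    }

    public static void main(String[] args) {
        Layout t1 = new Layout(1L, 10L, Set.of(100L));
        Layout t2 = new Layout(2L, 20L, Set.of(200L));
        // Aligned union: NATURAL basic child plus an enforced child on the same layout.
        System.out.println(deriveShuffleType(SetOpKind.UNION, List.of(
                new ChildDist(ShuffleType.NATURAL, t1),
                new ChildDist(ShuffleType.STORAGE_BUCKETED, t1))));
        // Union-ANY shape: the second child is bucketed on another table's buckets.
        System.out.println(deriveShuffleType(SetOpKind.UNION, List.of(
                new ChildDist(ShuffleType.NATURAL, t1),
                new ChildDist(ShuffleType.STORAGE_BUCKETED, t2))));
    }
}
```

The sketch returns an empty `Optional` wherever the deriver would decline to claim a bucket property. What the real deriver falls back to in that case is not stated in this thread, so the sketch leaves it out.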
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]