924060929 commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3523013266
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -440,53 +443,131 @@ public PhysicalProperties
visitPhysicalSetOperation(PhysicalSetOperation setOper
return PhysicalProperties.GATHER;
}
- // TODO: open comment when support `enable_local_shuffle_planner`
- // int distributeToChildIndex
- // =
setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
- // if (distributeToChildIndex >= 0
- // && childrenDistribution.get(distributeToChildIndex)
instanceof DistributionSpecHash) {
- // DistributionSpecHash childDistribution
- // = (DistributionSpecHash)
childrenDistribution.get(distributeToChildIndex);
- // List<SlotReference> childToIndex =
setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
- // Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
- // for (int j = 0; j < childToIndex.size(); j++) {
- // idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
- // }
+ // When set-op bucket shuffle is chosen, the set operation keeps the
basic child's bucket
+ // distribution as its own output so the bucket distribution
propagates upward instead of
+ // being flattened to execution-bucketed. The basic child is
recomputed from the children
+ // distributions instead of being carried as mutable planner state:
mutable state does not
+ // survive the with-copies in chooseBestPlan() and the
+ // RecomputePhysicalPropertiesPostProcessor re-derivation, while the
recomputation below is
+ // deterministic on any copy of the plan.
//
- // List<ExprId> orderedShuffledColumns =
childDistribution.getOrderedShuffledColumns();
- // List<ExprId> setOperationDistributeColumnIds = new
ArrayList<>();
- // for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
- // Integer index =
idToOutputIndex.get(tableDistributeColumnId);
- // if (index == null) {
- // break;
- // }
- //
setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
- // }
- // // check whether the set operation output all distribution
columns of the child
- // if (setOperationDistributeColumnIds.size() ==
orderedShuffledColumns.size()) {
- // boolean isUnion = setOperation instanceof Union;
- // boolean shuffleToRight = distributeToChildIndex > 0;
- // if (!isUnion && shuffleToRight) {
- // return new PhysicalProperties(
- // new DistributionSpecHash(
- // setOperationDistributeColumnIds,
- // ShuffleType.EXECUTION_BUCKETED
- // )
- // );
- // } else {
- // // keep the distribution as the child
- // return new PhysicalProperties(
- // new DistributionSpecHash(
- // setOperationDistributeColumnIds,
- // childDistribution.getShuffleType(),
- // childDistribution.getTableId(),
- // childDistribution.getSelectedIndexId(),
- // childDistribution.getPartitionIds()
- // )
- // );
- // }
- // }
- // }
+ // The bucket-shuffle signature is structural: every child is
hash-distributed by NATURAL
+ // or STORAGE_BUCKETED with the same storage layout (table / index /
partitions — an
+ // enforced bucket-shuffle child carries the basic child's layout, see
+ // ChildrenPropertiesRegulator), and at least one child is
STORAGE_BUCKETED. The layout
+ // equality rejects an un-enforced mix such as a NATURAL scan plus a
lower bucket-shuffle
+ // plan distributed by another table's buckets (reachable through the
ANY child request of
+ // union), and requiring one STORAGE_BUCKETED child rejects children
that merely keep
+ // their own NATURAL distributions without any set-op enforcement. The
basic child keeps
+ // its NATURAL distribution, so prefer the first NATURAL child and
fall back to the first
+ // STORAGE_BUCKETED one (then every child shares the same layout, so
the claim does not
+ // depend on which child the regulator actually picked).
+ int distributeToChildIndex = -1;
+ int firstNaturalIndex = -1;
+ int firstStorageBucketedIndex = -1;
+ boolean allChildrenBucketAligned = true;
+ DistributionSpecHash firstChildHash = null;
+ for (int i = 0; i < childrenDistribution.size(); i++) {
+ DistributionSpec childDistributionSpec =
childrenDistribution.get(i);
+ if (!(childDistributionSpec instanceof DistributionSpecHash)) {
+ allChildrenBucketAligned = false;
+ break;
+ }
+ DistributionSpecHash childHash = (DistributionSpecHash)
childDistributionSpec;
+ if (firstChildHash == null) {
+ firstChildHash = childHash;
+ } else if (childHash.getTableId() != firstChildHash.getTableId()
+ || childHash.getSelectedIndexId() !=
firstChildHash.getSelectedIndexId()
+ ||
!childHash.getPartitionIds().equals(firstChildHash.getPartitionIds())) {
+ allChildrenBucketAligned = false;
+ break;
+ }
+ ShuffleType childShuffleType = childHash.getShuffleType();
+ if (childShuffleType == ShuffleType.NATURAL) {
+ if (firstNaturalIndex < 0) {
+ firstNaturalIndex = i;
+ }
+ } else if (childShuffleType == ShuffleType.STORAGE_BUCKETED) {
+ if (firstStorageBucketedIndex < 0) {
+ firstStorageBucketedIndex = i;
+ }
+ } else {
+ allChildrenBucketAligned = false;
+ break;
+ }
+ }
+ if (allChildrenBucketAligned && firstStorageBucketedIndex >= 0) {
+ distributeToChildIndex = firstNaturalIndex >= 0 ?
firstNaturalIndex : firstStorageBucketedIndex;
+ }
+ if (distributeToChildIndex >= 0) {
+ DistributionSpecHash childDistribution
+ = (DistributionSpecHash)
childrenDistribution.get(distributeToChildIndex);
+ // A shared storage layout alone does not prove alignment: two
children may be
+ // bucketed by columns that feed different set-output positions
(e.g. one child
+ // bucketed by the column feeding output k, another by the column
feeding output v
+ // of the same table layout). Map every child's hash columns to
set-output
+ // positions and require all children to land on the same
positions in the same
+ // order; otherwise fall through to the generic derivation below.
+ List<Integer> outputPositions = null;
+ boolean allChildrenSamePositions = true;
+ for (int i = 0; i < childrenDistribution.size() &&
allChildrenSamePositions; i++) {
+ DistributionSpecHash childHash = (DistributionSpecHash)
childrenDistribution.get(i);
+ List<SlotReference> childOutput =
setOperation.getRegularChildrenOutputs().get(i);
+ Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
+ for (int j = 0; j < childOutput.size(); j++) {
+ idToOutputIndex.put(childOutput.get(j).getExprId(), j);
+ }
+ List<Integer> positions = new ArrayList<>();
+ for (ExprId shuffledColumnId :
childHash.getOrderedShuffledColumns()) {
+ Integer index = idToOutputIndex.get(shuffledColumnId);
+ if (index == null) {
+ allChildrenSamePositions = false;
+ break;
+ }
+ positions.add(index);
+ }
+ if (!allChildrenSamePositions) {
+ break;
+ }
+ if (outputPositions == null) {
+ outputPositions = positions;
+ } else if (!outputPositions.equals(positions)) {
+ allChildrenSamePositions = false;
+ }
Review Comment:
Addressed. The generic fallback now validates the storage layout before
returning a STORAGE_BUCKETED property: when the children share the same offsets
and the STORAGE_BUCKETED shuffle type but their storage layouts (table / index /
partitions) differ, it returns a non-specific property (`createAnyFromHash`) so
that the parent re-aligns the data. The NATURAL branch is intentionally left
as-is: it predates this PR, and a colocate group can legitimately span different
table ids there, so tightening it with a plain layout check would regress valid
colocate set operations. If that branch needs tightening, it deserves a
colocate-group-aware check in a separate change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]