924060929 commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3523013266


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java:
##########
@@ -440,53 +443,131 @@ public PhysicalProperties visitPhysicalSetOperation(PhysicalSetOperation setOper
             return PhysicalProperties.GATHER;
         }
 
-        // TODO: open comment when support `enable_local_shuffle_planner`
-        // int distributeToChildIndex
-        //         = setOperation.<Integer>getMutableState(PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX).orElse(-1);
-        // if (distributeToChildIndex >= 0
-        //         && childrenDistribution.get(distributeToChildIndex) instanceof DistributionSpecHash) {
-        //     DistributionSpecHash childDistribution
-        //             = (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex);
-        //     List<SlotReference> childToIndex = setOperation.getRegularChildrenOutputs().get(distributeToChildIndex);
-        //     Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
-        //     for (int j = 0; j < childToIndex.size(); j++) {
-        //         idToOutputIndex.put(childToIndex.get(j).getExprId(), j);
-        //     }
+        // When set-op bucket shuffle is chosen, the set operation keeps the basic child's bucket
+        // distribution as its own output so the bucket distribution propagates upward instead of
+        // being flattened to execution-bucketed. The basic child is recomputed from the children
+        // distributions instead of being carried as mutable planner state: mutable state does not
+        // survive the with-copies in chooseBestPlan() and the
+        // RecomputePhysicalPropertiesPostProcessor re-derivation, while the recomputation below is
+        // deterministic on any copy of the plan.
         //
-        //     List<ExprId> orderedShuffledColumns = childDistribution.getOrderedShuffledColumns();
-        //     List<ExprId> setOperationDistributeColumnIds = new ArrayList<>();
-        //     for (ExprId tableDistributeColumnId : orderedShuffledColumns) {
-        //         Integer index = idToOutputIndex.get(tableDistributeColumnId);
-        //         if (index == null) {
-        //             break;
-        //         }
-        //         setOperationDistributeColumnIds.add(setOperation.getOutput().get(index).getExprId());
-        //     }
-        //     // check whether the set operation output all distribution columns of the child
-        //     if (setOperationDistributeColumnIds.size() == orderedShuffledColumns.size()) {
-        //         boolean isUnion = setOperation instanceof Union;
-        //         boolean shuffleToRight = distributeToChildIndex > 0;
-        //         if (!isUnion && shuffleToRight) {
-        //             return new PhysicalProperties(
-        //                     new DistributionSpecHash(
-        //                             setOperationDistributeColumnIds,
-        //                             ShuffleType.EXECUTION_BUCKETED
-        //                     )
-        //             );
-        //         } else {
-        //             // keep the distribution as the child
-        //             return new PhysicalProperties(
-        //                     new DistributionSpecHash(
-        //                             setOperationDistributeColumnIds,
-        //                             childDistribution.getShuffleType(),
-        //                             childDistribution.getTableId(),
-        //                             childDistribution.getSelectedIndexId(),
-        //                             childDistribution.getPartitionIds()
-        //                     )
-        //             );
-        //         }
-        //     }
-        // }
+        // The bucket-shuffle signature is structural: every child is hash-distributed by NATURAL
+        // or STORAGE_BUCKETED with the same storage layout (table / index / partitions — an
+        // enforced bucket-shuffle child carries the basic child's layout, see
+        // ChildrenPropertiesRegulator), and at least one child is STORAGE_BUCKETED. The layout
+        // equality rejects an un-enforced mix such as a NATURAL scan plus a lower bucket-shuffle
+        // plan distributed by another table's buckets (reachable through the ANY child request of
+        // union), and requiring one STORAGE_BUCKETED child rejects children that merely keep
+        // their own NATURAL distributions without any set-op enforcement. The basic child keeps
+        // its NATURAL distribution, so prefer the first NATURAL child and fall back to the first
+        // STORAGE_BUCKETED one (then every child shares the same layout, so the claim does not
+        // depend on which child the regulator actually picked).
+        int distributeToChildIndex = -1;
+        int firstNaturalIndex = -1;
+        int firstStorageBucketedIndex = -1;
+        boolean allChildrenBucketAligned = true;
+        DistributionSpecHash firstChildHash = null;
+        for (int i = 0; i < childrenDistribution.size(); i++) {
+            DistributionSpec childDistributionSpec = childrenDistribution.get(i);
+            if (!(childDistributionSpec instanceof DistributionSpecHash)) {
+                allChildrenBucketAligned = false;
+                break;
+            }
+            DistributionSpecHash childHash = (DistributionSpecHash) childDistributionSpec;
+            if (firstChildHash == null) {
+                firstChildHash = childHash;
+            } else if (childHash.getTableId() != firstChildHash.getTableId()
+                    || childHash.getSelectedIndexId() != firstChildHash.getSelectedIndexId()
+                    || !childHash.getPartitionIds().equals(firstChildHash.getPartitionIds())) {
+                allChildrenBucketAligned = false;
+                break;
+            }
+            ShuffleType childShuffleType = childHash.getShuffleType();
+            if (childShuffleType == ShuffleType.NATURAL) {
+                if (firstNaturalIndex < 0) {
+                    firstNaturalIndex = i;
+                }
+            } else if (childShuffleType == ShuffleType.STORAGE_BUCKETED) {
+                if (firstStorageBucketedIndex < 0) {
+                    firstStorageBucketedIndex = i;
+                }
+            } else {
+                allChildrenBucketAligned = false;
+                break;
+            }
+        }
+        if (allChildrenBucketAligned && firstStorageBucketedIndex >= 0) {
+            distributeToChildIndex = firstNaturalIndex >= 0 ? firstNaturalIndex : firstStorageBucketedIndex;
+        }
+        if (distributeToChildIndex >= 0) {
+            DistributionSpecHash childDistribution
+                    = (DistributionSpecHash) childrenDistribution.get(distributeToChildIndex);
+            // A shared storage layout alone does not prove alignment: two children may be
+            // bucketed by columns that feed different set-output positions (e.g. one child
+            // bucketed by the column feeding output k, another by the column feeding output v
+            // of the same table layout). Map every child's hash columns to set-output
+            // positions and require all children to land on the same positions in the same
+            // order; otherwise fall through to the generic derivation below.
+            List<Integer> outputPositions = null;
+            boolean allChildrenSamePositions = true;
+            for (int i = 0; i < childrenDistribution.size() && allChildrenSamePositions; i++) {
+                DistributionSpecHash childHash = (DistributionSpecHash) childrenDistribution.get(i);
+                List<SlotReference> childOutput = setOperation.getRegularChildrenOutputs().get(i);
+                Map<ExprId, Integer> idToOutputIndex = new LinkedHashMap<>();
+                for (int j = 0; j < childOutput.size(); j++) {
+                    idToOutputIndex.put(childOutput.get(j).getExprId(), j);
+                }
+                List<Integer> positions = new ArrayList<>();
+                for (ExprId shuffledColumnId : childHash.getOrderedShuffledColumns()) {
+                    Integer index = idToOutputIndex.get(shuffledColumnId);
+                    if (index == null) {
+                        allChildrenSamePositions = false;
+                        break;
+                    }
+                    positions.add(index);
+                }
+                if (!allChildrenSamePositions) {
+                    break;
+                }
+                if (outputPositions == null) {
+                    outputPositions = positions;
+                } else if (!outputPositions.equals(positions)) {
+                    allChildrenSamePositions = false;
+                }

Review Comment:
   Addressed. The generic fallback now validates the storage layout before 
   returning a STORAGE_BUCKETED property: when the children share the same 
   offsets and the STORAGE_BUCKETED shuffle type but their storage layouts 
   (table / index / partitions) differ, it returns a non-specific property 
   (`createAnyFromHash`) so that the parent re-aligns the data. The NATURAL 
   branch is left as-is on purpose: it predates this PR, and a colocate group 
   legitimately spans different table ids there, so tightening it with a plain 
   layout check would regress valid colocate set operations. If needed, that 
   deserves a colocate-group-aware check in a separate change.
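
   To make the fallback rule above concrete, here is a minimal, self-contained 
   sketch. It is not the code in this PR: `SetOpFallbackSketch`, `ChildHash`, 
   `Derived` and `deriveFallback` are hypothetical stand-ins, and the `ANY` 
   result only models what `createAnyFromHash` is described as producing. It 
   assumes the caller has already established that every child is 
   STORAGE_BUCKETED with the same offsets.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the generic fallback; none of these types are Doris classes.
final class SetOpFallbackSketch {
    enum ShuffleType { NATURAL, STORAGE_BUCKETED, EXECUTION_BUCKETED }

    // What the set operation claims as its own output distribution.
    enum Derived { STORAGE_BUCKETED, ANY }

    // Stand-in for the fields of a hash distribution that matter here.
    static final class ChildHash {
        final ShuffleType shuffleType;
        final long tableId;
        final long selectedIndexId;
        final Set<Long> partitionIds;
        final List<Integer> offsets;

        ChildHash(ShuffleType shuffleType, long tableId, long selectedIndexId,
                  Set<Long> partitionIds, List<Integer> offsets) {
            this.shuffleType = shuffleType;
            this.tableId = tableId;
            this.selectedIndexId = selectedIndexId;
            this.partitionIds = partitionIds;
            this.offsets = offsets;
        }

        // Storage layout = table / index / partitions.
        boolean sameLayout(ChildHash other) {
            return tableId == other.tableId
                    && selectedIndexId == other.selectedIndexId
                    && partitionIds.equals(other.partitionIds);
        }
    }

    static Set<Long> parts(Long... ids) {
        return new HashSet<>(Arrays.asList(ids));
    }

    // Keep STORAGE_BUCKETED only if every child also shares the storage layout.
    static Derived deriveFallback(List<ChildHash> children) {
        ChildHash first = children.get(0);
        for (ChildHash child : children) {
            // Precondition of this sketch, not part of the rule being shown.
            if (child.shuffleType != ShuffleType.STORAGE_BUCKETED
                    || !child.offsets.equals(first.offsets)) {
                throw new IllegalArgumentException("sketch only models the STORAGE_BUCKETED path");
            }
            if (!child.sameLayout(first)) {
                // Different buckets behind the same shuffle type: claim nothing specific.
                return Derived.ANY;
            }
        }
        return Derived.STORAGE_BUCKETED;
    }

    public static void main(String[] args) {
        ChildHash a = new ChildHash(ShuffleType.STORAGE_BUCKETED, 10L, 100L, parts(1L, 2L), Arrays.asList(0));
        ChildHash b = new ChildHash(ShuffleType.STORAGE_BUCKETED, 10L, 100L, parts(1L, 2L), Arrays.asList(0));
        ChildHash c = new ChildHash(ShuffleType.STORAGE_BUCKETED, 20L, 200L, parts(7L), Arrays.asList(0));
        System.out.println(deriveFallback(Arrays.asList(a, b))); // prints STORAGE_BUCKETED
        System.out.println(deriveFallback(Arrays.asList(a, c))); // prints ANY
    }
}
```

   The NATURAL branch is deliberately not modelled here, since a plain layout 
   comparison like `sameLayout` is exactly what would be too strict for 
   colocate groups.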



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]