924060929 commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3523146573
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -651,83 +652,109 @@ public List<List<PhysicalProperties>>
visitPhysicalSetOperation(PhysicalSetOpera
} else if (requiredDistributionSpec instanceof DistributionSpecHash) {
// TODO: should use the most common hash spec as basic
DistributionSpecHash basic = (DistributionSpecHash)
requiredDistributionSpec;
- // TODO: open comment when support `enable_local_shuffle_planner`
- // int bucketShuffleBasicIndex = -1;
- // double basicRowCount = -1;
-
- // find the bucket shuffle basic index
- // try {
- // ImmutableSet<ShuffleType> supportBucketShuffleTypes =
ImmutableSet.of(
- // ShuffleType.NATURAL,
- // ShuffleType.STORAGE_BUCKETED
- // );
- // for (int i = 0; i < originChildrenProperties.size(); i++) {
- // PhysicalProperties originChildrenProperty =
originChildrenProperties.get(i);
- // DistributionSpec childDistribution =
originChildrenProperty.getDistributionSpec();
- // if (childDistribution instanceof DistributionSpecHash
- // && supportBucketShuffleTypes.contains(
- // ((DistributionSpecHash)
childDistribution).getShuffleType())
- // &&
!(isBucketShuffleDownGrade(setOperation.child(i)))) {
- // Statistics stats = setOperation.child(i).getStats();
- // double rowCount = stats.getRowCount();
- // if (rowCount > basicRowCount) {
- // basicRowCount = rowCount;
- // bucketShuffleBasicIndex = i;
- // }
- // }
- // }
- // } catch (Throwable t) {
- // // catch stats exception
- // LOG.warn("Can not find the most (bucket num, rowCount): " +
t, t);
- // bucketShuffleBasicIndex = -1;
- // }
-
- // use bucket shuffle
- // if (bucketShuffleBasicIndex >= 0) {
- // DistributionSpecHash notShuffleSideRequire
- // = (DistributionSpecHash)
requiredProperties.get(bucketShuffleBasicIndex)
- // .getDistributionSpec();
- //
- // DistributionSpecHash notNeedShuffleOutput
- // = (DistributionSpecHash)
originChildrenProperties.get(bucketShuffleBasicIndex)
- // .getDistributionSpec();
- //
- // for (int i = 0; i < originChildrenProperties.size(); i++) {
- // DistributionSpecHash current
- // = (DistributionSpecHash)
originChildrenProperties.get(i).getDistributionSpec();
- // if (i == bucketShuffleBasicIndex) {
- // continue;
- // }
- //
- // DistributionSpecHash currentRequire
- // = (DistributionSpecHash)
requiredProperties.get(i).getDistributionSpec();
- //
- // PhysicalProperties target = calAnotherSideRequired(
- // ShuffleType.STORAGE_BUCKETED,
- // notNeedShuffleOutput, current,
- // notShuffleSideRequire,
- // currentRequire);
- // updateChildEnforceAndCost(i, target);
- // }
- // setOperation.setMutableState(
- // PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX,
bucketShuffleBasicIndex);
- // use partitioned shuffle
- // } else {
- for (int i = 0; i < originChildrenProperties.size(); i++) {
- DistributionSpecHash current
- = (DistributionSpecHash)
originChildrenProperties.get(i).getDistributionSpec();
- if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
- || !bothSideShuffleKeysAreSameOrder(basic, current,
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(i).getDistributionSpec())) {
- PhysicalProperties target = calAnotherSideRequired(
- ShuffleType.EXECUTION_BUCKETED, basic, current,
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(i).getDistributionSpec());
+ int bucketShuffleBasicIndex = -1;
+ double basicRowCount = -1;
+
+ // Bucket shuffle for set operation is only valid when the FE
plans the local
+ // shuffle: with the BE-side local-shuffle planner the backend
cannot infer the
+ // correct local shuffle type for the set sink/probe and computes
wrong results.
+ // It also requires the nereids distribute planner: the legacy
coordinator only
+ // supports bucket-shuffle-partitioned sinks whose dest fragment
contains a bucket
+ // shuffle join, so a bucket-shuffle set operation fragment cannot
be scheduled there.
+ // Otherwise, keep bucketShuffleBasicIndex = -1 and fall back to
the
+ // execution-bucketed (partitioned) shuffle below.
+ ConnectContext setOperationContext = ConnectContext.get();
+ boolean enableLocalShufflePlanner = setOperationContext != null
+ &&
setOperationContext.getSessionVariable().isEnableLocalShuffle()
+ &&
setOperationContext.getSessionVariable().isEnableLocalShufflePlanner()
+ &&
SessionVariable.canUseNereidsDistributePlanner(setOperationContext);
+
+ // find the bucket shuffle basic index: the largest natural /
storage-bucketed child
+ // keeps its bucket distribution, every other child is
bucket-shuffled to it.
+ // isBucketShuffleDownGrade reuses the join-side heuristics on
purpose, including
+ // the enable_bucket_shuffle_join switch and
bucket_shuffle_downgrade_ratio: bucket
+ // shuffle for set operation belongs to the same optimization
family as bucket
+ // shuffle join, so the join switches govern both instead of
introducing a separate
+ // session variable.
+ if (enableLocalShufflePlanner) {
+ try {
+ ImmutableSet<ShuffleType> supportBucketShuffleTypes =
ImmutableSet.of(
+ ShuffleType.NATURAL,
+ ShuffleType.STORAGE_BUCKETED
+ );
+ for (int i = 0; i < originChildrenProperties.size(); i++) {
+ PhysicalProperties originChildrenProperty =
originChildrenProperties.get(i);
+ DistributionSpec childDistribution =
originChildrenProperty.getDistributionSpec();
+ if (childDistribution instanceof DistributionSpecHash
+ && supportBucketShuffleTypes.contains(
Review Comment:
Addressed in both places. The basic-child selection now requires a known
storage layout (`tableId >= 0`): a STORAGE_BUCKETED child whose layout was
cleared (e.g. a hash join output that went through
`withShuffleTypeAndForbidColocateJoin`) cannot serve as the alignment anchor,
so in that case the set operation falls back to the execution-bucketed shuffle.
The translator applies the same check before marking the node BUCKET_SHUFFLE,
so an unverified layout never reaches the bucket-hash local exchange /
scheduling path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]