github-actions[bot] commented on code in PR #65129:
URL: https://github.com/apache/doris/pull/65129#discussion_r3523096872
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java:
##########
@@ -651,83 +652,109 @@ public List<List<PhysicalProperties>> visitPhysicalSetOperation(PhysicalSetOpera
} else if (requiredDistributionSpec instanceof DistributionSpecHash) {
// TODO: should use the most common hash spec as basic
DistributionSpecHash basic = (DistributionSpecHash) requiredDistributionSpec;
- // TODO: open comment when support `enable_local_shuffle_planner`
- // int bucketShuffleBasicIndex = -1;
- // double basicRowCount = -1;
-
- // find the bucket shuffle basic index
- // try {
- // ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
- // ShuffleType.NATURAL,
- // ShuffleType.STORAGE_BUCKETED
- // );
- // for (int i = 0; i < originChildrenProperties.size(); i++) {
- // PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
- // DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
- // if (childDistribution instanceof DistributionSpecHash
- // && supportBucketShuffleTypes.contains(
- // ((DistributionSpecHash) childDistribution).getShuffleType())
- // && !(isBucketShuffleDownGrade(setOperation.child(i)))) {
- // Statistics stats = setOperation.child(i).getStats();
- // double rowCount = stats.getRowCount();
- // if (rowCount > basicRowCount) {
- // basicRowCount = rowCount;
- // bucketShuffleBasicIndex = i;
- // }
- // }
- // }
- // } catch (Throwable t) {
- // // catch stats exception
- // LOG.warn("Can not find the most (bucket num, rowCount): " + t, t);
- // bucketShuffleBasicIndex = -1;
- // }
-
- // use bucket shuffle
- // if (bucketShuffleBasicIndex >= 0) {
- // DistributionSpecHash notShuffleSideRequire
- // = (DistributionSpecHash) requiredProperties.get(bucketShuffleBasicIndex)
- // .getDistributionSpec();
- //
- // DistributionSpecHash notNeedShuffleOutput
- // = (DistributionSpecHash) originChildrenProperties.get(bucketShuffleBasicIndex)
- // .getDistributionSpec();
- //
- // for (int i = 0; i < originChildrenProperties.size(); i++) {
- // DistributionSpecHash current
- // = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
- // if (i == bucketShuffleBasicIndex) {
- // continue;
- // }
- //
- // DistributionSpecHash currentRequire
- // = (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec();
- //
- // PhysicalProperties target = calAnotherSideRequired(
- // ShuffleType.STORAGE_BUCKETED,
- // notNeedShuffleOutput, current,
- // notShuffleSideRequire,
- // currentRequire);
- // updateChildEnforceAndCost(i, target);
- // }
- // setOperation.setMutableState(
- // PhysicalSetOperation.DISTRIBUTE_TO_CHILD_INDEX, bucketShuffleBasicIndex);
- // use partitioned shuffle
- // } else {
- for (int i = 0; i < originChildrenProperties.size(); i++) {
- DistributionSpecHash current
- = (DistributionSpecHash) originChildrenProperties.get(i).getDistributionSpec();
- if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
- || !bothSideShuffleKeysAreSameOrder(basic, current,
- (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
- (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec())) {
- PhysicalProperties target = calAnotherSideRequired(
- ShuffleType.EXECUTION_BUCKETED, basic, current,
- (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
- (DistributionSpecHash) requiredProperties.get(i).getDistributionSpec());
+ int bucketShuffleBasicIndex = -1;
+ double basicRowCount = -1;
+
+ // Bucket shuffle for set operation is only valid when the FE plans the local
+ // shuffle: with the BE-side local-shuffle planner the backend cannot infer the
+ // correct local shuffle type for the set sink/probe and computes wrong results.
+ // It also requires the nereids distribute planner: the legacy coordinator only
+ // supports bucket-shuffle-partitioned sinks whose dest fragment contains a bucket
+ // shuffle join, so a bucket-shuffle set operation fragment cannot be scheduled there.
+ // Otherwise, keep bucketShuffleBasicIndex = -1 and fall back to the
+ // execution-bucketed (partitioned) shuffle below.
+ ConnectContext setOperationContext = ConnectContext.get();
+ boolean enableLocalShufflePlanner = setOperationContext != null
+ && setOperationContext.getSessionVariable().isEnableLocalShuffle()
+ && setOperationContext.getSessionVariable().isEnableLocalShufflePlanner()
+ && SessionVariable.canUseNereidsDistributePlanner(setOperationContext);
+
+ // find the bucket shuffle basic index: the largest natural / storage-bucketed child
+ // keeps its bucket distribution, every other child is bucket-shuffled to it.
+ // isBucketShuffleDownGrade reuses the join-side heuristics on purpose, including
+ // the enable_bucket_shuffle_join switch and bucket_shuffle_downgrade_ratio: bucket
+ // shuffle for set operation belongs to the same optimization family as bucket
+ // shuffle join, so the join switches govern both instead of introducing a separate
+ // session variable.
+ if (enableLocalShufflePlanner) {
+ try {
+ ImmutableSet<ShuffleType> supportBucketShuffleTypes = ImmutableSet.of(
+ ShuffleType.NATURAL,
+ ShuffleType.STORAGE_BUCKETED
+ );
+ for (int i = 0; i < originChildrenProperties.size(); i++) {
+ PhysicalProperties originChildrenProperty = originChildrenProperties.get(i);
+ DistributionSpec childDistribution = originChildrenProperty.getDistributionSpec();
+ if (childDistribution instanceof DistributionSpecHash
+ && supportBucketShuffleTypes.contains(
Review Comment:
The set-op bucket-shuffle chooser still accepts `STORAGE_BUCKETED` children
whose storage layout is unknown. That can happen for a child such as a hash
join output that went through `withShuffleTypeAndForbidColocateJoin(...)`: it
keeps `ShuffleType.STORAGE_BUCKETED` but clears `tableId` / `selectedIndexId`
to `-1`.
Reduced plan shape:
```text
Intersect(k)
child0: Project(k)
HashJoin(...), output STORAGE_BUCKETED(tableId=-1)
child1: OlapScan(k), output NATURAL(tableId=T)
```
If child0 has the larger row count, this loop can pick it as
`bucketShuffleBasicIndex`, because it inspects only the child's shuffle type
and never its storage layout. Lines 734-738 then copy the `-1` layout into the
enforced children.
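The following minimal, self-contained Java sketch makes the failure mode concrete by modeling the selection rule described above. `ChildSketch`, `TypeOnlyBasicChooser` and the trimmed `ShuffleType` enum are hypothetical stand-ins, not Doris classes, and the row counts and `tableId` 42 are invented. Because the rule never reads `tableId`, it picks child0 for the reduced shape.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for the Nereids ShuffleType enum.
enum ShuffleType { NATURAL, STORAGE_BUCKETED, EXECUTION_BUCKETED }

// Hypothetical per-child summary used only for this sketch.
final class ChildSketch {
    final ShuffleType shuffleType;
    final long tableId; // -1 means the storage layout is unknown
    final double rowCount;
    final boolean bucketShuffleDownGrade;

    ChildSketch(ShuffleType shuffleType, long tableId, double rowCount,
                boolean bucketShuffleDownGrade) {
        this.shuffleType = shuffleType;
        this.tableId = tableId;
        this.rowCount = rowCount;
        this.bucketShuffleDownGrade = bucketShuffleDownGrade;
    }
}

class TypeOnlyBasicChooser {
    static final Set<ShuffleType> SUPPORTED =
            EnumSet.of(ShuffleType.NATURAL, ShuffleType.STORAGE_BUCKETED);

    // Picks the largest supported, non-downgraded child; tableId is never read.
    static int chooseBasicIndex(List<ChildSketch> children) {
        int basicIndex = -1;
        double basicRowCount = -1;
        for (int i = 0; i < children.size(); i++) {
            ChildSketch child = children.get(i);
            if (SUPPORTED.contains(child.shuffleType)
                    && !child.bucketShuffleDownGrade
                    && child.rowCount > basicRowCount) {
                basicRowCount = child.rowCount;
                basicIndex = i;
            }
        }
        return basicIndex;
    }

    // Reduced shape from the review; row counts and tableId 42 are made up.
    static List<ChildSketch> reducedShape() {
        return List.of(
                // child0: Project(HashJoin), STORAGE_BUCKETED with the layout cleared
                new ChildSketch(ShuffleType.STORAGE_BUCKETED, -1, 1_000_000, false),
                // child1: OlapScan, NATURAL with a known layout
                new ChildSketch(ShuffleType.NATURAL, 42, 10_000, false));
    }

    public static void main(String[] args) {
        // Prints 0: the unknown-layout join output becomes the basic side.
        System.out.println(chooseBasicIndex(reducedShape()));
    }
}
```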
`ChildOutputPropertyDeriver` correctly refuses to prove a bucketed set-op
output when `tableId < 0`. However, the translator later marks the
`SetOperationNode` as `BUCKET_SHUFFLE` based solely on
`JoinUtils.isStorageBucketed(childPhysicalPlan.getPhysicalProperties())`, and
`SetOperationNode.enforceAndDeriveLocalExchange` then switches to bucket-hash
local exchange and scheduling for a storage layout that was never proved.
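A translator-side guard could be sketched as follows. The types here are also hypothetical simplifications. `markByTypeOnly` stands in for a decision based only on the storage-bucketed shuffle type and is not the real `JoinUtils.isStorageBucketed`, whose exact conditions are not shown here. `markWithKnownLayout` additionally requires `tableId` and `selectedIndexId` to be non-negative before the node would be marked `BUCKET_SHUFFLE`.

```java
// Hypothetical, simplified stand-in for the Nereids ShuffleType enum.
enum ShuffleType { NATURAL, STORAGE_BUCKETED, EXECUTION_BUCKETED }

// Hypothetical stand-in for a child's hash distribution spec.
final class HashSpecSketch {
    final ShuffleType shuffleType;
    final long tableId;         // -1 once the layout has been cleared
    final long selectedIndexId; // -1 once the layout has been cleared

    HashSpecSketch(ShuffleType shuffleType, long tableId, long selectedIndexId) {
        this.shuffleType = shuffleType;
        this.tableId = tableId;
        this.selectedIndexId = selectedIndexId;
    }
}

class BucketShuffleMarkingSketch {
    // Type-only decision: the situation described in the review.
    static boolean markByTypeOnly(HashSpecSketch spec) {
        return spec.shuffleType == ShuffleType.STORAGE_BUCKETED;
    }

    // Stricter decision: only mark BUCKET_SHUFFLE when the layout is known;
    // otherwise the plan would keep the execution-bucketed (partitioned) shuffle.
    static boolean markWithKnownLayout(HashSpecSketch spec) {
        return markByTypeOnly(spec) && spec.tableId >= 0 && spec.selectedIndexId >= 0;
    }

    public static void main(String[] args) {
        HashSpecSketch cleared = new HashSpecSketch(ShuffleType.STORAGE_BUCKETED, -1, -1);
        HashSpecSketch known = new HashSpecSketch(ShuffleType.STORAGE_BUCKETED, 42, 7);
        System.out.println(markByTypeOnly(cleared));      // true
        System.out.println(markWithKnownLayout(cleared)); // false
        System.out.println(markWithKnownLayout(known));   // true
    }
}
```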
Please require a known storage layout before selecting a `STORAGE_BUCKETED`
child as the set-op bucket-shuffle basic side (or before marking the translated
node `BUCKET_SHUFFLE`), and fall back to execution-bucketed shuffle when the
layout is unknown.
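One way to apply the suggested fix in the chooser is sketched below with hypothetical simplified types. This is not the actual `ChildrenPropertiesRegulator` code. A `STORAGE_BUCKETED` child qualifies as the basic side only if its `tableId` is known, and a result of `-1` means the caller keeps the execution-bucketed (partitioned) shuffle. The row counts and `tableId` 42 are invented.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for the Nereids ShuffleType enum.
enum ShuffleType { NATURAL, STORAGE_BUCKETED, EXECUTION_BUCKETED }

// Hypothetical per-child summary used only for this sketch.
final class ChildSketch {
    final ShuffleType shuffleType;
    final long tableId; // -1 means the storage layout is unknown
    final double rowCount;
    final boolean bucketShuffleDownGrade;

    ChildSketch(ShuffleType shuffleType, long tableId, double rowCount,
                boolean bucketShuffleDownGrade) {
        this.shuffleType = shuffleType;
        this.tableId = tableId;
        this.rowCount = rowCount;
        this.bucketShuffleDownGrade = bucketShuffleDownGrade;
    }
}

class LayoutAwareBasicChooser {
    static final Set<ShuffleType> SUPPORTED =
            EnumSet.of(ShuffleType.NATURAL, ShuffleType.STORAGE_BUCKETED);

    // A STORAGE_BUCKETED child is a candidate only when its layout is known;
    // NATURAL children are accepted as before.
    static boolean hasUsableLayout(ChildSketch child) {
        return child.shuffleType != ShuffleType.STORAGE_BUCKETED || child.tableId >= 0;
    }

    // Largest qualifying child wins; -1 means "fall back to execution-bucketed shuffle".
    static int chooseBasicIndex(List<ChildSketch> children) {
        int basicIndex = -1;
        double basicRowCount = -1;
        for (int i = 0; i < children.size(); i++) {
            ChildSketch child = children.get(i);
            if (SUPPORTED.contains(child.shuffleType)
                    && hasUsableLayout(child)
                    && !child.bucketShuffleDownGrade
                    && child.rowCount > basicRowCount) {
                basicRowCount = child.rowCount;
                basicIndex = i;
            }
        }
        return basicIndex;
    }

    // Reduced shape from the review: child0 lost its layout, child1 kept it.
    static List<ChildSketch> reducedShape() {
        return List.of(
                new ChildSketch(ShuffleType.STORAGE_BUCKETED, -1, 1_000_000, false),
                new ChildSketch(ShuffleType.NATURAL, 42, 10_000, false));
    }

    public static void main(String[] args) {
        // Prints 1: child1 (OlapScan) becomes the basic side.
        System.out.println(chooseBasicIndex(reducedShape()));
        List<ChildSketch> onlyUnknown = List.of(
                new ChildSketch(ShuffleType.STORAGE_BUCKETED, -1, 1_000_000, false));
        // Prints -1: no usable layout, so fall back to partitioned shuffle.
        System.out.println(chooseBasicIndex(onlyUnknown));
    }
}
```

This variant skips an unknown-layout child and keeps looking, so child1 becomes the basic side for the reduced shape. A stricter alternative would return `-1` whenever the largest candidate has an unknown layout, which always falls back to execution-bucketed shuffle. Which behavior is preferable is a judgment call for the PR author.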
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.