This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new d20739e15a9 branch-4.1: [opt](distributed-plan) bucket shuffle downgrade
only checks the non-shuffle side #59506 (#61528)
d20739e15a9 is described below
commit d20739e15a9266a21f790829d5c709bfb0f17da5
Author: morrySnow <[email protected]>
AuthorDate: Fri Mar 20 10:55:53 2026 +0800
branch-4.1: [opt](distributed-plan) bucket shuffle downgrade only checks the
non-shuffle side #59506 (#61528)
picked from #59506
---
.../properties/ChildrenPropertiesRegulator.java | 116 ++++++++++++---------
.../shape_check/tpcds_sf10t_orc/shape/query54.out | 2 +-
.../shape_check/tpcds_sf10t_orc/shape/query72.out | 22 ++--
3 files changed, 77 insertions(+), 63 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index c9f444e4ba7..071c450a220 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -267,7 +267,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
return ImmutableList.of(originChildrenProperties);
}
- private boolean isBucketShuffleDownGrade(Plan oneSidePlan,
DistributionSpecHash otherSideSpec) {
+ private boolean isBucketShuffleDownGrade(Plan oneSidePlan) {
// improper to do bucket shuffle join:
// oneSide:
// - base table and tablets' number is small enough (<
paraInstanceNum)
@@ -288,7 +288,10 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
int bucketNum =
candidate.getTable().getDefaultDistributionInfo().getBucketNum();
int totalBucketNum = prunedPartNum * bucketNum;
ConnectContext connectContext = ConnectContext.get();
- return totalBucketNum < connectContext.getTotalInstanceNum() *
0.8;
+ int backEndNum = Math.max(1,
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
+ String clusterName =
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+ int paraNum = Math.max(1,
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName));
+ return totalBucketNum < backEndNum * paraNum * 0.8;
}
}
}
@@ -356,6 +359,9 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
Optional<PhysicalProperties> updatedForLeft = Optional.empty();
Optional<PhysicalProperties> updatedForRight = Optional.empty();
+ boolean shouldCheckLeftBucketDownGrade = false;
+ boolean shouldCheckrightBucketDownGrade = false;
+
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec,
hashJoin.getHashJoinConjuncts())) {
// check colocate join with scan
return ImmutableList.of(originChildrenProperties);
@@ -370,33 +376,16 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
ShuffleType.EXECUTION_BUCKETED, leftHashSpec,
rightHashSpec,
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
- } else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
- updatedForLeft = Optional.of(calAnotherSideRequired(
- ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()));
- updatedForRight = Optional.of(calAnotherSideRequired(
- ShuffleType.EXECUTION_BUCKETED, leftHashSpec,
rightHashSpec,
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
- } else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
- updatedForLeft = Optional.of(calAnotherSideRequired(
- ShuffleType.EXECUTION_BUCKETED, rightHashSpec,
leftHashSpec,
- (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()));
- updatedForRight = Optional.of(calAnotherSideRequired(
- ShuffleType.EXECUTION_BUCKETED, rightHashSpec,
rightHashSpec,
- (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
} else if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
+ shouldCheckLeftBucketDownGrade = true;
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
} else if (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
&& rightHashSpec.getShuffleType() ==
ShuffleType.EXECUTION_BUCKETED) {
- if (SessionVariable.canUseNereidsDistributePlanner()) {
+ if (SessionVariable.canUseNereidsDistributePlanner() &&
!isBucketShuffleDownGrade(leftChild)) {
List<PhysicalProperties> shuffleToLeft =
Lists.newArrayList(originChildrenProperties);
PhysicalProperties enforceShuffleRight =
calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, leftHashSpec,
rightHashSpec,
@@ -413,7 +402,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
updateChildEnforceAndCost(0, enforceShuffleLeft,
shuffleToRight);
return ImmutableList.of(shuffleToLeft, shuffleToRight);
}
-
+ shouldCheckLeftBucketDownGrade = true;
// must add enforce because shuffle algorithm is not same between
NATURAL and BUCKETED
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
@@ -421,18 +410,18 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
} else if (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
&& rightHashSpec.getShuffleType() ==
ShuffleType.STORAGE_BUCKETED) {
- if (bothSideShuffleKeysAreSameOrder(leftHashSpec, rightHashSpec,
+ shouldCheckLeftBucketDownGrade = true;
+ if (!bothSideShuffleKeysAreSameOrder(leftHashSpec, rightHashSpec,
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec())) {
- return ImmutableList.of(originChildrenProperties);
+ updatedForRight = Optional.of(calAnotherSideRequired(
+ ShuffleType.STORAGE_BUCKETED, leftHashSpec,
rightHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
}
- updatedForRight = Optional.of(calAnotherSideRequired(
- ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
} else if (leftHashSpec.getShuffleType() ==
ShuffleType.EXECUTION_BUCKETED
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
- if (SessionVariable.canUseNereidsDistributePlanner()) {
+ if (SessionVariable.canUseNereidsDistributePlanner() &&
!isBucketShuffleDownGrade(rightChild)) {
// nereids coordinator can exchange left side to right side to
do bucket shuffle join
// TODO: maybe we should check if left child is
PhysicalDistribute.
// If so add storage bucketed shuffle on left side. Other
wise,
@@ -467,23 +456,26 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
}
} else if (leftHashSpec.getShuffleType() ==
ShuffleType.EXECUTION_BUCKETED
&& rightHashSpec.getShuffleType() ==
ShuffleType.EXECUTION_BUCKETED) {
- if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
+
+ if (!bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec())) {
- return ImmutableList.of(originChildrenProperties);
+ shouldCheckLeftBucketDownGrade = true;
+ updatedForRight = Optional.of(calAnotherSideRequired(
+ ShuffleType.EXECUTION_BUCKETED, leftHashSpec,
rightHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
}
- updatedForRight = Optional.of(calAnotherSideRequired(
- ShuffleType.EXECUTION_BUCKETED, leftHashSpec,
rightHashSpec,
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
} else if ((leftHashSpec.getShuffleType() ==
ShuffleType.EXECUTION_BUCKETED
&& rightHashSpec.getShuffleType() ==
ShuffleType.STORAGE_BUCKETED)) {
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
+ shouldCheckrightBucketDownGrade = true;
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, rightHashSpec,
leftHashSpec,
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()));
} else {
+ shouldCheckLeftBucketDownGrade = true;
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec,
rightHashSpec,
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
@@ -491,9 +483,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
}
} else if ((leftHashSpec.getShuffleType() ==
ShuffleType.STORAGE_BUCKETED
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
- // TODO: we must do shuffle on right because coordinator could not
do right be selection in this case,
- // since it always to check the left most node whether olap scan
node.
- // after we fix coordinator problem, we could do right to left
bucket shuffle
+ shouldCheckLeftBucketDownGrade = true;
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
@@ -501,11 +491,13 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
} else if ((leftHashSpec.getShuffleType() ==
ShuffleType.STORAGE_BUCKETED
&& rightHashSpec.getShuffleType() ==
ShuffleType.EXECUTION_BUCKETED)) {
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
+ shouldCheckrightBucketDownGrade = true;
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, rightHashSpec,
leftHashSpec,
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()));
} else {
+ shouldCheckLeftBucketDownGrade = true;
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, leftHashSpec,
rightHashSpec,
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
@@ -514,24 +506,46 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalP
} else if ((leftHashSpec.getShuffleType() ==
ShuffleType.STORAGE_BUCKETED
&& rightHashSpec.getShuffleType() ==
ShuffleType.STORAGE_BUCKETED)) {
- if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
+ if (!bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec())) {
- return ImmutableList.of(originChildrenProperties);
- }
- if (children.get(0).getPlan() instanceof PhysicalDistribute) {
- updatedForLeft = Optional.of(calAnotherSideRequired(
- ShuffleType.STORAGE_BUCKETED, rightHashSpec,
leftHashSpec,
- (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()));
- } else {
- updatedForRight = Optional.of(calAnotherSideRequired(
- ShuffleType.STORAGE_BUCKETED, leftHashSpec,
rightHashSpec,
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
+ if (children.get(0).getPlan() instanceof PhysicalDistribute) {
+ shouldCheckrightBucketDownGrade = true;
+ updatedForLeft = Optional.of(calAnotherSideRequired(
+ ShuffleType.STORAGE_BUCKETED, rightHashSpec,
leftHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()));
+ } else {
+ shouldCheckLeftBucketDownGrade = true;
+ updatedForRight = Optional.of(calAnotherSideRequired(
+ ShuffleType.STORAGE_BUCKETED, leftHashSpec,
rightHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
+ }
}
}
+ if (shouldCheckLeftBucketDownGrade &&
isBucketShuffleDownGrade(leftChild)) {
+ updatedForLeft = Optional.of(calAnotherSideRequired(
+ ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()));
+ updatedForRight = Optional.of(calAnotherSideRequired(
+ ShuffleType.EXECUTION_BUCKETED, leftHashSpec,
rightHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
+ }
+ if (shouldCheckrightBucketDownGrade &&
isBucketShuffleDownGrade(rightChild)) {
+ updatedForLeft = Optional.of(calAnotherSideRequired(
+ ShuffleType.EXECUTION_BUCKETED, rightHashSpec,
leftHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()));
+ updatedForRight = Optional.of(calAnotherSideRequired(
+ ShuffleType.EXECUTION_BUCKETED, rightHashSpec,
rightHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
+ }
+
updatedForLeft.ifPresent(physicalProperties ->
updateChildEnforceAndCost(0, physicalProperties));
updatedForRight.ifPresent(physicalProperties ->
updateChildEnforceAndCost(1, physicalProperties));
diff --git a/regression-test/data/shape_check/tpcds_sf10t_orc/shape/query54.out
b/regression-test/data/shape_check/tpcds_sf10t_orc/shape/query54.out
index 6d88dce347d..1a47463dfde 100644
--- a/regression-test/data/shape_check/tpcds_sf10t_orc/shape/query54.out
+++ b/regression-test/data/shape_check/tpcds_sf10t_orc/shape/query54.out
@@ -19,7 +19,7 @@ PhysicalResultSink
--------------------------------PhysicalProject
----------------------------------hashJoin[INNER_JOIN shuffle]
hashCondition=((my_customers.c_current_addr_sk =
customer_address.ca_address_sk)) otherCondition=() build RFs:RF4
ca_address_sk->[c_current_addr_sk]
------------------------------------PhysicalProject
---------------------------------------hashJoin[INNER_JOIN shuffle]
hashCondition=((my_customers.c_customer_sk = store_sales.ss_customer_sk))
otherCondition=() build RFs:RF3 c_customer_sk->[ss_customer_sk]
+--------------------------------------hashJoin[INNER_JOIN shuffleBucket]
hashCondition=((my_customers.c_customer_sk = store_sales.ss_customer_sk))
otherCondition=() build RFs:RF3 c_customer_sk->[ss_customer_sk]
----------------------------------------PhysicalProject
------------------------------------------PhysicalOlapScan[store_sales] apply
RFs: RF3 RF7
----------------------------------------PhysicalProject
diff --git a/regression-test/data/shape_check/tpcds_sf10t_orc/shape/query72.out
b/regression-test/data/shape_check/tpcds_sf10t_orc/shape/query72.out
index a74beecafbb..c0f2a2312a3 100644
--- a/regression-test/data/shape_check/tpcds_sf10t_orc/shape/query72.out
+++ b/regression-test/data/shape_check/tpcds_sf10t_orc/shape/query72.out
@@ -12,15 +12,15 @@ PhysicalResultSink
------------------PhysicalProject
--------------------hashJoin[INNER_JOIN broadcast]
hashCondition=((catalog_sales.cs_bill_hdemo_sk =
household_demographics.hd_demo_sk)) otherCondition=() build RFs:RF6
hd_demo_sk->[cs_bill_hdemo_sk]
----------------------PhysicalProject
-------------------------hashJoin[LEFT_OUTER_JOIN broadcast]
hashCondition=((catalog_sales.cs_promo_sk = promotion.p_promo_sk))
otherCondition=()
+------------------------hashJoin[LEFT_OUTER_JOIN bucketShuffle]
hashCondition=((catalog_returns.cr_item_sk = catalog_sales.cs_item_sk) and
(catalog_returns.cr_order_number = catalog_sales.cs_order_number))
otherCondition=()
--------------------------PhysicalProject
-----------------------------hashJoin[INNER_JOIN broadcast]
hashCondition=((catalog_sales.cs_ship_date_sk = d3.d_date_sk))
otherCondition=() build RFs:RF5 d_date_sk->[cs_ship_date_sk]
+----------------------------hashJoin[LEFT_OUTER_JOIN broadcast]
hashCondition=((catalog_sales.cs_promo_sk = promotion.p_promo_sk))
otherCondition=()
------------------------------PhysicalProject
---------------------------------hashJoin[INNER_JOIN broadcast]
hashCondition=((inventory.inv_date_sk = d2.d_date_sk)) otherCondition=() build
RFs:RF4 d_date_sk->[inv_date_sk]
+--------------------------------hashJoin[INNER_JOIN broadcast]
hashCondition=((catalog_sales.cs_ship_date_sk = d3.d_date_sk))
otherCondition=() build RFs:RF5 d_date_sk->[cs_ship_date_sk]
----------------------------------PhysicalProject
-------------------------------------hashJoin[INNER_JOIN broadcast]
hashCondition=((catalog_sales.cs_bill_cdemo_sk =
customer_demographics.cd_demo_sk)) otherCondition=() build RFs:RF3
cd_demo_sk->[cs_bill_cdemo_sk]
+------------------------------------hashJoin[INNER_JOIN broadcast]
hashCondition=((inventory.inv_date_sk = d2.d_date_sk)) otherCondition=() build
RFs:RF4 d_date_sk->[inv_date_sk]
--------------------------------------PhysicalProject
-----------------------------------------hashJoin[LEFT_OUTER_JOIN shuffle]
hashCondition=((catalog_returns.cr_item_sk = catalog_sales.cs_item_sk) and
(catalog_returns.cr_order_number = catalog_sales.cs_order_number))
otherCondition=()
+----------------------------------------hashJoin[INNER_JOIN broadcast]
hashCondition=((catalog_sales.cs_bill_cdemo_sk =
customer_demographics.cd_demo_sk)) otherCondition=() build RFs:RF3
cd_demo_sk->[cs_bill_cdemo_sk]
------------------------------------------PhysicalProject
--------------------------------------------hashJoin[INNER_JOIN broadcast]
hashCondition=((item.i_item_sk = catalog_sales.cs_item_sk)) otherCondition=()
build RFs:RF2 i_item_sk->[cs_item_sk,inv_item_sk]
----------------------------------------------PhysicalProject
@@ -35,16 +35,16 @@ PhysicalResultSink
----------------------------------------------PhysicalProject
------------------------------------------------PhysicalOlapScan[item]
------------------------------------------PhysicalProject
---------------------------------------------PhysicalOlapScan[catalog_returns]
+--------------------------------------------filter((customer_demographics.cd_marital_status
= 'D'))
+----------------------------------------------PhysicalOlapScan[customer_demographics]
--------------------------------------PhysicalProject
-----------------------------------------filter((customer_demographics.cd_marital_status
= 'D'))
-------------------------------------------PhysicalOlapScan[customer_demographics]
+----------------------------------------PhysicalOlapScan[date_dim] apply RFs:
RF7
----------------------------------PhysicalProject
-------------------------------------PhysicalOlapScan[date_dim] apply RFs: RF7
+------------------------------------PhysicalOlapScan[date_dim]
------------------------------PhysicalProject
---------------------------------PhysicalOlapScan[date_dim]
+--------------------------------PhysicalOlapScan[promotion]
--------------------------PhysicalProject
-----------------------------PhysicalOlapScan[promotion]
+----------------------------PhysicalOlapScan[catalog_returns]
----------------------PhysicalProject
------------------------filter((household_demographics.hd_buy_potential =
'1001-5000'))
--------------------------PhysicalOlapScan[household_demographics]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]