This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 07ec65712d6 [opt](Nereids) optimize performance in new distribute
planner (#44048)
07ec65712d6 is described below
commit 07ec65712d67efd072667247704b40adaf3e35a7
Author: 924060929 <[email protected]>
AuthorDate: Mon Nov 18 18:43:21 2024 +0800
[opt](Nereids) optimize performance in new distribute planner (#44048)
optimize the new distribute planner's performance in TPC-H, because #41730
caused a performance regression
1. fix the wrong runtime filter thrift parameters
2. do not print the distribute plan in the profile by default; configure
`set profile_level=3` to see it
3. for a shuffle join whose two sides have distributions of natural +
execution_bucketed, support comparing the cost between the plans that
shuffle to the left and to the right
---
.../org/apache/doris/common/profile/Profile.java | 12 +-
.../nereids/jobs/cascades/CostAndEnforcerJob.java | 83 +++++----
.../properties/ChildrenPropertiesRegulator.java | 188 +++++++++++++--------
.../worker/job/AbstractUnassignedScanJob.java | 112 ++++++------
.../job/LocalShuffleBucketJoinAssignedJob.java | 56 ++++++
.../distribute/worker/job/StaticAssignedJob.java | 5 +
.../job/UnassignedScanBucketOlapTableJob.java | 38 ++++-
.../doris/qe/runtime/PipelineExecutionTask.java | 2 -
.../qe/runtime/RuntimeFiltersThriftBuilder.java | 15 +-
.../doris/qe/runtime/ThriftPlansBuilder.java | 19 ++-
.../distribute/shuffle_left_join.out | 8 +-
.../tpch_sf1000/nostats_rf_prune/q15.out | 2 +-
.../tpch_sf1000/nostats_rf_prune/q20-rewrite.out | 2 +-
.../tpch_sf1000/nostats_rf_prune/q20.out | 2 +-
.../tpch_sf1000/rf_prune/q20-rewrite.out | 2 +-
.../new_shapes_p0/tpch_sf1000/rf_prune/q20.out | 2 +-
.../tpch_sf1000/shape/q20-rewrite.out | 2 +-
.../data/new_shapes_p0/tpch_sf1000/shape/q20.out | 2 +-
.../tpch_sf1000/shape_no_stats/q15.out | 2 +-
.../tpch_sf1000/shape_no_stats/q20-rewrite.out | 2 +-
.../tpch_sf1000/shape_no_stats/q20.out | 2 +-
.../distribute/shuffle_left_join.groovy | 4 +-
22 files changed, 363 insertions(+), 199 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index 5381cc26bd2..71d51c32faf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -286,12 +286,14 @@ public class Profile {
NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
physicalPlan = nereidsPlanner.getPhysicalPlan();
physicalRelations.addAll(nereidsPlanner.getPhysicalRelations());
- FragmentIdMapping<DistributedPlan> distributedPlans =
nereidsPlanner.getDistributedPlans();
- if (distributedPlans != null) {
- summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
-
DistributedPlan.toString(Lists.newArrayList(distributedPlans.values()))
+ if (profileLevel >= 3) {
+ FragmentIdMapping<DistributedPlan> distributedPlans =
nereidsPlanner.getDistributedPlans();
+ if (distributedPlans != null) {
+ summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
+
DistributedPlan.toString(Lists.newArrayList(distributedPlans.values()))
.replace("\n", "\n ")
- );
+ );
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
index 101b3ca6670..96960475e9a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
@@ -234,52 +234,61 @@ public class CostAndEnforcerJob extends Job implements
Cloneable {
* @return false if error occurs, the caller will return.
*/
private boolean calculateEnforce(List<PhysicalProperties>
requestChildrenProperties,
- List<PhysicalProperties> outputChildrenProperties) {
+ List<PhysicalProperties> originOutputChildrenProperties) {
+
// to ensure distributionSpec has been added sufficiently.
// it's certain that lowestCostChildren is equals to arity().
ChildrenPropertiesRegulator regulator = new
ChildrenPropertiesRegulator(groupExpression,
- lowestCostChildren, outputChildrenProperties,
requestChildrenProperties, context);
- boolean success = regulator.adjustChildrenProperties();
- if (!success) {
+ lowestCostChildren, new
ArrayList<>(originOutputChildrenProperties),
+ requestChildrenProperties, context);
+ List<List<PhysicalProperties>> childrenOutputSpace =
regulator.adjustChildrenProperties();
+ if (childrenOutputSpace.isEmpty()) {
// invalid enforce, return.
return false;
}
- // Not need to do pruning here because it has been done when we get the
- // best expr from the child group
- ChildOutputPropertyDeriver childOutputPropertyDeriver
- = new ChildOutputPropertyDeriver(outputChildrenProperties);
- // the physical properties the group expression support for its parent.
- PhysicalProperties outputProperty =
childOutputPropertyDeriver.getOutputProperties(getConnectContext(),
- groupExpression);
-
- // update current group statistics and re-compute costs.
- if (groupExpression.children().stream().anyMatch(group ->
group.getStatistics() == null)
- && groupExpression.getOwnerGroup().getStatistics() == null) {
- // if we come here, mean that we have some error in stats
calculator and should fix it.
- LOG.warn("Nereids try to calculate cost without stats for group
expression {}", groupExpression);
- return false;
- }
+ boolean hasSuccess = false;
+ for (List<PhysicalProperties> outputChildrenProperties :
childrenOutputSpace) {
+ // Not need to do pruning here because it has been done when we
get the
+ // best expr from the child group
+ ChildOutputPropertyDeriver childOutputPropertyDeriver
+ = new ChildOutputPropertyDeriver(outputChildrenProperties);
+ // the physical properties the group expression support for its
parent.
+ // some cases maybe output some possibilities, for example,
shuffle join
+ // maybe select left shuffle to right, or right shuffle to left,
so their
+ // are 2 output properties possibilities
+ PhysicalProperties outputProperty
+ =
childOutputPropertyDeriver.getOutputProperties(getConnectContext(),
groupExpression);
+
+ // update current group statistics and re-compute costs.
+ if (groupExpression.children().stream().anyMatch(group ->
group.getStatistics() == null)
+ && groupExpression.getOwnerGroup().getStatistics() ==
null) {
+ // if we come here, mean that we have some error in stats
calculator and should fix it.
+ LOG.warn("Nereids try to calculate cost without stats for
group expression {}", groupExpression);
+ }
- // recompute cost after adjusting property
- curNodeCost = CostCalculator.calculateCost(getConnectContext(),
groupExpression, requestChildrenProperties);
- groupExpression.setCost(curNodeCost);
- curTotalCost = curNodeCost;
- for (int i = 0; i < outputChildrenProperties.size(); i++) {
- PhysicalProperties childProperties =
outputChildrenProperties.get(i);
- curTotalCost = CostCalculator.addChildCost(
- getConnectContext(),
- groupExpression.getPlan(),
- curTotalCost,
-
groupExpression.child(i).getLowestCostPlan(childProperties).get().first,
- i);
- }
+ // recompute cost after adjusting property
+ curNodeCost = CostCalculator.calculateCost(getConnectContext(),
groupExpression, requestChildrenProperties);
+ groupExpression.setCost(curNodeCost);
+ curTotalCost = curNodeCost;
+ for (int i = 0; i < outputChildrenProperties.size(); i++) {
+ PhysicalProperties childProperties =
outputChildrenProperties.get(i);
+ curTotalCost = CostCalculator.addChildCost(
+ getConnectContext(),
+ groupExpression.getPlan(),
+ curTotalCost,
+
groupExpression.child(i).getLowestCostPlan(childProperties).get().first,
+ i);
+ }
- // record map { outputProperty -> outputProperty }, { ANY ->
outputProperty },
- recordPropertyAndCost(groupExpression, outputProperty,
PhysicalProperties.ANY, outputChildrenProperties);
- recordPropertyAndCost(groupExpression, outputProperty, outputProperty,
outputChildrenProperties);
- enforce(outputProperty, outputChildrenProperties);
- return true;
+ // record map { outputProperty -> outputProperty }, { ANY ->
outputProperty },
+ recordPropertyAndCost(groupExpression, outputProperty,
PhysicalProperties.ANY, outputChildrenProperties);
+ recordPropertyAndCost(groupExpression, outputProperty,
outputProperty, outputChildrenProperties);
+ enforce(outputProperty, outputChildrenProperties);
+
+ hasSuccess = true;
+ }
+ return hasSuccess;
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 9985b9c567f..b821ff0de87 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -65,20 +65,20 @@ import java.util.stream.Collectors;
* NOTICE: all visitor should call visit(plan, context) at proper place
* to process must shuffle except project and filter
*/
-public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
+public class ChildrenPropertiesRegulator extends
PlanVisitor<List<List<PhysicalProperties>>, Void> {
private final GroupExpression parent;
private final List<GroupExpression> children;
- private final List<PhysicalProperties> childrenProperties;
+ private final List<PhysicalProperties> originChildrenProperties;
private final List<PhysicalProperties> requiredProperties;
private final JobContext jobContext;
public ChildrenPropertiesRegulator(GroupExpression parent,
List<GroupExpression> children,
- List<PhysicalProperties> childrenProperties,
List<PhysicalProperties> requiredProperties,
+ List<PhysicalProperties> originChildrenProperties,
List<PhysicalProperties> requiredProperties,
JobContext jobContext) {
this.parent = parent;
this.children = children;
- this.childrenProperties = childrenProperties;
+ this.originChildrenProperties = originChildrenProperties;
this.requiredProperties = requiredProperties;
this.jobContext = jobContext;
}
@@ -88,34 +88,35 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
*
* @return enforce cost.
*/
- public boolean adjustChildrenProperties() {
+ public List<List<PhysicalProperties>> adjustChildrenProperties() {
return parent.getPlan().accept(this, null);
}
@Override
- public Boolean visit(Plan plan, Void context) {
+ public List<List<PhysicalProperties>> visit(Plan plan, Void context) {
// process must shuffle
for (int i = 0; i < children.size(); i++) {
- DistributionSpec distributionSpec =
childrenProperties.get(i).getDistributionSpec();
+ DistributionSpec distributionSpec =
originChildrenProperties.get(i).getDistributionSpec();
if (distributionSpec instanceof DistributionSpecMustShuffle) {
updateChildEnforceAndCost(i, PhysicalProperties.EXECUTION_ANY);
}
}
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
@Override
- public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends
Plan> agg, Void context) {
+ public List<List<PhysicalProperties>> visitPhysicalHashAggregate(
+ PhysicalHashAggregate<? extends Plan> agg, Void context) {
if (agg.getGroupByExpressions().isEmpty() &&
agg.getOutputExpressions().isEmpty()) {
- return false;
+ return ImmutableList.of();
}
if (!agg.getAggregateParam().canBeBanned) {
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
// forbid one phase agg on distribute
if (agg.getAggMode() == AggMode.INPUT_TO_RESULT &&
children.get(0).getPlan() instanceof PhysicalDistribute) {
// this means one stage gather agg, usually bad pattern
- return false;
+ return ImmutableList.of();
}
// forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle
@@ -123,7 +124,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER
&& requiredProperties.get(0).getDistributionSpec() instanceof
DistributionSpecHash
&& children.get(0).getPlan() instanceof PhysicalDistribute) {
- return false;
+ return ImmutableList.of();
}
// agg(group by x)-union all(A, B)
@@ -132,7 +133,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
if (agg.getAggMode() == AggMode.INPUT_TO_RESULT
&& children.get(0).getPlan() instanceof PhysicalUnion
&& !((PhysicalUnion) children.get(0).getPlan()).isDistinct()) {
- return false;
+ return ImmutableList.of();
}
// forbid multi distinct opt that bad than multi-stage version when
multi-stage can be executed in one fragment
if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER || agg.getAggMode() ==
AggMode.INPUT_TO_RESULT) {
@@ -146,7 +147,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
.collect(Collectors.toList());
if (multiDistinctions.size() == 1) {
Expression distinctChild = multiDistinctions.get(0).child(0);
- DistributionSpec childDistribution =
childrenProperties.get(0).getDistributionSpec();
+ DistributionSpec childDistribution =
originChildrenProperties.get(0).getDistributionSpec();
if (distinctChild instanceof SlotReference &&
childDistribution instanceof DistributionSpecHash) {
SlotReference slotReference = (SlotReference)
distinctChild;
DistributionSpecHash distributionSpecHash =
(DistributionSpecHash) childDistribution;
@@ -163,7 +164,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
if ((!groupByColumns.isEmpty() &&
distributionSpecHash.satisfy(groupByRequire))
|| (groupByColumns.isEmpty() &&
distributionSpecHash.satisfy(distinctChildRequire))) {
if (!agg.mustUseMultiDistinctAgg()) {
- return false;
+ return ImmutableList.of();
}
}
}
@@ -171,40 +172,41 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
// because the second phase of multi-distinct only have one
instance, and it is slow generally.
if (agg.getOutputExpressions().size() == 1 &&
agg.getGroupByExpressions().isEmpty()
&& !agg.mustUseMultiDistinctAgg()) {
- return false;
+ return ImmutableList.of();
}
}
}
// process must shuffle
visit(agg, context);
// process agg
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
@Override
- public Boolean visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends
Plan> partitionTopN, Void context) {
+ public List<List<PhysicalProperties>> visitPhysicalPartitionTopN(
+ PhysicalPartitionTopN<? extends Plan> partitionTopN, Void context)
{
if (partitionTopN.getPhase().isOnePhaseGlobal() &&
children.get(0).getPlan() instanceof PhysicalDistribute) {
// one phase partition topn, if the child is an enforced
distribution, discard this
// and use two phase candidate.
- return false;
+ return ImmutableList.of();
} else if (partitionTopN.getPhase().isTwoPhaseGlobal()
&& !(children.get(0).getPlan() instanceof PhysicalDistribute))
{
// two phase partition topn, if global's child is not
distribution, which means
// the local distribution has met final requirement, discard this
candidate.
- return false;
+ return ImmutableList.of();
} else {
visit(partitionTopN, context);
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
}
@Override
- public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter,
Void context) {
+ public List<List<PhysicalProperties>> visitPhysicalFilter(PhysicalFilter<?
extends Plan> filter, Void context) {
// do not process must shuffle
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
- return false;
+ return ImmutableList.of();
}
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
private boolean isBucketShuffleDownGrade(Plan oneSidePlan,
DistributionSpecHash otherSideSpec) {
@@ -268,20 +270,20 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
}
@Override
- public Boolean visitPhysicalHashJoin(
+ public List<List<PhysicalProperties>> visitPhysicalHashJoin(
PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, Void
context) {
Preconditions.checkArgument(children.size() == 2, "children.size() !=
2");
- Preconditions.checkArgument(childrenProperties.size() == 2);
+ Preconditions.checkArgument(originChildrenProperties.size() == 2);
Preconditions.checkArgument(requiredProperties.size() == 2);
// process must shuffle
visit(hashJoin, context);
// process hash join
- DistributionSpec leftDistributionSpec =
childrenProperties.get(0).getDistributionSpec();
- DistributionSpec rightDistributionSpec =
childrenProperties.get(1).getDistributionSpec();
+ DistributionSpec leftDistributionSpec =
originChildrenProperties.get(0).getDistributionSpec();
+ DistributionSpec rightDistributionSpec =
originChildrenProperties.get(1).getDistributionSpec();
// broadcast do not need regular
if (rightDistributionSpec instanceof DistributionSpecReplicated) {
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
// shuffle
@@ -301,7 +303,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec,
hashJoin.getHashJoinConjuncts())) {
// check colocate join with scan
- return true;
+ return ImmutableList.of(originChildrenProperties);
} else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(),
leftHashSpec, rightHashSpec)) {
// right anti, right outer, full outer join could not do bucket
shuffle join
// TODO remove this after we refactor coordinator
@@ -339,6 +341,24 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec()));
} else if (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
&& rightHashSpec.getShuffleType() ==
ShuffleType.EXECUTION_BUCKETED) {
+ if (SessionVariable.canUseNereidsDistributePlanner()) {
+ List<PhysicalProperties> shuffleToLeft =
Lists.newArrayList(originChildrenProperties);
+ PhysicalProperties enforceShuffleRight =
calAnotherSideRequired(
+ ShuffleType.STORAGE_BUCKETED, leftHashSpec,
rightHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec());
+ updateChildEnforceAndCost(1, enforceShuffleRight,
shuffleToLeft);
+
+ List<PhysicalProperties> shuffleToRight =
Lists.newArrayList(originChildrenProperties);
+ PhysicalProperties enforceShuffleLeft = calAnotherSideRequired(
+ ShuffleType.EXECUTION_BUCKETED, rightHashSpec,
leftHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()
+ );
+ updateChildEnforceAndCost(0, enforceShuffleLeft,
shuffleToRight);
+ return ImmutableList.of(shuffleToLeft, shuffleToRight);
+ }
+
// must add enforce because shuffle algorithm is not same between
NATURAL and BUCKETED
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
@@ -349,7 +369,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
if (bothSideShuffleKeysAreSameOrder(leftHashSpec, rightHashSpec,
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec())) {
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
@@ -362,10 +382,25 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
// TODO: maybe we should check if left child is
PhysicalDistribute.
// If so add storage bucketed shuffle on left side. Other
wise,
// add execution bucketed shuffle on right side.
- updatedForLeft = Optional.of(calAnotherSideRequired(
+ // updatedForLeft = Optional.of(calAnotherSideRequired(
+ // ShuffleType.STORAGE_BUCKETED, rightHashSpec,
leftHashSpec,
+ // (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
+ // (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()));
+ List<PhysicalProperties> shuffleToLeft =
Lists.newArrayList(originChildrenProperties);
+ PhysicalProperties enforceShuffleRight =
calAnotherSideRequired(
+ ShuffleType.EXECUTION_BUCKETED, leftHashSpec,
rightHashSpec,
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
+ (DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec());
+ updateChildEnforceAndCost(1, enforceShuffleRight,
shuffleToLeft);
+
+ List<PhysicalProperties> shuffleToRight =
Lists.newArrayList(originChildrenProperties);
+ PhysicalProperties enforceShuffleLeft = calAnotherSideRequired(
ShuffleType.STORAGE_BUCKETED, rightHashSpec,
leftHashSpec,
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
- (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()));
+ (DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec()
+ );
+ updateChildEnforceAndCost(0, enforceShuffleLeft,
shuffleToRight);
+ return ImmutableList.of(shuffleToLeft, shuffleToRight);
} else {
// legacy coordinator could not do right be selection in this
case,
// since it always to check the left most node whether olap
scan node.
@@ -380,7 +415,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec())) {
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec,
rightHashSpec,
@@ -427,7 +462,7 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
(DistributionSpecHash)
requiredProperties.get(1).getDistributionSpec(),
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec())) {
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
updatedForLeft = Optional.of(calAnotherSideRequired(
@@ -445,58 +480,59 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
updatedForLeft.ifPresent(physicalProperties ->
updateChildEnforceAndCost(0, physicalProperties));
updatedForRight.ifPresent(physicalProperties ->
updateChildEnforceAndCost(1, physicalProperties));
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
@Override
- public Boolean visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<?
extends Plan, ? extends Plan> nestedLoopJoin,
- Void context) {
+ public List<List<PhysicalProperties>> visitPhysicalNestedLoopJoin(
+ PhysicalNestedLoopJoin<? extends Plan, ? extends Plan>
nestedLoopJoin, Void context) {
Preconditions.checkArgument(children.size() == 2,
String.format("children.size() is %d", children.size()));
- Preconditions.checkArgument(childrenProperties.size() == 2);
+ Preconditions.checkArgument(originChildrenProperties.size() == 2);
Preconditions.checkArgument(requiredProperties.size() == 2);
// process must shuffle
visit(nestedLoopJoin, context);
// process nlj
- DistributionSpec rightDistributionSpec =
childrenProperties.get(1).getDistributionSpec();
+ DistributionSpec rightDistributionSpec =
originChildrenProperties.get(1).getDistributionSpec();
if (rightDistributionSpec instanceof DistributionSpecStorageGather) {
updateChildEnforceAndCost(1, PhysicalProperties.GATHER);
}
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
@Override
- public Boolean visitPhysicalProject(PhysicalProject<? extends Plan>
project, Void context) {
+ public List<List<PhysicalProperties>>
visitPhysicalProject(PhysicalProject<? extends Plan> project, Void context) {
// do not process must shuffle
if (children.get(0).getPlan() instanceof PhysicalDistribute) {
- return false;
+ return ImmutableList.of();
}
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
@Override
- public Boolean visitPhysicalSetOperation(PhysicalSetOperation
setOperation, Void context) {
+ public List<List<PhysicalProperties>>
visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void context) {
// process must shuffle
visit(setOperation, context);
// union with only constant exprs list
if (children.isEmpty()) {
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
// process set operation
PhysicalProperties requiredProperty = requiredProperties.get(0);
DistributionSpec requiredDistributionSpec =
requiredProperty.getDistributionSpec();
if (requiredDistributionSpec instanceof DistributionSpecGather) {
- for (int i = 0; i < childrenProperties.size(); i++) {
- if (childrenProperties.get(i).getDistributionSpec() instanceof
DistributionSpecStorageGather) {
+ for (int i = 0; i < originChildrenProperties.size(); i++) {
+ if (originChildrenProperties.get(i).getDistributionSpec()
instanceof DistributionSpecStorageGather) {
updateChildEnforceAndCost(i, PhysicalProperties.GATHER);
}
}
} else if (requiredDistributionSpec instanceof DistributionSpecAny) {
- for (int i = 0; i < childrenProperties.size(); i++) {
- if (childrenProperties.get(i).getDistributionSpec() instanceof
DistributionSpecStorageAny
- || childrenProperties.get(i).getDistributionSpec()
instanceof DistributionSpecStorageGather
- || childrenProperties.get(i).getDistributionSpec()
instanceof DistributionSpecGather
- || (childrenProperties.get(i).getDistributionSpec()
instanceof DistributionSpecHash
- && ((DistributionSpecHash)
childrenProperties.get(i).getDistributionSpec())
+ for (int i = 0; i < originChildrenProperties.size(); i++) {
+ PhysicalProperties physicalProperties =
originChildrenProperties.get(i);
+ if (physicalProperties.getDistributionSpec() instanceof
DistributionSpecStorageAny
+ || physicalProperties.getDistributionSpec() instanceof
DistributionSpecStorageGather
+ || physicalProperties.getDistributionSpec() instanceof
DistributionSpecGather
+ || (physicalProperties.getDistributionSpec()
instanceof DistributionSpecHash
+ && ((DistributionSpecHash)
physicalProperties.getDistributionSpec())
.getShuffleType() == ShuffleType.NATURAL)) {
updateChildEnforceAndCost(i,
PhysicalProperties.EXECUTION_ANY);
}
@@ -504,8 +540,9 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
} else if (requiredDistributionSpec instanceof DistributionSpecHash) {
// TODO: should use the most common hash spec as basic
DistributionSpecHash basic = (DistributionSpecHash)
requiredDistributionSpec;
- for (int i = 0; i < childrenProperties.size(); i++) {
- DistributionSpecHash current = (DistributionSpecHash)
childrenProperties.get(i).getDistributionSpec();
+ for (int i = 0; i < originChildrenProperties.size(); i++) {
+ DistributionSpecHash current
+ = (DistributionSpecHash)
originChildrenProperties.get(i).getDistributionSpec();
if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
|| !bothSideShuffleKeysAreSameOrder(basic, current,
(DistributionSpecHash)
requiredProperties.get(0).getDistributionSpec(),
@@ -518,41 +555,42 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
}
}
}
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
@Override
- public Boolean visitAbstractPhysicalSort(AbstractPhysicalSort<? extends
Plan> sort, Void context) {
+ public List<List<PhysicalProperties>> visitAbstractPhysicalSort(
+ AbstractPhysicalSort<? extends Plan> sort, Void context) {
// process must shuffle
visit(sort, context);
if (sort.getSortPhase() == SortPhase.GATHER_SORT && sort.child()
instanceof PhysicalDistribute) {
// forbid gather sort need explicit shuffle
- return false;
+ return ImmutableList.of();
}
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
@Override
- public Boolean visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, Void
context) {
+ public List<List<PhysicalProperties>> visitPhysicalTopN(PhysicalTopN<?
extends Plan> topN, Void context) {
// process must shuffle
visit(topN, context);
int sortPhaseNum =
jobContext.getCascadesContext().getConnectContext().getSessionVariable().sortPhaseNum;
// if control sort phase, forbid nothing
if (sortPhaseNum == 1 || sortPhaseNum == 2) {
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
// If child is DistributionSpecGather, topN should forbid two-phase
topN
if (topN.getSortPhase() == SortPhase.LOCAL_SORT
- &&
childrenProperties.get(0).getDistributionSpec().equals(DistributionSpecGather.INSTANCE))
{
- return false;
+ &&
originChildrenProperties.get(0).getDistributionSpec().equals(DistributionSpecGather.INSTANCE))
{
+ return ImmutableList.of();
}
// forbid one step topn with distribute as child
if (topN.getSortPhase() == SortPhase.GATHER_SORT
&& children.get(0).getPlan() instanceof PhysicalDistribute) {
- return false;
+ return ImmutableList.of();
}
- return true;
+ return ImmutableList.of(originChildrenProperties);
}
/**
@@ -645,8 +683,14 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
}
private void updateChildEnforceAndCost(int index, PhysicalProperties
targetProperties) {
+ updateChildEnforceAndCost(index, targetProperties,
originChildrenProperties);
+ }
+
+ private void updateChildEnforceAndCost(
+ int index, PhysicalProperties targetProperties,
List<PhysicalProperties> childrenProperties) {
GroupExpression child = children.get(index);
- Pair<Cost, List<PhysicalProperties>> lowest =
child.getLowestCostTable().get(childrenProperties.get(index));
+ Pair<Cost, List<PhysicalProperties>> lowest
+ =
child.getLowestCostTable().get(childrenProperties.get(index));
PhysicalProperties output =
child.getOutputProperties(childrenProperties.get(index));
DistributionSpec target = targetProperties.getDistributionSpec();
updateChildEnforceAndCost(child, output, target, lowest.first);
@@ -668,10 +712,10 @@ public class ChildrenPropertiesRegulator extends
PlanVisitor<Boolean, Void> {
GroupExpression enforcer = target.addEnforcer(child.getOwnerGroup());
child.getOwnerGroup().addEnforcer(enforcer);
ConnectContext connectContext =
jobContext.getCascadesContext().getConnectContext();
- Cost totalCost = CostCalculator.addChildCost(connectContext,
enforcer.getPlan(),
- CostCalculator.calculateCost(connectContext, enforcer,
Lists.newArrayList(childOutput)),
- currentCost,
- 0);
+ Cost enforceCost = CostCalculator.calculateCost(connectContext,
enforcer, Lists.newArrayList(childOutput));
+ enforcer.setCost(enforceCost);
+ Cost totalCost = CostCalculator.addChildCost(
+ connectContext, enforcer.getPlan(), enforceCost, currentCost,
0);
if (enforcer.updateLowestCostTable(newOutputProperty,
Lists.newArrayList(childOutput), totalCost)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
index ef3236690f1..d2fbb9905e1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
@@ -74,8 +74,7 @@ public abstract class AbstractUnassignedScanJob extends
AbstractUnassignedJob {
DistributeContext distributeContext) {
ConnectContext context = statementContext.getConnectContext();
- boolean useLocalShuffleToAddParallel =
fragment.useSerialSource(ConnectContext.get());
- int instanceIndexInFragment = 0;
+ boolean useLocalShuffleToAddParallel = useLocalShuffleToAddParallel();
List<AssignedJob> instances = Lists.newArrayList();
for (Entry<DistributedPlanWorker, UninstancedScanSource> entry :
workerToScanRanges.entrySet()) {
DistributedPlanWorker worker = entry.getKey();
@@ -94,64 +93,73 @@ public abstract class AbstractUnassignedScanJob extends
AbstractUnassignedJob {
// for example: two instances
int instanceNum = degreeOfParallelism(scanSourceMaxParallel);
- List<ScanSource> instanceToScanRanges;
if (useLocalShuffleToAddParallel) {
- // only generate one instance to scan all data, in this step
- instanceToScanRanges = scanSource.parallelize(
- scanNodes, 1
- );
-
- // when data not big, but aggregation too slow, we will use 1
instance to scan data,
- // and use more instances (to ***add parallel***) to process
aggregate.
- // We call it `ignore data distribution` of `share scan`.
Backend will know this instances
- // share the same ScanSource, and will not scan same data
multiple times.
- //
- // +-------------------------------- same fragment in one host
-------------------------------------+
- // | instance1 instance2 instance3
instance4 |
- // | \ \ /
/ |
- // |
|
- // | OlapScanNode
|
- // |(share scan node, instance1 will scan all data and local
shuffle to other local instances |
- // | to parallel compute this data)
|
- //
+------------------------------------------------------------------------------------------------+
- ScanSource shareScanSource = instanceToScanRanges.get(0);
-
- // one scan range generate multiple instances,
- // different instances reference the same scan source
- int shareScanId = shareScanIdGenerator.getAndIncrement();
- ScanSource emptyShareScanSource = shareScanSource.newEmpty();
- for (int i = 0; i < instanceNum; i++) {
- LocalShuffleAssignedJob instance = new
LocalShuffleAssignedJob(
- instanceIndexInFragment++, shareScanId, i > 0,
- context.nextInstanceId(), this, worker,
- i == 0 ? shareScanSource : emptyShareScanSource
- );
- instances.add(instance);
- }
+ assignLocalShuffleJobs(scanSource, instanceNum, instances,
context, worker);
} else {
- // split the scanRanges to some partitions, one partition for
one instance
- // for example:
- // [
- // scan tbl1: [tablet_10001, tablet_10003], // instance 1
- // scan tbl1: [tablet_10002, tablet_10004] // instance 2
- // ]
- instanceToScanRanges = scanSource.parallelize(
- scanNodes, instanceNum
- );
-
- for (ScanSource instanceToScanRange : instanceToScanRanges) {
- instances.add(
- assignWorkerAndDataSources(
- instanceIndexInFragment++,
context.nextInstanceId(), worker, instanceToScanRange
- )
- );
- }
+ assignedDefaultJobs(scanSource, instanceNum, instances,
context, worker);
}
}
return instances;
}
+ protected boolean useLocalShuffleToAddParallel() {
+ return fragment.useSerialSource(ConnectContext.get());
+ }
+
+ protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum,
List<AssignedJob> instances,
+ ConnectContext context, DistributedPlanWorker worker) {
+ // split the scanRanges to some partitions, one partition for one
instance
+ // for example:
+ // [
+ // scan tbl1: [tablet_10001, tablet_10003], // instance 1
+ // scan tbl1: [tablet_10002, tablet_10004] // instance 2
+ // ]
+ List<ScanSource> instanceToScanRanges =
scanSource.parallelize(scanNodes, instanceNum);
+
+ for (ScanSource instanceToScanRange : instanceToScanRanges) {
+ instances.add(
+ assignWorkerAndDataSources(
+ instances.size(), context.nextInstanceId(), worker,
instanceToScanRange
+ )
+ );
+ }
+ }
+
+ protected void assignLocalShuffleJobs(ScanSource scanSource, int
instanceNum, List<AssignedJob> instances,
+ ConnectContext context, DistributedPlanWorker worker) {
+ // only generate one instance to scan all data, in this step
+ List<ScanSource> instanceToScanRanges =
scanSource.parallelize(scanNodes, 1);
+
+        // when the data is not big but aggregation is too slow, we will use 1 instance
to scan the data,
+        // and use more instances (to ***add parallelism***) to process the aggregate.
+ // We call it `ignore data distribution` of `share scan`. Backend will
know this instances
+ // share the same ScanSource, and will not scan same data multiple
times.
+ //
+ // +-------------------------------- same fragment in one host
-------------------------------------+
+ // | instance1 instance2 instance3
instance4 |
+ // | \ \ / /
|
+ // |
|
+ // | OlapScanNode
|
+ // |(share scan node, instance1 will scan all data and local shuffle
to other local instances |
+ // | to parallel compute this data)
|
+ //
+------------------------------------------------------------------------------------------------+
+ ScanSource shareScanSource = instanceToScanRanges.get(0);
+
+        // one scan range generates multiple instances,
+ // different instances reference the same scan source
+ int shareScanId = shareScanIdGenerator.getAndIncrement();
+ ScanSource emptyShareScanSource = shareScanSource.newEmpty();
+ for (int i = 0; i < instanceNum; i++) {
+ LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
+ instances.size(), shareScanId, i > 0,
+ context.nextInstanceId(), this, worker,
+ i == 0 ? shareScanSource : emptyShareScanSource
+ );
+ instances.add(instance);
+ }
+ }
+
protected int degreeOfParallelism(int maxParallel) {
Preconditions.checkArgument(maxParallel > 0, "maxParallel must be
positive");
if (!fragment.getDataPartition().isPartitioned()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
new file mode 100644
index 00000000000..443acb50d78
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/** LocalShuffleBucketJoinAssignedJob */
+public class LocalShuffleBucketJoinAssignedJob extends LocalShuffleAssignedJob
{
+ private volatile Set<Integer> assignedJoinBucketIndexes;
+
+ public LocalShuffleBucketJoinAssignedJob(
+ int indexInUnassignedJob, int shareScanId, boolean
receiveDataFromLocal,
+ TUniqueId instanceId, UnassignedJob unassignedJob,
+ DistributedPlanWorker worker, ScanSource scanSource,
+ Set<Integer> assignedJoinBucketIndexes) {
+ super(indexInUnassignedJob, shareScanId, receiveDataFromLocal,
instanceId, unassignedJob, worker, scanSource);
+ this.assignedJoinBucketIndexes =
Utils.fastToImmutableSet(assignedJoinBucketIndexes);
+ }
+
+ public Set<Integer> getAssignedJoinBucketIndexes() {
+ return assignedJoinBucketIndexes;
+ }
+
+ public void addAssignedJoinBucketIndexes(Set<Integer> joinBucketIndexes) {
+ this.assignedJoinBucketIndexes = ImmutableSet.<Integer>builder()
+ .addAll(assignedJoinBucketIndexes)
+ .addAll(joinBucketIndexes)
+ .build();
+ }
+
+ @Override
+ protected String formatOtherString() {
+ return ",\n assigned join buckets: " + assignedJoinBucketIndexes;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
index 75849ad8146..77494d621ee 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
@@ -89,6 +89,7 @@ public class StaticAssignedJob implements AssignedJob {
}
return str
+ .append(formatOtherString())
.append(",\n scanSource: " + formatScanSourceString())
.append("\n)")
.toString();
@@ -108,6 +109,10 @@ public class StaticAssignedJob implements AssignedJob {
return scanSourceString.toString();
}
+ protected String formatOtherString() {
+ return "";
+ }
+
@Override
public int hashCode() {
return indexInUnassignedJob;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index 70a91ca2b30..f90fe7ea6e2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -27,6 +27,7 @@ import
org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
import
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.OlapScanNode;
@@ -164,6 +165,34 @@ public class UnassignedScanBucketOlapTableJob extends
AbstractUnassignedScanJob
return assignedJobs;
}
+ @Override
+ protected void assignLocalShuffleJobs(ScanSource scanSource, int
instanceNum, List<AssignedJob> instances,
+ ConnectContext context, DistributedPlanWorker worker) {
+        // split the buckets into instanceNum groups to decide which join buckets each instance handles
+ List<ScanSource> assignJoinBuckets = scanSource.parallelize(
+ scanNodes, instanceNum
+ );
+
+        // one scan range generates multiple instances,
+ // different instances reference the same scan source
+ int shareScanId = shareScanIdGenerator.getAndIncrement();
+
+ BucketScanSource shareScanSource = (BucketScanSource) scanSource;
+ ScanSource emptyShareScanSource = shareScanSource.newEmpty();
+
+ for (int i = 0; i < assignJoinBuckets.size(); i++) {
+ Set<Integer> assignedJoinBuckets
+ = ((BucketScanSource)
assignJoinBuckets.get(i)).bucketIndexToScanNodeToTablets.keySet();
+ LocalShuffleBucketJoinAssignedJob instance = new
LocalShuffleBucketJoinAssignedJob(
+ instances.size(), shareScanId, i > 0,
+ context.nextInstanceId(), this, worker,
+ i == 0 ? shareScanSource : emptyShareScanSource,
+ Utils.fastToImmutableSet(assignedJoinBuckets)
+ );
+ instances.add(instance);
+ }
+ }
+
private boolean shouldFillUpInstances(List<HashJoinNode> hashJoinNodes) {
for (HashJoinNode hashJoinNode : hashJoinNodes) {
if (!hashJoinNode.isBucketShuffle()) {
@@ -198,6 +227,7 @@ public class UnassignedScanBucketOlapTableJob extends
AbstractUnassignedScanJob
List<AssignedJob> newInstances = new ArrayList<>(instances);
for (Entry<DistributedPlanWorker, Collection<Integer>> workerToBuckets
: missingBuckets.asMap().entrySet()) {
Map<Integer, Map<ScanNode, ScanRanges>> scanEmptyBuckets =
Maps.newLinkedHashMap();
+ Set<Integer> assignedJoinBuckets =
Utils.fastToImmutableSet(workerToBuckets.getValue());
for (Integer bucketIndex : workerToBuckets.getValue()) {
Map<ScanNode, ScanRanges> scanTableWithEmptyData =
Maps.newLinkedHashMap();
for (ScanNode scanNode : scanNodes) {
@@ -218,12 +248,16 @@ public class UnassignedScanBucketOlapTableJob extends
AbstractUnassignedScanJob
BucketScanSource bucketScanSource = (BucketScanSource)
newInstance.getScanSource();
bucketScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets);
mergedBucketsInSameWorkerInstance = true;
+
+ LocalShuffleBucketJoinAssignedJob instance =
(LocalShuffleBucketJoinAssignedJob) newInstance;
+
instance.addAssignedJoinBucketIndexes(assignedJoinBuckets);
}
}
if (!mergedBucketsInSameWorkerInstance) {
- fillUpInstance = new LocalShuffleAssignedJob(
+ fillUpInstance = new LocalShuffleBucketJoinAssignedJob(
newInstances.size(),
shareScanIdGenerator.getAndIncrement(),
- false, context.nextInstanceId(), this, worker,
scanSource
+ false, context.nextInstanceId(), this, worker,
scanSource,
+ assignedJoinBuckets
);
}
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
index 1a8f2a216cd..8c1b9714c35 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
@@ -63,7 +63,6 @@ public class PipelineExecutionTask extends
AbstractRuntimeTask<Long, MultiFragme
private final long timeoutDeadline;
private final CoordinatorContext coordinatorContext;
private final BackendServiceProxy backendServiceProxy;
- private final Map<BackendFragmentId, SingleFragmentPipelineTask>
backendFragmentTasks;
// mutable states
public PipelineExecutionTask(
@@ -88,7 +87,6 @@ public class PipelineExecutionTask extends
AbstractRuntimeTask<Long, MultiFragme
backendFragmentTasks.put(new BackendFragmentId(backendId,
fragmentId), fragmentTask);
}
}
- this.backendFragmentTasks = backendFragmentTasks.build();
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
index 42cf08fb2e3..47c01ef8eb3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
@@ -82,11 +82,13 @@ public class RuntimeFiltersThriftBuilder {
target.address, address -> {
TRuntimeFilterTargetParamsV2 params = new
TRuntimeFilterTargetParamsV2();
params.target_fragment_instance_addr = address;
+ params.target_fragment_ids = new ArrayList<>();
+ // required field
params.target_fragment_instance_ids = new
ArrayList<>();
return params;
});
-
targetParams.target_fragment_instance_ids.add(target.instanceId);
+ targetParams.target_fragment_ids.add(target.fragmentId);
}
runtimeFilterParams.putToRidToTargetParamv2(
@@ -95,7 +97,8 @@ public class RuntimeFiltersThriftBuilder {
} else {
List<TRuntimeFilterTargetParams> targetParams =
Lists.newArrayList();
for (RuntimeFilterTarget target : targets) {
- targetParams.add(new
TRuntimeFilterTargetParams(target.instanceId, target.address));
+                    // Instance id makes no sense if this runtime filter
doesn't have remote targets.
+ targetParams.add(new TRuntimeFilterTargetParams(new
TUniqueId(), target.address));
}
runtimeFilterParams.putToRidToTargetParam(rf.getFilterId().asInt(),
targetParams);
}
@@ -135,7 +138,7 @@ public class RuntimeFiltersThriftBuilder {
BackendWorker backendWorker = (BackendWorker)
instanceJob.getAssignedWorker();
Backend backend = backendWorker.getBackend();
targetFragments.add(new RuntimeFilterTarget(
- instanceJob.instanceId(),
+ fragment.getFragmentId().asInt(),
new TNetworkAddress(backend.getHost(),
backend.getBrpcPort())
));
}
@@ -158,11 +161,11 @@ public class RuntimeFiltersThriftBuilder {
}
public static class RuntimeFilterTarget {
- public final TUniqueId instanceId;
+ public final int fragmentId;
public final TNetworkAddress address;
- public RuntimeFilterTarget(TUniqueId instanceId, TNetworkAddress
address) {
- this.instanceId = instanceId;
+ public RuntimeFilterTarget(int fragmentId, TNetworkAddress address) {
+ this.fragmentId = fragmentId;
this.address = address;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index c04861cbf43..f0e3febe192 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -26,6 +26,7 @@ import
org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
import
org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
import
org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
import
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
import
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
@@ -498,14 +499,18 @@ public class ThriftPlansBuilder {
if (instanceJob.getAssignedWorker().id() != worker.id()) {
continue;
}
- if (instanceJob instanceof LocalShuffleAssignedJob
- && ((LocalShuffleAssignedJob)
instanceJob).receiveDataFromLocal) {
- continue;
- }
+
Integer instanceIndex = instanceToIndex.get(instanceJob);
- BucketScanSource bucketScanSource = (BucketScanSource)
instanceJob.getScanSource();
- for (Integer bucketIndex :
bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
- bucketIdToInstanceId.put(bucketIndex, instanceIndex);
+ if (instanceJob instanceof LocalShuffleBucketJoinAssignedJob) {
+ LocalShuffleBucketJoinAssignedJob assignedJob =
(LocalShuffleBucketJoinAssignedJob) instanceJob;
+ for (Integer bucketIndex :
assignedJob.getAssignedJoinBucketIndexes()) {
+ bucketIdToInstanceId.put(bucketIndex, instanceIndex);
+ }
+ } else {
+ BucketScanSource bucketScanSource = (BucketScanSource)
instanceJob.getScanSource();
+ for (Integer bucketIndex :
bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
+ bucketIdToInstanceId.put(bucketIndex, instanceIndex);
+ }
}
}
return bucketIdToInstanceId;
diff --git
a/regression-test/data/nereids_syntax_p0/distribute/shuffle_left_join.out
b/regression-test/data/nereids_syntax_p0/distribute/shuffle_left_join.out
index 99d095d87f7..0b5a63b0295 100644
--- a/regression-test/data/nereids_syntax_p0/distribute/shuffle_left_join.out
+++ b/regression-test/data/nereids_syntax_p0/distribute/shuffle_left_join.out
@@ -1,9 +1,9 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
-- !shuffle_left_and_right --
-1 1 1 1
-2 2 2 2
+1 1 1
+2 2 2
-- !shuffle_left --
-1 1 1 1
-2 2 2 2
+1 1 1
+2 2 2
diff --git
a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q15.out
b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q15.out
index 519a64b38aa..9e6b383230a 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q15.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q15.out
@@ -7,7 +7,7 @@ PhysicalResultSink
--------PhysicalProject
----------hashJoin[INNER_JOIN broadcast]
hashCondition=((revenue0.total_revenue = max(total_revenue))) otherCondition=()
------------PhysicalProject
---------------hashJoin[INNER_JOIN broadcast]
hashCondition=((supplier.s_suppkey = revenue0.supplier_no)) otherCondition=()
+--------------hashJoin[INNER_JOIN bucketShuffle]
hashCondition=((supplier.s_suppkey = revenue0.supplier_no)) otherCondition=()
----------------PhysicalProject
------------------hashAgg[GLOBAL]
--------------------PhysicalDistribute[DistributionSpecHash]
diff --git
a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20-rewrite.out
b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20-rewrite.out
index f1ec59e40d8..89548468b7c 100644
---
a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20-rewrite.out
+++
b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20-rewrite.out
@@ -9,7 +9,7 @@ PhysicalResultSink
------------PhysicalProject
--------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket]
hashCondition=((supplier.s_suppkey = t3.ps_suppkey)) otherCondition=() build
RFs:RF3 s_suppkey->[l_suppkey,ps_suppkey]
----------------PhysicalProject
-------------------hashJoin[INNER_JOIN shuffleBucket]
hashCondition=((t2.l_partkey = t1.ps_partkey) and (t2.l_suppkey =
t1.ps_suppkey)) otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) >
t2.l_q)) build RFs:RF1 ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
+------------------hashJoin[INNER_JOIN bucketShuffle]
hashCondition=((t2.l_partkey = t1.ps_partkey) and (t2.l_suppkey =
t1.ps_suppkey)) otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) >
t2.l_q)) build RFs:RF1 ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
--------------------PhysicalProject
----------------------hashAgg[GLOBAL]
------------------------PhysicalDistribute[DistributionSpecHash]
diff --git
a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20.out
b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20.out
index bcf875640f9..7678db3199a 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20.out
@@ -9,7 +9,7 @@ PhysicalResultSink
------------PhysicalProject
--------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket]
hashCondition=((supplier.s_suppkey = partsupp.ps_suppkey)) otherCondition=()
build RFs:RF3 s_suppkey->[l_suppkey,ps_suppkey]
----------------PhysicalProject
-------------------hashJoin[INNER_JOIN shuffleBucket]
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF1
ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
+------------------hashJoin[INNER_JOIN bucketShuffle]
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF1
ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
--------------------hashAgg[GLOBAL]
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------hashAgg[LOCAL]
diff --git
a/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20-rewrite.out
b/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20-rewrite.out
index 80f86bf96e4..6b7a6da490c 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20-rewrite.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20-rewrite.out
@@ -7,7 +7,7 @@ PhysicalResultSink
--------PhysicalProject
----------hashJoin[RIGHT_SEMI_JOIN shuffleBucket]
hashCondition=((supplier.s_suppkey = t3.ps_suppkey)) otherCondition=() build
RFs:RF4 s_suppkey->[l_suppkey,ps_suppkey]
------------PhysicalProject
---------------hashJoin[INNER_JOIN shuffleBucket] hashCondition=((t2.l_partkey
= t1.ps_partkey) and (t2.l_suppkey = t1.ps_suppkey))
otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > t2.l_q)) build RFs:RF2
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
+--------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((t2.l_partkey
= t1.ps_partkey) and (t2.l_suppkey = t1.ps_suppkey))
otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > t2.l_q)) build RFs:RF2
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
----------------PhysicalProject
------------------hashAgg[GLOBAL]
--------------------PhysicalDistribute[DistributionSpecHash]
diff --git a/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20.out
b/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20.out
index aca1634e470..6b3b115fcfe 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20.out
@@ -7,7 +7,7 @@ PhysicalResultSink
--------PhysicalProject
----------hashJoin[RIGHT_SEMI_JOIN shuffleBucket]
hashCondition=((supplier.s_suppkey = partsupp.ps_suppkey)) otherCondition=()
build RFs:RF4 s_suppkey->[l_suppkey,ps_suppkey]
------------PhysicalProject
---------------hashJoin[INNER_JOIN shuffleBucket]
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF2
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
+--------------hashJoin[INNER_JOIN bucketShuffle]
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF2
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
----------------hashAgg[GLOBAL]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------hashAgg[LOCAL]
diff --git
a/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20-rewrite.out
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20-rewrite.out
index 80f86bf96e4..6b7a6da490c 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20-rewrite.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20-rewrite.out
@@ -7,7 +7,7 @@ PhysicalResultSink
--------PhysicalProject
----------hashJoin[RIGHT_SEMI_JOIN shuffleBucket]
hashCondition=((supplier.s_suppkey = t3.ps_suppkey)) otherCondition=() build
RFs:RF4 s_suppkey->[l_suppkey,ps_suppkey]
------------PhysicalProject
---------------hashJoin[INNER_JOIN shuffleBucket] hashCondition=((t2.l_partkey
= t1.ps_partkey) and (t2.l_suppkey = t1.ps_suppkey))
otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > t2.l_q)) build RFs:RF2
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
+--------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((t2.l_partkey
= t1.ps_partkey) and (t2.l_suppkey = t1.ps_suppkey))
otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > t2.l_q)) build RFs:RF2
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
----------------PhysicalProject
------------------hashAgg[GLOBAL]
--------------------PhysicalDistribute[DistributionSpecHash]
diff --git a/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20.out
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20.out
index aca1634e470..6b3b115fcfe 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20.out
@@ -7,7 +7,7 @@ PhysicalResultSink
--------PhysicalProject
----------hashJoin[RIGHT_SEMI_JOIN shuffleBucket]
hashCondition=((supplier.s_suppkey = partsupp.ps_suppkey)) otherCondition=()
build RFs:RF4 s_suppkey->[l_suppkey,ps_suppkey]
------------PhysicalProject
---------------hashJoin[INNER_JOIN shuffleBucket]
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF2
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
+--------------hashJoin[INNER_JOIN bucketShuffle]
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF2
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
----------------hashAgg[GLOBAL]
------------------PhysicalDistribute[DistributionSpecHash]
--------------------hashAgg[LOCAL]
diff --git
a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q15.out
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q15.out
index f29b77317c3..e9b45b5888c 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q15.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q15.out
@@ -7,7 +7,7 @@ PhysicalResultSink
--------PhysicalProject
----------hashJoin[INNER_JOIN broadcast]
hashCondition=((revenue0.total_revenue = max(total_revenue))) otherCondition=()
------------PhysicalProject
---------------hashJoin[INNER_JOIN broadcast]
hashCondition=((supplier.s_suppkey = revenue0.supplier_no)) otherCondition=()
build RFs:RF0 s_suppkey->[l_suppkey]
+--------------hashJoin[INNER_JOIN bucketShuffle]
hashCondition=((supplier.s_suppkey = revenue0.supplier_no)) otherCondition=()
build RFs:RF0 s_suppkey->[l_suppkey]
----------------PhysicalProject
------------------hashAgg[GLOBAL]
--------------------PhysicalDistribute[DistributionSpecHash]
diff --git
a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20-rewrite.out
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20-rewrite.out
index f1ec59e40d8..89548468b7c 100644
---
a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20-rewrite.out
+++
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20-rewrite.out
@@ -9,7 +9,7 @@ PhysicalResultSink
------------PhysicalProject
--------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket]
hashCondition=((supplier.s_suppkey = t3.ps_suppkey)) otherCondition=() build
RFs:RF3 s_suppkey->[l_suppkey,ps_suppkey]
----------------PhysicalProject
-------------------hashJoin[INNER_JOIN shuffleBucket]
hashCondition=((t2.l_partkey = t1.ps_partkey) and (t2.l_suppkey =
t1.ps_suppkey)) otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) >
t2.l_q)) build RFs:RF1 ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
+------------------hashJoin[INNER_JOIN bucketShuffle]
hashCondition=((t2.l_partkey = t1.ps_partkey) and (t2.l_suppkey =
t1.ps_suppkey)) otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) >
t2.l_q)) build RFs:RF1 ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
--------------------PhysicalProject
----------------------hashAgg[GLOBAL]
------------------------PhysicalDistribute[DistributionSpecHash]
diff --git
a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20.out
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20.out
index bcf875640f9..7678db3199a 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20.out
@@ -9,7 +9,7 @@ PhysicalResultSink
------------PhysicalProject
--------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket]
hashCondition=((supplier.s_suppkey = partsupp.ps_suppkey)) otherCondition=()
build RFs:RF3 s_suppkey->[l_suppkey,ps_suppkey]
----------------PhysicalProject
-------------------hashJoin[INNER_JOIN shuffleBucket]
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF1
ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
+------------------hashJoin[INNER_JOIN bucketShuffle]
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF1
ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
--------------------hashAgg[GLOBAL]
----------------------PhysicalDistribute[DistributionSpecHash]
------------------------hashAgg[LOCAL]
diff --git
a/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy
b/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy
index 8c56c257b0e..4ee14eba481 100644
---
a/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy
+++
b/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy
@@ -95,8 +95,8 @@ suite("shuffle_left_join") {
.collect(Collectors.joining("\n"))
logger.info("Variables:\n${variableString}")
- extractFragment(sqlStr, "INNER JOIN(BUCKET_SHUFFLE)") { exchangeNum ->
- assertTrue(exchangeNum == 1)
+ extractFragment(sqlStr, "INNER JOIN(PARTITIONED)") { exchangeNum ->
+ assertTrue(exchangeNum == 2)
}
explain {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]