This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e302882e529 [branch-2.1](pick) Pick 2 PRs to branch-2.1 (#39604)
e302882e529 is described below
commit e302882e5295d2ae25a05c23a2ae598c50346284
Author: Gabriel <[email protected]>
AuthorDate: Tue Aug 20 17:10:30 2024 +0800
[branch-2.1](pick) Pick 2 PRs to branch-2.1 (#39604)
## Proposed changes
pick #39480 #39589
<!--Describe your changes.-->
---
.../main/java/org/apache/doris/qe/Coordinator.java | 20 +++++++++++---------
.../java/org/apache/doris/qe/CoordinatorTest.java | 20 ++++++++++----------
2 files changed, 21 insertions(+), 19 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index dac9be06b9f..19ca1050469 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -2107,10 +2107,11 @@ public class Coordinator implements CoordInterface {
if ((isColocateFragment(fragment, fragment.getPlanRoot())
&&
fragmentIdToSeqToAddressMap.containsKey(fragment.getFragmentId())
&&
fragmentIdToSeqToAddressMap.get(fragment.getFragmentId()).size() > 0)) {
- computeColocateJoinInstanceParam(fragment.getFragmentId(),
parallelExecInstanceNum, params);
+ computeColocateJoinInstanceParam(fragment.getFragmentId(),
parallelExecInstanceNum, params,
+ fragment.hasNullAwareLeftAntiJoin());
} else if
(bucketShuffleJoinController.isBucketShuffleJoin(fragment.getFragmentId().asInt()))
{
bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(),
- parallelExecInstanceNum, params);
+ parallelExecInstanceNum, params,
fragment.hasNullAwareLeftAntiJoin());
} else {
// case A
for (Entry<TNetworkAddress, Map<Integer,
List<TScanRangeParams>>> entry : fragmentExecParamsMap.get(
@@ -2135,7 +2136,8 @@ public class Coordinator implements CoordInterface {
int expectedInstanceNum =
Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
boolean forceToLocalShuffle = context != null
- &&
context.getSessionVariable().isForceToLocalShuffle();
+ &&
context.getSessionVariable().isForceToLocalShuffle()
+ && !fragment.hasNullAwareLeftAntiJoin() &&
useNereids;
boolean ignoreStorageDataDistribution =
forceToLocalShuffle || (scanNodes.stream()
.allMatch(scanNode ->
scanNode.ignoreStorageDataDistribution(context,
addressToBackendID.size())) &&
useNereids);
@@ -2304,9 +2306,9 @@ public class Coordinator implements CoordInterface {
}
private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId,
- int parallelExecInstanceNum, FragmentExecParams params) {
+ int parallelExecInstanceNum, FragmentExecParams params, boolean
hasNullAwareLeftAntiJoin) {
assignScanRanges(fragmentId, parallelExecInstanceNum, params,
fragmentIdTobucketSeqToScanRangeMap,
- fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
+ fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds,
hasNullAwareLeftAntiJoin);
}
private Map<TNetworkAddress, Long> getReplicaNumPerHostForOlapTable() {
@@ -3049,16 +3051,16 @@ public class Coordinator implements CoordInterface {
}
private void computeInstanceParam(PlanFragmentId fragmentId,
- int parallelExecInstanceNum, FragmentExecParams params) {
+ int parallelExecInstanceNum, FragmentExecParams params,
boolean hasNullAwareLeftAntiJoin) {
assignScanRanges(fragmentId, parallelExecInstanceNum, params,
fragmentIdBucketSeqToScanRangeMap,
- fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
+ fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds,
hasNullAwareLeftAntiJoin);
}
}
private void assignScanRanges(PlanFragmentId fragmentId, int
parallelExecInstanceNum, FragmentExecParams params,
Map<PlanFragmentId, BucketSeqToScanRange>
fragmentIdBucketSeqToScanRangeMap,
Map<PlanFragmentId, Map<Integer, TNetworkAddress>>
curFragmentIdToSeqToAddressMap,
- Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) {
+ Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds, boolean
hasNullAwareLeftAntiJoin) {
Map<Integer, TNetworkAddress> bucketSeqToAddress =
curFragmentIdToSeqToAddressMap.get(fragmentId);
BucketSeqToScanRange bucketSeqToScanRange =
fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
@@ -3092,7 +3094,7 @@ public class Coordinator implements CoordInterface {
* 2. Use Nereids planner.
*/
boolean forceToLocalShuffle = context != null
- && context.getSessionVariable().isForceToLocalShuffle();
+ && context.getSessionVariable().isForceToLocalShuffle() &&
!hasNullAwareLeftAntiJoin && useNereids;
boolean ignoreStorageDataDistribution = forceToLocalShuffle ||
(scanNodes.stream()
.allMatch(node -> node.ignoreStorageDataDistribution(context,
addressToBackendID.size()))
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
index 4c38ddd2749..9b3ed7d3119 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/CoordinatorTest.java
@@ -122,7 +122,7 @@ public class CoordinatorTest extends Coordinator {
Deencapsulation.setField(coordinator,
"fragmentIdTobucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
FragmentExecParams params = new FragmentExecParams(null);
- Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 1, params);
+ Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 1, params, false);
Assert.assertEquals(1, params.instanceExecParams.size());
// check whether one instance have 3 tablet to scan
@@ -133,15 +133,15 @@ public class CoordinatorTest extends Coordinator {
}
params = new FragmentExecParams(null);
- Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 2, params);
+ Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 2, params, false);
Assert.assertEquals(2, params.instanceExecParams.size());
params = new FragmentExecParams(null);
- Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 3, params);
+ Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 3, params, false);
Assert.assertEquals(3, params.instanceExecParams.size());
params = new FragmentExecParams(null);
- Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 5, params);
+ Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 5, params, false);
Assert.assertEquals(3, params.instanceExecParams.size());
}
@@ -323,7 +323,7 @@ public class CoordinatorTest extends Coordinator {
PlanFragment fragment = new PlanFragment(planFragmentId, olapScanNode,
new DataPartition(TPartitionType.UNPARTITIONED));
FragmentExecParams params = new FragmentExecParams(fragment);
- Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 1, params);
+ Deencapsulation.invoke(coordinator,
"computeColocateJoinInstanceParam", planFragmentId, 1, params, false);
StringBuilder sb = new StringBuilder();
params.appendTo(sb);
Assert.assertTrue(sb.toString().contains("range=[id1,range=[]]"));
@@ -451,19 +451,19 @@ public class CoordinatorTest extends Coordinator {
Deencapsulation.setField(bucketShuffleJoinController,
"fragmentIdBucketSeqToScanRangeMap", fragmentIdBucketSeqToScanRangeMap);
FragmentExecParams params = new FragmentExecParams(null);
- Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 1, params);
+ Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 1, params, false);
Assert.assertEquals(1, params.instanceExecParams.size());
params = new FragmentExecParams(null);
- Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 2, params);
+ Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 2, params, false);
Assert.assertEquals(2, params.instanceExecParams.size());
params = new FragmentExecParams(null);
- Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 3, params);
+ Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 3, params, false);
Assert.assertEquals(3, params.instanceExecParams.size());
params = new FragmentExecParams(null);
- Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 5, params);
+ Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 5, params, false);
Assert.assertEquals(3, params.instanceExecParams.size());
}
@@ -505,7 +505,7 @@ public class CoordinatorTest extends Coordinator {
new DataPartition(TPartitionType.UNPARTITIONED));
FragmentExecParams params = new FragmentExecParams(fragment);
- Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 1, params);
+ Deencapsulation.invoke(bucketShuffleJoinController,
"computeInstanceParam", planFragmentId, 1, params, false);
Assert.assertEquals(1, params.instanceExecParams.size());
StringBuilder sb = new StringBuilder();
params.appendTo(sb);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]