This is an automated email from the ASF dual-hosted git repository.
huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 584a256a252 [opt](coordinator) optimize parallel degree of shuffle
when use nereids (#44754)
584a256a252 is described below
commit 584a256a25274722d22c36eafe57ba3cb8e0dc88
Author: 924060929 <[email protected]>
AuthorDate: Fri Jan 10 11:16:13 2025 +0800
[opt](coordinator) optimize parallel degree of shuffle when use nereids
(#44754)
Optimize the parallel degree of shuffle when using Nereids. This PR can fix
some performance regressions seen when upgrading Doris from 1.2 to 2.x/3.x.
---
.../aggregate_function_collect.h | 8 +-
.../trees/plans/distribute/DistributePlanner.java | 24 +++--
.../worker/job/AbstractUnassignedScanJob.java | 18 ++--
.../worker/job/LocalShuffleAssignedJob.java | 13 +--
.../job/LocalShuffleBucketJoinAssignedJob.java | 6 +-
.../distribute/worker/job/UnassignedGatherJob.java | 16 ++--
.../job/UnassignedScanBucketOlapTableJob.java | 99 +++++++++++++++------
.../worker/job/UnassignedShuffleJob.java | 34 +------
.../main/java/org/apache/doris/qe/Coordinator.java | 3 +-
.../doris/qe/runtime/ThriftPlansBuilder.java | 21 ++---
.../tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out | Bin 101642 -> 101666 bytes
.../orc_tvf/test_hdfs_orc_group0_orc_files.groovy | 6 +-
12 files changed, 127 insertions(+), 121 deletions(-)
diff --git a/be/src/vec/aggregate_functions/aggregate_function_collect.h
b/be/src/vec/aggregate_functions/aggregate_function_collect.h
index 2d18a56313f..1b4eadf259d 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_collect.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_collect.h
@@ -263,10 +263,10 @@ struct AggregateFunctionCollectListData<StringRef,
HasLimit> {
}
max_size = rhs.max_size;
- data->insert_range_from(
- *rhs.data, 0,
- std::min(assert_cast<size_t,
TypeCheckOnRelease::DISABLE>(max_size - size()),
- rhs.size()));
+ data->insert_range_from(*rhs.data, 0,
+ std::min(assert_cast<size_t,
TypeCheckOnRelease::DISABLE>(
+
static_cast<size_t>(max_size - size())),
+ rhs.size()));
} else {
data->insert_range_from(*rhs.data, 0, rhs.size());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
index 75a2326236f..388cef6f062 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/DistributePlanner.java
@@ -46,7 +46,6 @@ import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import org.apache.logging.log4j.LogManager;
@@ -136,6 +135,16 @@ public class DistributePlanner {
link.getKey(),
enableShareHashTableForBroadcastJoin
);
+ for (Entry<DataSink, List<AssignedJob>> kv :
+ ((PipelineDistributedPlan)
link.getValue()).getDestinations().entrySet()) {
+ if (kv.getValue().isEmpty()) {
+ int sourceFragmentId =
link.getValue().getFragmentJob().getFragment().getFragmentId().asInt();
+ String msg = "Invalid plan which exchange not contains
receiver, "
+ + "exchange id: " +
kv.getKey().getExchNodeId().asInt()
+ + ", source fragmentId: " + sourceFragmentId;
+ throw new IllegalStateException(msg);
+ }
+ }
}
}
return plans;
@@ -184,7 +193,7 @@ public class DistributePlanner {
boolean useLocalShuffle = receiverPlan.getInstanceJobs().stream()
.anyMatch(LocalShuffleAssignedJob.class::isInstance);
if (useLocalShuffle) {
- return getFirstInstancePerShareScan(receiverPlan);
+ return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
} else if (enableShareHashTableForBroadcastJoin &&
linkNode.isRightChildOfBroadcastHashJoin()) {
return getFirstInstancePerWorker(receiverPlan.getInstanceJobs());
} else {
@@ -221,17 +230,6 @@ public class DistributePlanner {
return Arrays.asList(instances);
}
- private List<AssignedJob>
getFirstInstancePerShareScan(PipelineDistributedPlan plan) {
- List<AssignedJob> canReceiveDataFromRemote =
Lists.newArrayListWithCapacity(plan.getInstanceJobs().size());
- for (AssignedJob instanceJob : plan.getInstanceJobs()) {
- LocalShuffleAssignedJob localShuffleJob =
(LocalShuffleAssignedJob) instanceJob;
- if (!localShuffleJob.receiveDataFromLocal) {
- canReceiveDataFromRemote.add(localShuffleJob);
- }
- }
- return canReceiveDataFromRemote;
- }
-
private List<AssignedJob> getFirstInstancePerWorker(List<AssignedJob>
instances) {
Map<DistributedPlanWorker, AssignedJob> firstInstancePerWorker =
Maps.newLinkedHashMap();
for (AssignedJob instance : instances) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
index d2fbb9905e1..37d665adcc4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
@@ -91,7 +91,7 @@ public abstract class AbstractUnassignedScanJob extends
AbstractUnassignedJob {
// now we should compute how many instances to process the data,
// for example: two instances
- int instanceNum = degreeOfParallelism(scanSourceMaxParallel);
+ int instanceNum = degreeOfParallelism(scanSourceMaxParallel,
useLocalShuffleToAddParallel);
if (useLocalShuffleToAddParallel) {
assignLocalShuffleJobs(scanSource, instanceNum, instances,
context, worker);
@@ -129,7 +129,7 @@ public abstract class AbstractUnassignedScanJob extends
AbstractUnassignedJob {
protected void assignLocalShuffleJobs(ScanSource scanSource, int
instanceNum, List<AssignedJob> instances,
ConnectContext context, DistributedPlanWorker worker) {
// only generate one instance to scan all data, in this step
- List<ScanSource> instanceToScanRanges =
scanSource.parallelize(scanNodes, 1);
+ List<ScanSource> assignedJoinBuckets =
scanSource.parallelize(scanNodes, instanceNum);
// when data not big, but aggregation too slow, we will use 1 instance
to scan data,
// and use more instances (to ***add parallel***) to process aggregate.
@@ -144,7 +144,7 @@ public abstract class AbstractUnassignedScanJob extends
AbstractUnassignedJob {
// |(share scan node, instance1 will scan all data and local shuffle
to other local instances |
// | to parallel compute this data)
|
//
+------------------------------------------------------------------------------------------------+
- ScanSource shareScanSource = instanceToScanRanges.get(0);
+ ScanSource shareScanSource = assignedJoinBuckets.get(0);
// one scan range generate multiple instances,
// different instances reference the same scan source
@@ -152,15 +152,15 @@ public abstract class AbstractUnassignedScanJob extends
AbstractUnassignedJob {
ScanSource emptyShareScanSource = shareScanSource.newEmpty();
for (int i = 0; i < instanceNum; i++) {
LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
- instances.size(), shareScanId, i > 0,
- context.nextInstanceId(), this, worker,
- i == 0 ? shareScanSource : emptyShareScanSource
+ instances.size(), shareScanId, context.nextInstanceId(),
this, worker,
+ // only first instance need to scan data
+ i == 0 ? scanSource : emptyShareScanSource
);
instances.add(instance);
}
}
- protected int degreeOfParallelism(int maxParallel) {
+ protected int degreeOfParallelism(int maxParallel, boolean
useLocalShuffleToAddParallel) {
Preconditions.checkArgument(maxParallel > 0, "maxParallel must be
positive");
if (!fragment.getDataPartition().isPartitioned()) {
return 1;
@@ -179,6 +179,10 @@ public abstract class AbstractUnassignedScanJob extends
AbstractUnassignedJob {
}
}
+ if (useLocalShuffleToAddParallel) {
+ return Math.max(fragment.getParallelExecNum(), 1);
+ }
+
// the scan instance num should not larger than the tablets num
return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(),
1));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
index 2ba269a5a7b..9bb7c61ffaf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleAssignedJob.java
@@ -31,28 +31,17 @@ import java.util.Map;
*/
public class LocalShuffleAssignedJob extends StaticAssignedJob {
public final int shareScanId;
- public final boolean receiveDataFromLocal;
public LocalShuffleAssignedJob(
- int indexInUnassignedJob, int shareScanId, boolean
receiveDataFromLocal, TUniqueId instanceId,
+ int indexInUnassignedJob, int shareScanId, TUniqueId instanceId,
UnassignedJob unassignedJob,
DistributedPlanWorker worker, ScanSource scanSource) {
super(indexInUnassignedJob, instanceId, unassignedJob, worker,
scanSource);
this.shareScanId = shareScanId;
- this.receiveDataFromLocal = receiveDataFromLocal;
}
@Override
protected Map<String, String> extraInfo() {
return ImmutableMap.of("shareScanIndex", String.valueOf(shareScanId));
}
-
- @Override
- protected String formatScanSourceString() {
- if (receiveDataFromLocal) {
- return "read data from first instance of " + getAssignedWorker();
- } else {
- return super.formatScanSourceString();
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
index 443acb50d78..9090bf98b36 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
@@ -30,11 +30,11 @@ public class LocalShuffleBucketJoinAssignedJob extends
LocalShuffleAssignedJob {
private volatile Set<Integer> assignedJoinBucketIndexes;
public LocalShuffleBucketJoinAssignedJob(
- int indexInUnassignedJob, int shareScanId, boolean
receiveDataFromLocal,
+ int indexInUnassignedJob, int shareScanId,
TUniqueId instanceId, UnassignedJob unassignedJob,
DistributedPlanWorker worker, ScanSource scanSource,
Set<Integer> assignedJoinBucketIndexes) {
- super(indexInUnassignedJob, shareScanId, receiveDataFromLocal,
instanceId, unassignedJob, worker, scanSource);
+ super(indexInUnassignedJob, shareScanId, instanceId, unassignedJob,
worker, scanSource);
this.assignedJoinBucketIndexes =
Utils.fastToImmutableSet(assignedJoinBucketIndexes);
}
@@ -42,7 +42,7 @@ public class LocalShuffleBucketJoinAssignedJob extends
LocalShuffleAssignedJob {
return assignedJoinBucketIndexes;
}
- public void addAssignedJoinBucketIndexes(Set<Integer> joinBucketIndexes) {
+ public synchronized void addAssignedJoinBucketIndexes(Set<Integer>
joinBucketIndexes) {
this.assignedJoinBucketIndexes = ImmutableSet.<Integer>builder()
.addAll(assignedJoinBucketIndexes)
.addAll(joinBucketIndexes)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
index 830342514bd..6fe9fcfb439 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedGatherJob.java
@@ -32,7 +32,7 @@ import java.util.List;
/** UnassignedGatherJob */
public class UnassignedGatherJob extends AbstractUnassignedJob {
- private boolean useLocalShuffleToAddParallel;
+ private boolean useSerialSource;
public UnassignedGatherJob(
StatementContext statementContext, PlanFragment fragment,
@@ -44,24 +44,24 @@ public class UnassignedGatherJob extends
AbstractUnassignedJob {
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
ConnectContext connectContext = statementContext.getConnectContext();
- useLocalShuffleToAddParallel =
fragment.useSerialSource(connectContext);
+ useSerialSource = fragment.useSerialSource(connectContext);
int expectInstanceNum = degreeOfParallelism();
DistributedPlanWorker selectedWorker =
distributeContext.selectedWorkers.tryToSelectRandomUsedWorker();
- if (useLocalShuffleToAddParallel) {
+ if (useSerialSource) {
+ // Using serial source means a serial source operator will be used
in this fragment (e.g. data will be
+ // shuffled to only 1 exchange operator) and then split by
followed local exchanger
ImmutableList.Builder<AssignedJob> instances =
ImmutableList.builder();
-
DefaultScanSource shareScan = new
DefaultScanSource(ImmutableMap.of());
LocalShuffleAssignedJob receiveDataFromRemote = new
LocalShuffleAssignedJob(
- 0, 0, false,
+ 0, 0,
connectContext.nextInstanceId(), this, selectedWorker,
shareScan);
instances.add(receiveDataFromRemote);
for (int i = 1; i < expectInstanceNum; ++i) {
LocalShuffleAssignedJob receiveDataFromLocal = new
LocalShuffleAssignedJob(
- i, 0, true,
- connectContext.nextInstanceId(), this, selectedWorker,
shareScan);
+ i, 0, connectContext.nextInstanceId(), this,
selectedWorker, shareScan);
instances.add(receiveDataFromLocal);
}
return instances.build();
@@ -76,6 +76,6 @@ public class UnassignedGatherJob extends
AbstractUnassignedJob {
}
protected int degreeOfParallelism() {
- return useLocalShuffleToAddParallel ? fragment.getParallelExecNum() :
1;
+ return useSerialSource ? Math.max(1, fragment.getParallelExecNum()) :
1;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index f90fe7ea6e2..88612640d50 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -38,12 +38,16 @@ import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -184,13 +188,23 @@ public class UnassignedScanBucketOlapTableJob extends
AbstractUnassignedScanJob
Set<Integer> assignedJoinBuckets
= ((BucketScanSource)
assignJoinBuckets.get(i)).bucketIndexToScanNodeToTablets.keySet();
LocalShuffleBucketJoinAssignedJob instance = new
LocalShuffleBucketJoinAssignedJob(
- instances.size(), shareScanId, i > 0,
- context.nextInstanceId(), this, worker,
+ instances.size(), shareScanId, context.nextInstanceId(),
+ this, worker,
i == 0 ? shareScanSource : emptyShareScanSource,
Utils.fastToImmutableSet(assignedJoinBuckets)
);
instances.add(instance);
}
+
+ for (int i = assignJoinBuckets.size(); i < instanceNum; ++i) {
+ LocalShuffleBucketJoinAssignedJob instance = new
LocalShuffleBucketJoinAssignedJob(
+ instances.size(), shareScanId, context.nextInstanceId(),
+ this, worker, emptyShareScanSource,
+ // these instance not need to join, because no any bucket
assign to it
+ ImmutableSet.of()
+ );
+ instances.add(instance);
+ }
}
private boolean shouldFillUpInstances(List<HashJoinNode> hashJoinNodes) {
@@ -224,10 +238,21 @@ public class UnassignedScanBucketOlapTableJob extends
AbstractUnassignedScanJob
olapScanNode, randomPartition, missingBucketIndexes);
boolean useLocalShuffle =
instances.stream().anyMatch(LocalShuffleAssignedJob.class::isInstance);
+ Multimap<DistributedPlanWorker, AssignedJob> workerToAssignedJobs =
ArrayListMultimap.create();
+ int maxNumInstancePerWorker = 1;
+ if (useLocalShuffle) {
+ for (AssignedJob instance : instances) {
+ workerToAssignedJobs.put(instance.getAssignedWorker(),
instance);
+ }
+ for (Collection<AssignedJob> instanceList :
workerToAssignedJobs.asMap().values()) {
+ maxNumInstancePerWorker = Math.max(maxNumInstancePerWorker,
instanceList.size());
+ }
+ }
+
List<AssignedJob> newInstances = new ArrayList<>(instances);
+
for (Entry<DistributedPlanWorker, Collection<Integer>> workerToBuckets
: missingBuckets.asMap().entrySet()) {
Map<Integer, Map<ScanNode, ScanRanges>> scanEmptyBuckets =
Maps.newLinkedHashMap();
- Set<Integer> assignedJoinBuckets =
Utils.fastToImmutableSet(workerToBuckets.getValue());
for (Integer bucketIndex : workerToBuckets.getValue()) {
Map<ScanNode, ScanRanges> scanTableWithEmptyData =
Maps.newLinkedHashMap();
for (ScanNode scanNode : scanNodes) {
@@ -236,42 +261,62 @@ public class UnassignedScanBucketOlapTableJob extends
AbstractUnassignedScanJob
scanEmptyBuckets.put(bucketIndex, scanTableWithEmptyData);
}
- AssignedJob fillUpInstance = null;
DistributedPlanWorker worker = workerToBuckets.getKey();
BucketScanSource scanSource = new
BucketScanSource(scanEmptyBuckets);
if (useLocalShuffle) {
- // when use local shuffle, we should ensure every backend only
process one instance!
- // so here we should try to merge the missing buckets into
exist instances
- boolean mergedBucketsInSameWorkerInstance = false;
- for (AssignedJob newInstance : newInstances) {
- if (newInstance.getAssignedWorker().equals(worker)) {
- BucketScanSource bucketScanSource = (BucketScanSource)
newInstance.getScanSource();
-
bucketScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets);
- mergedBucketsInSameWorkerInstance = true;
-
- LocalShuffleBucketJoinAssignedJob instance =
(LocalShuffleBucketJoinAssignedJob) newInstance;
-
instance.addAssignedJoinBucketIndexes(assignedJoinBuckets);
- }
+ List<AssignedJob> sameWorkerInstances = (List)
workerToAssignedJobs.get(worker);
+ if (sameWorkerInstances.isEmpty()) {
+ sameWorkerInstances = fillUpEmptyInstances(
+ maxNumInstancePerWorker, scanSource, worker,
newInstances, context);
}
- if (!mergedBucketsInSameWorkerInstance) {
- fillUpInstance = new LocalShuffleBucketJoinAssignedJob(
- newInstances.size(),
shareScanIdGenerator.getAndIncrement(),
- false, context.nextInstanceId(), this, worker,
scanSource,
- assignedJoinBuckets
- );
+
+ LocalShuffleBucketJoinAssignedJob firstInstance
+ = (LocalShuffleBucketJoinAssignedJob )
sameWorkerInstances.get(0);
+ BucketScanSource firstInstanceScanSource
+ = (BucketScanSource) firstInstance.getScanSource();
+
firstInstanceScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets);
+
+ Iterator<Integer> assignedJoinBuckets = new
LinkedHashSet<>(workerToBuckets.getValue()).iterator();
+ // make sure the first instance must be assigned some buckets:
+ // if the first instance assigned some buckets, we start
assign empty
+ // bucket for second instance for balance, or else assign for
first instance
+ int index =
firstInstance.getAssignedJoinBucketIndexes().isEmpty() ? -1 : 0;
+ while (assignedJoinBuckets.hasNext()) {
+ Integer bucketIndex = assignedJoinBuckets.next();
+ assignedJoinBuckets.remove();
+
+ index = (index + 1) % sameWorkerInstances.size();
+ LocalShuffleBucketJoinAssignedJob instance
+ = (LocalShuffleBucketJoinAssignedJob)
sameWorkerInstances.get(index);
+
instance.addAssignedJoinBucketIndexes(ImmutableSet.of(bucketIndex));
}
} else {
- fillUpInstance = assignWorkerAndDataSources(
+ newInstances.add(assignWorkerAndDataSources(
newInstances.size(), context.nextInstanceId(), worker,
scanSource
- );
- }
- if (fillUpInstance != null) {
- newInstances.add(fillUpInstance);
+ ));
}
}
return newInstances;
}
+ private List<AssignedJob> fillUpEmptyInstances(
+ int maxNumInstancePerWorker, BucketScanSource scanSource,
DistributedPlanWorker worker,
+ List<AssignedJob> existsInstances, ConnectContext context) {
+ int shareScanId = shareScanIdGenerator.getAndIncrement();
+ List<AssignedJob> newInstances = new
ArrayList<>(maxNumInstancePerWorker);
+ for (int i = 0; i < maxNumInstancePerWorker; i++) {
+ LocalShuffleBucketJoinAssignedJob newInstance = new
LocalShuffleBucketJoinAssignedJob(
+ existsInstances.size(), shareScanId,
+ context.nextInstanceId(), this, worker,
+ scanSource.newEmpty(),
+ ImmutableSet.of()
+ );
+ existsInstances.add(newInstance);
+ newInstances.add(newInstance);
+ }
+ return newInstances;
+ }
+
private int fullBucketNum() {
for (ScanNode scanNode : scanNodes) {
if (scanNode instanceof OlapScanNode) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
index 3d937bfb35d..d8eac65cded 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedShuffleJob.java
@@ -17,13 +17,11 @@
package org.apache.doris.nereids.trees.plans.distribute.worker.job;
-import org.apache.doris.common.Pair;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanNode;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ArrayListMultimap;
@@ -43,7 +41,7 @@ import java.util.function.Function;
/** UnassignedShuffleJob */
public class UnassignedShuffleJob extends AbstractUnassignedJob {
- private boolean useLocalShuffleToAddParallel;
+ private boolean useSerialSource;
public UnassignedShuffleJob(
StatementContext statementContext, PlanFragment fragment,
@@ -54,7 +52,7 @@ public class UnassignedShuffleJob extends
AbstractUnassignedJob {
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode,
AssignedJob> inputJobs) {
- useLocalShuffleToAddParallel =
fragment.useSerialSource(statementContext.getConnectContext());
+ useSerialSource =
fragment.useSerialSource(statementContext.getConnectContext());
int expectInstanceNum = degreeOfParallelism();
List<AssignedJob> biggestParallelChildFragment =
getInstancesOfBiggestParallelChildFragment(inputJobs);
@@ -83,18 +81,10 @@ public class UnassignedShuffleJob extends
AbstractUnassignedJob {
protected int degreeOfParallelism() {
// TODO: check we use nested loop join do right outer / semi / anti
join,
// we should add an exchange node with gather distribute under
the nested loop join
-
int expectInstanceNum = -1;
if (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable() != null) {
expectInstanceNum =
ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
}
-
- // TODO: check nested loop join do right outer / semi / anti join
- PlanNode leftMostNode =
findLeftmostNode(fragment.getPlanRoot()).second;
- // when we use nested loop join do right outer / semi / anti join, the
instance must be 1.
- if (leftMostNode.getNumInstances() == 1) {
- expectInstanceNum = 1;
- }
return expectInstanceNum;
}
@@ -116,7 +106,7 @@ public class UnassignedShuffleJob extends
AbstractUnassignedJob {
private List<AssignedJob> buildInstances(
int instanceNum, Function<Integer, DistributedPlanWorker>
workerSelector) {
ConnectContext connectContext = statementContext.getConnectContext();
- if (useLocalShuffleToAddParallel) {
+ if (useSerialSource) {
return buildInstancesWithLocalShuffle(instanceNum, workerSelector,
connectContext);
} else {
return buildInstancesWithoutLocalShuffle(instanceNum,
workerSelector, connectContext);
@@ -150,17 +140,13 @@ public class UnassignedShuffleJob extends
AbstractUnassignedJob {
for (Entry<DistributedPlanWorker, Collection<Integer>> kv :
workerToInstanceIds.asMap().entrySet()) {
DistributedPlanWorker worker = kv.getKey();
Collection<Integer> indexesInFragment = kv.getValue();
-
DefaultScanSource shareScanSource = new
DefaultScanSource(ImmutableMap.of());
-
- boolean receiveDataFromLocal = false;
for (Integer indexInFragment : indexesInFragment) {
LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
- indexInFragment, shareScanId, receiveDataFromLocal,
connectContext.nextInstanceId(),
+ indexInFragment, shareScanId,
connectContext.nextInstanceId(),
this, worker, shareScanSource
);
instances.add(instance);
- receiveDataFromLocal = true;
}
shareScanId++;
}
@@ -176,16 +162,4 @@ public class UnassignedShuffleJob extends
AbstractUnassignedJob {
Collections.shuffle(candidateWorkers);
return candidateWorkers;
}
-
- // Returns the id of the leftmost node of any of the gives types in
'plan_root',
- // or INVALID_PLAN_NODE_ID if no such node present.
- private Pair<PlanNode, PlanNode> findLeftmostNode(PlanNode plan) {
- PlanNode childPlan = plan;
- PlanNode fatherPlan = null;
- while (childPlan.getChildren().size() != 0 && !(childPlan instanceof
ExchangeNode)) {
- fatherPlan = childPlan;
- childPlan = childPlan.getChild(0);
- }
- return Pair.of(fatherPlan, childPlan);
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 3bf5c44d564..09a9a857f33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1809,7 +1809,8 @@ public class Coordinator implements CoordInterface {
exchangeInstances =
ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
}
// when we use nested loop join do right outer / semi / anti
join, the instance must be 1.
- if (leftMostNode.getNumInstances() == 1) {
+ boolean isNereids = context != null &&
context.getState().isNereids();
+ if (!isNereids && leftMostNode.getNumInstances() == 1) {
exchangeInstances = 1;
}
// Using serial source means a serial source operator will be
used in this fragment (e.g. data will be
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 54bc0b24d3e..0caca2f47c1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -419,21 +419,18 @@ public class ThriftPlansBuilder {
if
(runtimeFiltersThriftBuilder.isMergeRuntimeFilterInstance(instance)) {
runtimeFiltersThriftBuilder.setRuntimeFilterThriftParams(runtimeFilterParams);
}
+ boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
+ if (isLocalShuffle) {
+ // a fragment in a backend only enable local shuffle once for the
first local shuffle instance,
+ // because we just skip set scan params for
LocalShuffleAssignedJob.receiveDataFromLocal == true
+ ignoreDataDistribution(currentFragmentParam);
+ }
return instanceParam;
}
private static void setScanSourceParam(
TPipelineFragmentParams currentFragmentParam, AssignedJob instance,
TPipelineInstanceParams instanceParams) {
-
- boolean isLocalShuffle = instance instanceof LocalShuffleAssignedJob;
- if (isLocalShuffle && ((LocalShuffleAssignedJob)
instance).receiveDataFromLocal) {
- // save thrift rpc message size, don't need perNodeScanRanges,
- // but the perNodeScanRanges is required rpc field
- instanceParams.setPerNodeScanRanges(Maps.newLinkedHashMap());
- return;
- }
-
ScanSource scanSource = instance.getScanSource();
PerNodeScanParams scanParams;
if (scanSource instanceof BucketScanSource) {
@@ -443,12 +440,6 @@ public class ThriftPlansBuilder {
}
// perNodeScanRanges is required
instanceParams.setPerNodeScanRanges(scanParams.perNodeScanRanges);
-
- if (isLocalShuffle) {
- // a fragment in a backend only enable local shuffle once for the
first local shuffle instance,
- // because we just skip set scan params for
LocalShuffleAssignedJob.receiveDataFromLocal == true
- ignoreDataDistribution(currentFragmentParam);
- }
}
// local shuffle has two functions:
diff --git
a/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out
b/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out
index 01158a2fb60..cb2f86d7ba4 100644
Binary files
a/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out
and
b/regression-test/data/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.out
differ
diff --git
a/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
b/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
index 924ceca4204..bf2929ee5b4 100644
---
a/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
+++
b/regression-test/suites/external_table_p0/tvf/orc_tvf/test_hdfs_orc_group0_orc_files.groovy
@@ -174,7 +174,11 @@
suite("test_hdfs_orc_group0_orc_files","external,hive,tvf,external_docker") {
order_qt_test_25 """ select * from HDFS(
"uri" = "${uri}",
"hadoop.username" = "${hdfsUserName}",
- "format" = "orc") order by _col0 DESC limit 100; """
+ "format" = "orc")
+ where _col0 is not null and _col1 is not null and
_col2 is not null and _col3 is not null
+ and _col4 is not null and _col5 is not null and
_col6 is not null
+ order by _col0 DESC, _col1 DESC, _col2 DESC, _col3
DESC, _col4 DESC, _col5 DESC, _col6 DESC
+ limit 100; """
uri = "${defaultFS}" +
"/user/doris/tvf_data/test_hdfs_orc/group0/TestVectorOrcFile.testLzo.orc"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]