This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7f6271c [Bug]Fix Query failed when fact table has no data in join
case (#3604)
7f6271c is described below
commit 7f6271c637e617d2e27d5a415ee081f3da6a950c
Author: wangbo <[email protected]>
AuthorDate: Wed Jun 3 22:01:55 2020 +0800
[Bug]Fix Query failed when fact table has no data in join case (#3604)
major work
1. Correct the value of ```numNodes``` and ```cardinality``` when
```OlapTableScan``` computeStats so that the ``` broadcast cost``` and
```partition join cost``` can be calculated correctly.
2. Find an input fragment with higher parallelism for the shuffle fragment to
assign backends
---
.../apache/doris/planner/DistributedPlanner.java | 5 ++---
.../org/apache/doris/planner/OlapScanNode.java | 4 ++++
.../main/java/org/apache/doris/qe/Coordinator.java | 22 ++++++++++++++++------
3 files changed, 22 insertions(+), 9 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 9c3bce3..9af4d41 100644
--- a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -335,14 +335,13 @@ public class DistributedPlanner {
// - and we're not doing a full or right outer join (those require the
left-hand
// side to be partitioned for correctness)
// - and the expected size of the hash tbl doesn't exceed
perNodeMemLimit
- // we do a "<=" comparison of the costs so that we default to
broadcast joins if
- // we're unable to estimate the cost
+    // we default to partition join when the broadcast join cost equals the
partition join cost
if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN
&& node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN
&& (perNodeMemLimit == 0 || Math.round(
(double) rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD)
<= perNodeMemLimit)
&& (node.getInnerRef().isBroadcastJoin() ||
(!node.getInnerRef().isPartitionJoin()
- && broadcastCost <= partitionCost))) {
+ && broadcastCost < partitionCost))) {
doBroadcast = true;
} else {
doBroadcast = false;
diff --git a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 9d7c88d..c7a993f 100644
--- a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -271,6 +271,10 @@ public class OlapScanNode extends ScanNode {
}
numNodes = scanBackendIds.size();
}
+        // even if the current node scan has no data, at least one backend will be
assigned when the fragment actually executes
+ numNodes = numNodes <= 0 ? 1 : numNodes;
+        // when the node scan has no data, cardinality should be 0 instead of an
invalid value after computeStats()
+ cardinality = cardinality == -1 ? 0 : cardinality;
}
private Collection<Long> partitionPrune(RangePartitionInfo partitionInfo,
PartitionNames partitionNames) throws AnalysisException {
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index 36f9a0d..66abaab 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -923,19 +923,29 @@ public class Coordinator {
if (!(leftMostNode instanceof ScanNode)) {
// (Case B)
// there is no leftmost scan; we assign the same hosts as
those of our
- // leftmost input fragment (so that a partitioned aggregation
- // fragment runs on the hosts that provide the input data)
- PlanFragmentId inputFragmentIdx =
fragments.get(i).getChild(0).getFragmentId();
+            // input fragment which has the highest instance number
+
+ int inputFragmentIndex = 0;
+ int maxParallelism = 0;
+ for (int j = 0; j < fragment.getChildren().size(); j++) {
+ int currentChildFragmentParallelism =
fragmentExecParamsMap.get(fragment.getChild(j).getFragmentId()).instanceExecParams.size();
+ if (currentChildFragmentParallelism > maxParallelism) {
+ maxParallelism = currentChildFragmentParallelism;
+ inputFragmentIndex = j;
+ }
+ }
+
+ PlanFragmentId inputFragmentId =
fragment.getChild(inputFragmentIndex).getFragmentId();
// AddAll() soft copy()
int exchangeInstances = -1;
if (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable() != null) {
exchangeInstances =
ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
}
- if (exchangeInstances > 0 &&
fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() >
exchangeInstances) {
+ if (exchangeInstances > 0 &&
fragmentExecParamsMap.get(inputFragmentId).instanceExecParams.size() >
exchangeInstances) {
// random select some instance
// get distinct host, when
parallel_fragment_exec_instance_num > 1, a single host may execute several
instances
Set<TNetworkAddress> hostSet = Sets.newHashSet();
- for (FInstanceExecParam execParams:
fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) {
+ for (FInstanceExecParam execParams:
fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
hostSet.add(execParams.host);
}
List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
@@ -945,7 +955,7 @@ public class Coordinator {
params.instanceExecParams.add(instanceParam);
}
} else {
- for (FInstanceExecParam execParams:
fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) {
+ for (FInstanceExecParam execParams:
fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
FInstanceExecParam instanceParam = new
FInstanceExecParam(null, execParams.host, 0, params);
params.instanceExecParams.add(instanceParam);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]