This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f6271c  [Bug]Fix Query failed when fact table has no data in join 
case (#3604)
7f6271c is described below

commit 7f6271c637e617d2e27d5a415ee081f3da6a950c
Author: wangbo <[email protected]>
AuthorDate: Wed Jun 3 22:01:55 2020 +0800

    [Bug]Fix Query failed when fact table has no data in join case (#3604)
    
    major work
    1.  Correct the values of ```numNodes``` and ```cardinality``` when 
```OlapScanNode``` computes stats, so that the ```broadcast cost``` and 
```partition join cost``` can be calculated correctly.
    2. Find an input fragment with higher parallelism for the shuffle fragment 
to assign backends
---
 .../apache/doris/planner/DistributedPlanner.java   |  5 ++---
 .../org/apache/doris/planner/OlapScanNode.java     |  4 ++++
 .../main/java/org/apache/doris/qe/Coordinator.java | 22 ++++++++++++++++------
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java 
b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 9c3bce3..9af4d41 100644
--- a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -335,14 +335,13 @@ public class DistributedPlanner {
         // - and we're not doing a full or right outer join (those require the 
left-hand
         //   side to be partitioned for correctness)
         // - and the expected size of the hash tbl doesn't exceed 
perNodeMemLimit
-        // we do a "<=" comparison of the costs so that we default to 
broadcast joins if
-        // we're unable to estimate the cost
+        // we set partition join as default when broadcast join cost equals 
partition join cost
         if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN
                 && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN
                 && (perNodeMemLimit == 0 || Math.round(
                 (double) rhsDataSize * PlannerContext.HASH_TBL_SPACE_OVERHEAD) 
<= perNodeMemLimit)
                 && (node.getInnerRef().isBroadcastJoin() || 
(!node.getInnerRef().isPartitionJoin()
-                && broadcastCost <= partitionCost))) {
+                && broadcastCost < partitionCost))) {
             doBroadcast = true;
         } else {
             doBroadcast = false;
diff --git a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 9d7c88d..c7a993f 100644
--- a/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -271,6 +271,10 @@ public class OlapScanNode extends ScanNode {
             }
             numNodes = scanBackendIds.size();
         }
+        // even if the current node scan has no data, at least one backend will 
be assigned when the fragment actually executes
+        numNodes = numNodes <= 0 ? 1 : numNodes;
+        // when the node scan has no data, cardinality should be 0 instead of an 
invalid value after computeStats()
+        cardinality = cardinality == -1 ? 0 : cardinality;
     }
 
     private Collection<Long> partitionPrune(RangePartitionInfo partitionInfo, 
PartitionNames partitionNames) throws AnalysisException {
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index 36f9a0d..66abaab 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -923,19 +923,29 @@ public class Coordinator {
             if (!(leftMostNode instanceof ScanNode)) {
                 // (Case B)
                 // there is no leftmost scan; we assign the same hosts as 
those of our
-                // leftmost input fragment (so that a partitioned aggregation
-                // fragment runs on the hosts that provide the input data)
-                PlanFragmentId inputFragmentIdx = 
fragments.get(i).getChild(0).getFragmentId();
+                // input fragment which has the highest instance number
+
+                int inputFragmentIndex = 0;
+                int maxParallelism = 0;
+                for (int j = 0; j < fragment.getChildren().size(); j++) {
+                    int currentChildFragmentParallelism = 
fragmentExecParamsMap.get(fragment.getChild(j).getFragmentId()).instanceExecParams.size();
+                    if (currentChildFragmentParallelism > maxParallelism) {
+                        maxParallelism = currentChildFragmentParallelism;
+                        inputFragmentIndex = j;
+                    }
+                }
+
+                PlanFragmentId inputFragmentId = 
fragment.getChild(inputFragmentIndex).getFragmentId();
                 // AddAll() soft copy()
                 int exchangeInstances = -1;
                 if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null) {
                     exchangeInstances = 
ConnectContext.get().getSessionVariable().getExchangeInstanceParallel();
                 }
-                if (exchangeInstances > 0 && 
fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > 
exchangeInstances) {
+                if (exchangeInstances > 0 && 
fragmentExecParamsMap.get(inputFragmentId).instanceExecParams.size() > 
exchangeInstances) {
                     // random select some instance
                     // get distinct host,  when 
parallel_fragment_exec_instance_num > 1, single host may execute severval 
instances
                     Set<TNetworkAddress> hostSet = Sets.newHashSet();
-                    for (FInstanceExecParam execParams: 
fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) {
+                    for (FInstanceExecParam execParams: 
fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
                         hostSet.add(execParams.host);
                     }
                     List<TNetworkAddress> hosts = Lists.newArrayList(hostSet);
@@ -945,7 +955,7 @@ public class Coordinator {
                         params.instanceExecParams.add(instanceParam);
                     }
                 } else {
-                    for (FInstanceExecParam execParams: 
fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) {
+                    for (FInstanceExecParam execParams: 
fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
                         FInstanceExecParam instanceParam = new 
FInstanceExecParam(null, execParams.host, 0, params);
                         params.instanceExecParams.add(instanceParam);
                     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to