This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-0.13 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 53f057797aea5353cdfa6e5c94585b01de383394 Author: wutiangan <[email protected]> AuthorDate: Sun Sep 13 19:14:00 2020 +0800 [SQL][Planner] Fix the the parallesim of fragment which has 3 or more childern #4569 (#4570) fix the the parallelism of fragment which has 3 or more childern --- .../main/java/org/apache/doris/qe/Coordinator.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 60707a0..55c63c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -17,7 +17,6 @@ package org.apache.doris.qe; -import org.apache.commons.collections.map.HashedMap; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.catalog.Catalog; @@ -89,6 +88,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.commons.collections.map.HashedMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -916,7 +916,9 @@ public class Coordinator { continue; } - PlanNode leftMostNode = findLeftmostNode(fragment.getPlanRoot()); + Pair<PlanNode, PlanNode> pairNodes = findLeftmostNode(fragment.getPlanRoot()); + PlanNode fatherNode = pairNodes.first; + PlanNode leftMostNode = pairNodes.second; /* * Case A: @@ -933,7 +935,11 @@ public class Coordinator { int inputFragmentIndex = 0; int maxParallelism = 0; - for (int j = 0; j < fragment.getChildren().size(); j++) { + // If the fragment has three children, then the first child and the second child are the child(both exchange node) of shuffle HashJoinNode, + // and the third child is the right child(ExchangeNode) of broadcast HashJoinNode. + // We only need to pay attention to the maximum parallelism among the two ExchangeNodes of shuffle HashJoinNode. + int childrenCount = (fatherNode != null) ? fatherNode.getChildren().size() : 1; + for (int j = 0; j < childrenCount; j++) { int currentChildFragmentParallelism = fragmentExecParamsMap.get(fragment.getChild(j).getFragmentId()).instanceExecParams.size(); if (currentChildFragmentParallelism > maxParallelism) { maxParallelism = currentChildFragmentParallelism; @@ -1071,12 +1077,14 @@ public class Coordinator { // Returns the id of the leftmost node of any of the gives types in 'plan_root', // or INVALID_PLAN_NODE_ID if no such node present. - private PlanNode findLeftmostNode(PlanNode plan) { + private Pair<PlanNode, PlanNode> findLeftmostNode(PlanNode plan) { PlanNode newPlan = plan; + PlanNode fatherPlan = null; while (newPlan.getChildren().size() != 0 && !(newPlan instanceof ExchangeNode)) { + fatherPlan = newPlan; newPlan = newPlan.getChild(0); } - return newPlan; + return new Pair<PlanNode, PlanNode>(fatherPlan, newPlan); } private <K, V> V findOrInsert(HashMap<K, V> m, final K key, final V defaultVal) { @@ -1701,3 +1709,4 @@ public class Coordinator { } } } + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
