imay commented on a change in pull request #851: Parallel fragment exec instance
URL: https://github.com/apache/incubator-doris/pull/851#discussion_r270840685
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/qe/Coordinator.java
 ##########
 @@ -882,12 +889,60 @@ private void computeFragmentHosts() throws Exception {
         }
     }
 
-    //One fragment could only have one HashJoinNode
-    private boolean isColocateJoin(PlanNode node) {
-        if (Config.disable_colocate_join) {
+    private boolean needParallelInstance(PlanNode leftMostNode) {
+        if (getParallelExecInstanceNum() <= SessionVariable.MIN_EXEC_INSTANCE_NUM) {
             return false;
         }
 
+        if (leftMostNode instanceof OlapScanNode) {
+            OlapScanNode olapScanNode = (OlapScanNode) leftMostNode;
+
+            //case 1: the small table of broadcast join need not parallel
+            PlanFragment destFragment =  olapScanNode.getFragment().getDestFragment();
+            if (destFragment != null)  {
+                PlanNode destRootNode = destFragment.getPlanRoot();
+                if (destRootNode instanceof HashJoinNode) {
+                    HashJoinNode joinNode = (HashJoinNode) destRootNode;
+                    if (!joinNode.isShuffleJoin()) {
+                        LOG.debug("ScanNode {} for fragment {} need not parallel because of broadcast join",
+                        olapScanNode.getOlapTable().getName(), olapScanNode.getFragmentId());
+                        return false;
+                    }
+                }
+            }
+
+            //case 2: the small table scan  need not parallel
+            long scanDataSize =  Math.round((double) olapScanNode.getCardinality() * olapScanNode.getAvgRowSize());
+            long smallScanSize = Config.parallel_scan_data_size_threshold;
+            if (scanDataSize <= smallScanSize) {
+                LOG.debug("ScanNode {} for fragment {} need not parallel because of data size {} is small",
+                olapScanNode.getOlapTable().getName(), olapScanNode.getFragmentId(), scanDataSize);
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    //get ParallelScanInstanceNum from session config or server config
+    private int getParallelExecInstanceNum() {
+        //cache the result. this method will call many times
+        if (parallelExecInstanceNum > 0) {
+            return parallelExecInstanceNum;
+        }
+
+        int sessionParallelExecInstanceNum = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+        //we prefer session config
+        if (sessionParallelExecInstanceNum > SessionVariable.MIN_EXEC_INSTANCE_NUM) {
+            parallelExecInstanceNum = sessionParallelExecInstanceNum;
+        } else {
+            parallelExecInstanceNum = Config.parallel_fragment_exec_instance_num;
+        }
 
 Review comment:
   I think that one session variable is enough.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to