xionglei0 commented on a change in pull request #1788: add param: 
doris_exchange_instances to set parallel after exchange
URL: https://github.com/apache/incubator-doris/pull/1788#discussion_r323541404
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/qe/Coordinator.java
 ##########
 @@ -848,18 +848,39 @@ private void computeFragmentHosts() throws Exception {
             // be BE that fragment's scannode locates,  avoid less data.
             // chenhao added
             boolean hasUnionNode = containsUnionNode(fragment.getPlanRoot());
+
             if (!(leftMostNode instanceof ScanNode) && !hasUnionNode) {
                 // there is no leftmost scan; we assign the same hosts as 
those of our
                 // leftmost input fragment (so that a partitioned aggregation
                 // fragment runs on the hosts that provide the input data)
                 PlanFragmentId inputFragmentIdx =
-                    fragments.get(i).getChild(0).getFragmentId();
+                        fragments.get(i).getChild(0).getFragmentId();
                 // AddAll() soft copy()
-                for (FInstanceExecParam execParams 
-                        : 
fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) {
-                    FInstanceExecParam instanceParam = new 
FInstanceExecParam(null, execParams.host, 
-                            0, params);
-                    params.instanceExecParams.add(instanceParam);
+                int doris_exchange_instances= -1;
+                if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null) {
+                    doris_exchange_instances = 
ConnectContext.get().getSessionVariable().getDorisExchangeInstances();
+                }
+                if (doris_exchange_instances > 0 && 
fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams.size() > 
doris_exchange_instances) {
+                    // random select some instance
+                    List<TNetworkAddress> hosts = Lists.newArrayList();
+                    Set<String> cache = new HashSet<String>();
+                    for (FInstanceExecParam execParams: 
fragmentExecParamsMap.get(inputFragmentIdx).instanceExecParams) {
+                        String hostPort = execParams.host.getHostname() + 
execParams.host.getPort();
+                        if (!cache.contains(hostPort)) {
+                            hosts.add(execParams.host);
+                            cache.add(hostPort);
+                        }
+                    }
+
+                    for (int index = 0; index < doris_exchange_instances; 
index++) {
 
 Review comment:
   ok

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to