This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a6fe91322920515c3511027dae0de90c586c010b
Author: xzj7019 <[email protected]>
AuthorDate: Wed Jul 19 09:43:31 2023 +0800

    [fix](nereids) fix cte as bc right side hang bug (#21897)
    
    During original computeMultiCastFragmentParams process, we don't handle the 
scenario the cte as the broadcast right side, which will lead the missing 
setting of the buildHashTableForBroadcastJoin flag true and finally the sql 
hang.
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 37 ++++++++++++++++++----
 1 file changed, 31 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index a81bd1984b..5ccbbcaa1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1418,12 +1418,37 @@ public class Coordinator {
                             params.instanceExecParams.size() + 
destParams.perExchNumSenders.get(exchId.asInt()));
                 }
 
-                for (int j = 0; j < destParams.instanceExecParams.size(); ++j) 
{
-                    TPlanFragmentDestination dest = new 
TPlanFragmentDestination();
-                    dest.fragment_instance_id = 
destParams.instanceExecParams.get(j).instanceId;
-                    dest.server = 
toRpcHost(destParams.instanceExecParams.get(j).host);
-                    dest.brpc_server = 
toBrpcHost(destParams.instanceExecParams.get(j).host);
-                    multiSink.getDestinations().get(i).add(dest);
+                List<TPlanFragmentDestination> destinations = 
multiSink.getDestinations().get(i);
+                if (enablePipelineEngine && 
enableShareHashTableForBroadcastJoin
+                        && params.fragment.isRightChildOfBroadcastHashJoin()) {
+                    // here choose the first instance to build hash table.
+                    Map<TNetworkAddress, FInstanceExecParam> destHosts = new 
HashMap<>();
+
+                    destParams.instanceExecParams.forEach(param -> {
+                        if (destHosts.containsKey(param.host)) {
+                            
destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId);
+                        } else {
+                            destHosts.put(param.host, param);
+                            param.buildHashTableForBroadcastJoin = true;
+                            TPlanFragmentDestination dest = new 
TPlanFragmentDestination();
+                            dest.fragment_instance_id = param.instanceId;
+                            try {
+                                dest.server = toRpcHost(param.host);
+                                dest.setBrpcServer(toBrpcHost(param.host));
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                            destinations.add(dest);
+                        }
+                    });
+                } else {
+                    for (int j = 0; j < destParams.instanceExecParams.size(); 
++j) {
+                        TPlanFragmentDestination dest = new 
TPlanFragmentDestination();
+                        dest.fragment_instance_id = 
destParams.instanceExecParams.get(j).instanceId;
+                        dest.server = 
toRpcHost(destParams.instanceExecParams.get(j).host);
+                        dest.brpc_server = 
toBrpcHost(destParams.instanceExecParams.get(j).host);
+                        destinations.add(dest);
+                    }
                 }
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to