This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 8d32ab18adb branch-4.1: [fix](coordinator) fix computeDestIdToInstanceId picking wrong ExchangeNode for multi-input fragments #63615 (#63821)
8d32ab18adb is described below

commit 8d32ab18adbe00e3a13c5cddb0ab9c66b06a058f
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 29 15:02:34 2026 +0800

    branch-4.1: [fix](coordinator) fix computeDestIdToInstanceId picking wrong ExchangeNode for multi-input fragments #63615 (#63821)
    
    Cherry-picked from #63615
    
    Co-authored-by: 924060929 <[email protected]>
---
 .../doris/qe/runtime/ThriftPlansBuilder.java       | 29 +++++--
 .../test_multicast_sink_multi_exchange.groovy      | 96 ++++++++++++++++++++++
 2 files changed, 120 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 0738876282f..409037e3b61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -577,13 +577,32 @@ public class ThriftPlansBuilder {
             PipelineDistributedPlan receivePlan, DistributedPlanWorker 
filterWorker,
             BiConsumer<AssignedJob, Integer> computeFn) {
 
-        // current only support all input plans have same destination with 
same order,
-        // so we can get first input plan to compute shuffle index to instance 
id
-        Set<Entry<ExchangeNode, DistributedPlan>> exchangeToChildPlanSet = 
receivePlan.getInputs().entries();
-        if (exchangeToChildPlanSet.isEmpty()) {
+        // When a fragment has multiple ExchangeNode inputs (e.g. NLJ with 
probe + BROADCAST
+        // build), pick the one with the most destinations on this worker. A 
BROADCAST input has
+        // 1 dest per BE while HASH-partitioned has N; using BROADCAST would 
produce a 1-entry
+        // map and cause 'Rows mismatched' for GLOBAL_HASH LOCAL_EXCHANGE.
+        Entry<ExchangeNode, DistributedPlan> exchangeToChildPlan = null;
+        int maxDestsOnWorker = -1;
+        for (Entry<ExchangeNode, DistributedPlan> entry : 
receivePlan.getInputs().entries()) {
+            ExchangeNode exchNode = entry.getKey();
+            PipelineDistributedPlan childPlan = (PipelineDistributedPlan) 
entry.getValue();
+            for (Entry<DataSink, List<AssignedJob>> kv : 
childPlan.getDestinations().entrySet()) {
+                if (kv.getKey().getExchNodeId().asInt() != 
exchNode.getId().asInt()) {
+                    continue;
+                }
+                int destsOnWorker = (int) kv.getValue().stream()
+                        .filter(j -> j.getAssignedWorker().id() == 
filterWorker.id())
+                        .count();
+                if (destsOnWorker > maxDestsOnWorker) {
+                    maxDestsOnWorker = destsOnWorker;
+                    exchangeToChildPlan = entry;
+                }
+                break;
+            }
+        }
+        if (exchangeToChildPlan == null) {
             return;
         }
-        Entry<ExchangeNode, DistributedPlan> exchangeToChildPlan = 
exchangeToChildPlanSet.iterator().next();
         ExchangeNode linkNode = exchangeToChildPlan.getKey();
         PipelineDistributedPlan firstInputPlan = (PipelineDistributedPlan) 
exchangeToChildPlan.getValue();
         Map<DataSink, List<AssignedJob>> sinkToDestInstances = 
firstInputPlan.getDestinations();
diff --git a/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink_multi_exchange.groovy b/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink_multi_exchange.groovy
new file mode 100644
index 00000000000..a2a670d50d4
--- /dev/null
+++ b/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink_multi_exchange.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression test for computeDestIdToInstanceId picking wrong ExchangeNode
+// when a fragment has multiple inputs with different partition types
+// (BROADCAST + HASH_PARTITIONED). Before the fix, .iterator().next() could
+// pick the BROADCAST input (1 dest per BE), producing a 1-entry shuffle map
+// and causing "Rows mismatched! Data may be lost".
+suite("test_multicast_sink_multi_exchange") {
+    sql "DROP TABLE IF EXISTS mse_fact"
+    sql "DROP TABLE IF EXISTS mse_dim"
+
+    sql """
+        CREATE TABLE mse_fact (
+            k1 VARCHAR(64) NOT NULL,
+            k2 INT NOT NULL,
+            v1 INT NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(k1, k2)
+        DISTRIBUTED BY HASH(k2) BUCKETS 8
+        PROPERTIES ("replication_num" = "1")
+    """
+    sql """
+        CREATE TABLE mse_dim (
+            k1 VARCHAR(64) NOT NULL,
+            v1 INT NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(k1)
+        DISTRIBUTED BY HASH(v1) BUCKETS 7
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    // Use numbers() to generate enough rows across buckets
+    sql """
+        INSERT INTO mse_dim SELECT CONCAT('key_', number), number
+        FROM numbers('number' = '100')
+    """
+    sql """
+        INSERT INTO mse_fact SELECT CONCAT('key_', number % 100), number % 10, 
number
+        FROM numbers('number' = '500')
+    """
+
+    sql "SET enable_nereids_distribute_planner=true"
+    sql "SET disable_join_reorder=true"
+    sql "SET enable_local_shuffle=true"
+
+    // CTE consumed by LEFT SEMI JOIN [broadcast] (1 dest per BE) and
+    // INNER JOIN [shuffle] (N dests per BE), producing MultiCastDataSinks
+    // with different partition types to the same downstream fragment.
+    // Vary parallel_pipeline_task_num to change per-BE destination counts.
+    for (def ppt in [1, 2, 4, 8, 16]) {
+        def expected = sql """
+            SELECT /*+ 
SET_VAR(parallel_pipeline_task_num=${ppt},enable_nereids_distribute_planner=false)
 */
+                t2.k2, SUM(t1.v1)
+            FROM (
+                SELECT k1, k2 FROM mse_fact GROUP BY k1, k2
+            ) t2
+            LEFT SEMI JOIN mse_dim d ON t2.k1 = d.k1
+            INNER JOIN mse_dim t1 ON t2.k1 = t1.k1
+            GROUP BY t2.k2 ORDER BY t2.k2
+        """
+
+        test {
+            sql """
+                WITH /*+ SET_VAR(parallel_pipeline_task_num=${ppt}) */ dim_cte 
AS (
+                    SELECT k1, SUM(v1) as v1 FROM mse_dim GROUP BY k1
+                )
+                SELECT t2.k2, SUM(t1.v1)
+                FROM (
+                    SELECT k1, k2 FROM mse_fact GROUP BY k1, k2
+                ) t2
+                LEFT SEMI JOIN [broadcast] dim_cte d ON t2.k1 = d.k1
+                INNER JOIN [shuffle] dim_cte t1 ON t2.k1 = t1.k1
+                GROUP BY t2.k2 ORDER BY t2.k2
+            """
+            result(expected)
+        }
+    }
+
+    sql "DROP TABLE IF EXISTS mse_fact"
+    sql "DROP TABLE IF EXISTS mse_dim"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to