This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 8d32ab18adb branch-4.1: [fix](coordinator) fix computeDestIdToInstanceId picking wrong ExchangeNode for multi-input fragments #63615 (#63821)
8d32ab18adb is described below
commit 8d32ab18adbe00e3a13c5cddb0ab9c66b06a058f
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 29 15:02:34 2026 +0800
branch-4.1: [fix](coordinator) fix computeDestIdToInstanceId picking wrong ExchangeNode for multi-input fragments #63615 (#63821)
Cherry-picked from #63615
Co-authored-by: 924060929 <[email protected]>
---
.../doris/qe/runtime/ThriftPlansBuilder.java | 29 +++++--
.../test_multicast_sink_multi_exchange.groovy | 96 ++++++++++++++++++++++
2 files changed, 120 insertions(+), 5 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index 0738876282f..409037e3b61 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -577,13 +577,32 @@ public class ThriftPlansBuilder {
PipelineDistributedPlan receivePlan, DistributedPlanWorker filterWorker,
BiConsumer<AssignedJob, Integer> computeFn) {
- // current only support all input plans have same destination with same order,
- // so we can get first input plan to compute shuffle index to instance id
- Set<Entry<ExchangeNode, DistributedPlan>> exchangeToChildPlanSet = receivePlan.getInputs().entries();
- if (exchangeToChildPlanSet.isEmpty()) {
+ // When a fragment has multiple ExchangeNode inputs (e.g. NLJ with probe + BROADCAST
+ // build), pick the one with the most destinations on this worker. A BROADCAST input has
+ // 1 dest per BE while HASH-partitioned has N; using BROADCAST would produce a 1-entry
+ // map and cause 'Rows mismatched' for GLOBAL_HASH LOCAL_EXCHANGE.
+ Entry<ExchangeNode, DistributedPlan> exchangeToChildPlan = null;
+ int maxDestsOnWorker = -1;
+ for (Entry<ExchangeNode, DistributedPlan> entry : receivePlan.getInputs().entries()) {
+ ExchangeNode exchNode = entry.getKey();
+ PipelineDistributedPlan childPlan = (PipelineDistributedPlan) entry.getValue();
+ for (Entry<DataSink, List<AssignedJob>> kv : childPlan.getDestinations().entrySet()) {
+ if (kv.getKey().getExchNodeId().asInt() != exchNode.getId().asInt()) {
+ continue;
+ }
+ int destsOnWorker = (int) kv.getValue().stream()
+ .filter(j -> j.getAssignedWorker().id() == filterWorker.id())
+ .count();
+ if (destsOnWorker > maxDestsOnWorker) {
+ maxDestsOnWorker = destsOnWorker;
+ exchangeToChildPlan = entry;
+ }
+ break;
+ }
+ }
+ if (exchangeToChildPlan == null) {
return;
}
- Entry<ExchangeNode, DistributedPlan> exchangeToChildPlan = exchangeToChildPlanSet.iterator().next();
ExchangeNode linkNode = exchangeToChildPlan.getKey();
PipelineDistributedPlan firstInputPlan = (PipelineDistributedPlan) exchangeToChildPlan.getValue();
Map<DataSink, List<AssignedJob>> sinkToDestInstances = firstInputPlan.getDestinations();
diff --git a/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink_multi_exchange.groovy b/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink_multi_exchange.groovy
new file mode 100644
index 00000000000..a2a670d50d4
--- /dev/null
+++ b/regression-test/suites/nereids_syntax_p0/distribute/test_multicast_sink_multi_exchange.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression test for computeDestIdToInstanceId picking wrong ExchangeNode
+// when a fragment has multiple inputs with different partition types
+// (BROADCAST + HASH_PARTITIONED). Before the fix, .iterator().next() could
+// pick the BROADCAST input (1 dest per BE), producing a 1-entry shuffle map
+// and causing "Rows mismatched! Data may be lost".
+suite("test_multicast_sink_multi_exchange") {
+ sql "DROP TABLE IF EXISTS mse_fact"
+ sql "DROP TABLE IF EXISTS mse_dim"
+
+ sql """
+ CREATE TABLE mse_fact (
+ k1 VARCHAR(64) NOT NULL,
+ k2 INT NOT NULL,
+ v1 INT NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(k1, k2)
+ DISTRIBUTED BY HASH(k2) BUCKETS 8
+ PROPERTIES ("replication_num" = "1")
+ """
+ sql """
+ CREATE TABLE mse_dim (
+ k1 VARCHAR(64) NOT NULL,
+ v1 INT NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(k1)
+ DISTRIBUTED BY HASH(v1) BUCKETS 7
+ PROPERTIES ("replication_num" = "1")
+ """
+
+ // Use numbers() to generate enough rows across buckets
+ sql """
+ INSERT INTO mse_dim SELECT CONCAT('key_', number), number
+ FROM numbers('number' = '100')
+ """
+ sql """
+ INSERT INTO mse_fact SELECT CONCAT('key_', number % 100), number % 10, number
+ FROM numbers('number' = '500')
+ """
+
+ sql "SET enable_nereids_distribute_planner=true"
+ sql "SET disable_join_reorder=true"
+ sql "SET enable_local_shuffle=true"
+
+ // CTE consumed by LEFT SEMI JOIN [broadcast] (1 dest per BE) and
+ // INNER JOIN [shuffle] (N dests per BE), producing MultiCastDataSinks
+ // with different partition types to the same downstream fragment.
+ // Vary parallel_pipeline_task_num to change per-BE destination counts.
+ for (def ppt in [1, 2, 4, 8, 16]) {
+ def expected = sql """
+ SELECT /*+ SET_VAR(parallel_pipeline_task_num=${ppt},enable_nereids_distribute_planner=false) */
+ t2.k2, SUM(t1.v1)
+ FROM (
+ SELECT k1, k2 FROM mse_fact GROUP BY k1, k2
+ ) t2
+ LEFT SEMI JOIN mse_dim d ON t2.k1 = d.k1
+ INNER JOIN mse_dim t1 ON t2.k1 = t1.k1
+ GROUP BY t2.k2 ORDER BY t2.k2
+ """
+
+ test {
+ sql """
+ WITH /*+ SET_VAR(parallel_pipeline_task_num=${ppt}) */ dim_cte AS (
+ SELECT k1, SUM(v1) as v1 FROM mse_dim GROUP BY k1
+ )
+ SELECT t2.k2, SUM(t1.v1)
+ FROM (
+ SELECT k1, k2 FROM mse_fact GROUP BY k1, k2
+ ) t2
+ LEFT SEMI JOIN [broadcast] dim_cte d ON t2.k1 = d.k1
+ INNER JOIN [shuffle] dim_cte t1 ON t2.k1 = t1.k1
+ GROUP BY t2.k2 ORDER BY t2.k2
+ """
+ result(expected)
+ }
+ }
+
+ sql "DROP TABLE IF EXISTS mse_fact"
+ sql "DROP TABLE IF EXISTS mse_dim"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]