This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_rebase
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 261abe7dc475f0a838f0c1083ec856b8e5ed47c0
Author: 924060929 <[email protected]>
AuthorDate: Mon Jun 1 12:53:15 2026 +0800

    [fix](local shuffle) PARTITIONED hash join: use requireGlobalExecutionHash instead of requireHash
    
    In PARTITIONED (shuffle) hash joins, both sides arrive through a global
    hash exchange. The PARTITIONED (else) branch of HashJoinNode used the
    generic requireHash(), which resolved to LOCAL_EXECUTION_HASH_SHUFFLE
    whenever a new local exchange had to be inserted. LOCAL hash partitions
    rows modulo the per-BE instance count, whereas the global exchange on the
    other side partitions modulo the total instance count. Rows with equal
    join keys could therefore be routed to different join instances, causing
    join mismatches and missing rows (DORIS-26101).
    
    Fix: require GLOBAL_EXECUTION_HASH_SHUFFLE via requireGlobalExecutionHash(),
    so any inserted local exchange uses the same instance mapping as the
    cross-fragment global exchange.
---
 .../org/apache/doris/planner/HashJoinNode.java     |  14 +-
 .../planner/LocalShuffleNodeCoverageTest.java      |  25 ++-
 .../test_local_shuffle_cross_join_hash_join.groovy | 230 +++++++++++++++++++++
 3 files changed, 258 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index ae01b515cab..a737e6106b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -344,13 +344,13 @@ public class HashJoinNode extends JoinNodeBase {
                     LocalExchangeTypeRequire.requireBucketHash(), 
translatorContext, this,
                     children.get(0));
         } else {
-            // Use requireHash() (not requireGlobalExecutionHash()) so that 
resolveExchangeType()
-            // can downgrade to LOCAL_EXECUTION_HASH_SHUFFLE via 
shouldUseLocalExecutionHash().
-            // This matches BE-native behavior where use_serial_exchange=true 
sets _use_serial_source=true,
-            // causing _add_local_exchange_impl to use LOCAL (not GLOBAL) hash 
shuffle.
-            // With use_serial_exchange=false, the upstream ExchangeNode 
already outputs
-            // GLOBAL_EXECUTION_HASH_SHUFFLE which satisfies requireHash() — 
no new exchange inserted.
-            buildSideRequire = probeSideRequire = 
LocalExchangeTypeRequire.requireHash();
+            // PARTITIONED (shuffle) join: both sides enter via global hash 
exchange.
+            // Require GLOBAL specifically so that any inserted exchange uses 
the same
+            // instance mapping as the cross-fragment exchange. LOCAL hash has 
a different
+            // modulus (per-BE instance count vs total instance count) and 
would cause
+            // join mismatches (DORIS-26101).
+            buildSideRequire = probeSideRequire
+                    = LocalExchangeTypeRequire.requireGlobalExecutionHash();
             outputType = null; // derived from probeResult.second below
         }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
index 4abd9286384..dbf9d1cda81 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
@@ -229,10 +229,27 @@ public class LocalShuffleNodeCoverageTest {
         hashJoin.setDistributionMode(DistributionMode.PARTITIONED);
         Pair<PlanNode, LocalExchangeType> hashOutput = 
hashJoin.enforceAndDeriveLocalExchange(
                 ctx, null, LocalExchangeTypeRequire.requireHash());
-        // enforceRequire resolves RequireHash to LOCAL_EXECUTION_HASH_SHUFFLE 
(FE-planned always uses LOCAL)
-        
Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, 
hashOutput.second);
-        assertChildLocalExchangeType(hashJoin, 0, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
-        assertChildLocalExchangeType(hashJoin, 1, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        // PARTITIONED join requires GLOBAL hash to match cross-fragment 
exchange (DORIS-26101)
+        
Assertions.assertEquals(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE, 
hashOutput.second);
+        assertChildLocalExchangeType(hashJoin, 0, 
LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
+        assertChildLocalExchangeType(hashJoin, 1, 
LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
+
+        // DORIS-26101: PARTITIONED join with probe child already providing 
GLOBAL hash
+        // (e.g. upstream ExchangeNode) should satisfy 
requireGlobalExecutionHash without
+        // inserting a new exchange.
+        TrackingPlanNode probeGlobal = new TrackingPlanNode(nextPlanNodeId(),
+                LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
+        TrackingPlanNode buildGlobal = new TrackingPlanNode(nextPlanNodeId(),
+                LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
+        HashJoinNode partitionedSatisfied = new HashJoinNode(nextPlanNodeId(), 
probeGlobal, buildGlobal,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), 
null, null, false);
+        partitionedSatisfied.setDistributionMode(DistributionMode.PARTITIONED);
+        Pair<PlanNode, LocalExchangeType> satisfiedOutput = 
partitionedSatisfied.enforceAndDeriveLocalExchange(
+                ctx, null, LocalExchangeTypeRequire.requireHash());
+        
Assertions.assertEquals(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE, 
satisfiedOutput.second);
+        Assertions.assertSame(probeGlobal, partitionedSatisfied.getChild(0),
+                "no exchange should be inserted when child already provides 
GLOBAL hash");
+        Assertions.assertSame(buildGlobal, partitionedSatisfied.getChild(1));
 
         TrackingPlanNode probe3 = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
         TrackingPlanNode build3 = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
diff --git 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_cross_join_hash_join.groovy
 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_cross_join_hash_join.groovy
new file mode 100644
index 00000000000..93e91355ef4
--- /dev/null
+++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_cross_join_hash_join.groovy
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/**
+ * DORIS-26101: FE local shuffle planner returns wrong result for
+ * aggregate → CROSS/NLJ → downstream partitioned hash join.
+ *
+ * Root cause: PARTITIONED hash join used generic requireHash() which
+ * resolved to LOCAL_EXECUTION_HASH_SHUFFLE. LOCAL hash has a different
+ * modulus (per-BE instance count) than the GLOBAL hash exchange on
+ * the other side of the join, causing instance mapping mismatch and
+ * missing rows.
+ *
+ * Fix: PARTITIONED join uses requireGlobalExecutionHash() so any
+ * inserted exchange matches the cross-fragment global exchange.
+ */
+suite("test_local_shuffle_cross_join_hash_join") {
+
+    sql "DROP TABLE IF EXISTS ls_cross_a"
+    sql "DROP TABLE IF EXISTS ls_cross_dim"
+
+    // The two tables are distributed on different columns (id vs g) with
+    // different bucket counts; the join below is additionally forced to a
+    // shuffle join via the [shuffle] hint and prefer_join_method=shuffle.
+    sql """
+        CREATE TABLE ls_cross_a (
+            id INT,
+            g INT,
+            v INT
+        ) ENGINE=OLAP DUPLICATE KEY(id, g)
+        DISTRIBUTED BY HASH(id) BUCKETS 13
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    sql """
+        CREATE TABLE ls_cross_dim (
+            id INT,
+            g INT,
+            w INT
+        ) ENGINE=OLAP DUPLICATE KEY(id, g)
+        DISTRIBUTED BY HASH(g) BUCKETS 17
+        PROPERTIES ("replication_num" = "1")
+    """
+
+    // 23 rows, one per distinct g in [0, 23).
+    sql """
+        INSERT INTO ls_cross_a
+        SELECT CAST(number AS INT) id, CAST(number AS INT) g, CAST(number * 10 + 1 AS INT) v
+        FROM numbers("number" = "23")
+    """
+
+    // 713 = 23 * 31 rows, so every g in [0, 23) has exactly 31 matching dim rows.
+    sql """
+        INSERT INTO ls_cross_dim
+        SELECT CAST(number AS INT) id, CAST(number % 23 AS INT) g, CAST(1000 + number AS INT) w
+        FROM numbers("number" = "713")
+    """
+
+    // Session variables shared by every query below. Only the two local shuffle
+    // switches differ between the baseline run and the FE-planned run.
+    def commonHints = """
+            enable_sql_cache=false,
+            disable_join_reorder=true,
+            enable_local_exchange_before_agg=false,
+            experimental_force_to_local_shuffle=true,
+            experimental_enable_parallel_scan=false,
+            enable_runtime_filter_prune=false,
+            enable_runtime_filter_partition_prune=false,
+            runtime_filter_type='IN,MIN_MAX',
+            parallel_pipeline_task_num=8,
+            parallel_exchange_instance_num=8,
+            query_timeout=600,
+            prefer_join_method=shuffle"""
+
+    // Runs "SELECT /*+SET_VAR(...)*/ <selectTail>", where selectTail is the rest of
+    // the query after the SELECT keyword. localShuffle sets both
+    // enable_local_shuffle and enable_local_shuffle_planner.
+    def runQuery = { localShuffle, selectTail ->
+        sql """
+        SELECT /*+SET_VAR(${commonHints},
+            enable_local_shuffle=${localShuffle},
+            enable_local_shuffle_planner=${localShuffle}
+        )*/ ${selectTail}
+        """
+    }
+
+    // aggregate -> explicit CROSS JOIN -> shuffle hash join on g
+    def crossJoinQuery = """x.g, COUNT(*) c, SUM(d.w) sw
+        FROM (
+            SELECT a.g, one.v
+            FROM (
+                SELECT g, SUM(v) sv FROM ls_cross_a GROUP BY g
+            ) a
+            CROSS JOIN (SELECT 1 v) one
+        ) x
+        JOIN [shuffle] ls_cross_dim d ON x.g = d.g
+        GROUP BY x.g
+        ORDER BY x.g
+    """
+
+    // Baseline: local shuffle OFF. FE local shuffle planner: ON.
+    def baseline = runQuery(false, crossJoinQuery)
+    def feResult = runQuery(true, crossJoinQuery)
+
+    assertEquals(23, baseline.size(), "baseline should return 23 rows")
+    assertEquals(baseline, feResult,
+        "DORIS-26101: FE local shuffle planner should match baseline for aggregate->CROSS JOIN->shuffle join")
+
+    // Same shape, but the cross product is written as an implicit comma join
+    // (no join condition) instead of an explicit CROSS JOIN.
+    def nljQuery = """x.g, COUNT(*) c, SUM(d.w) sw
+        FROM (
+            SELECT a.g
+            FROM (
+                SELECT g, SUM(v) sv FROM ls_cross_a GROUP BY g
+            ) a, (SELECT 1 v) one
+        ) x
+        JOIN [shuffle] ls_cross_dim d ON x.g = d.g
+        GROUP BY x.g
+        ORDER BY x.g
+    """
+
+    def nljBaseline = runQuery(false, nljQuery)
+    def nljFeResult = runQuery(true, nljQuery)
+
+    // Guard against an empty baseline silently "matching" an empty FE result.
+    assertEquals(23, nljBaseline.size(), "NLJ baseline should return 23 rows")
+    assertEquals(nljBaseline, nljFeResult,
+        "DORIS-26101: NLJ variant should also match baseline")
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to