This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_rebase
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e27b1174004f32f89004cfee781785056f46573a
Author: 924060929 <[email protected]>
AuthorDate: Tue Jun 2 11:19:30 2026 +0800

    [fix](local shuffle) fall back to LOCAL hash for serial source in PARTITIONED join/intersect
    
    DORIS-26120: with use_serial_exchange=true, shuffle_idx_to_instance_idx
    has only one entry, so GLOBAL_EXECUTION_HASH_SHUFFLE routes data to
    non-existent indices, causing a "Rows mismatched" error.
    
    Fix: check fragment.useSerialSource() in HashJoinNode and
    SetOperationNode. When it returns true, fall back to requireHash(),
    which resolves to LOCAL hash, matching BE's _use_serial_source
    behavior. When it returns false (the normal case), keep
    requireGlobalExecutionHash() for DORIS-26101 correctness.
---
 .../org/apache/doris/planner/HashJoinNode.java     | 13 +++++--
 .../org/apache/doris/planner/SetOperationNode.java | 10 ++++--
 .../planner/LocalShuffleNodeCoverageTest.java      | 15 ++++++++
 .../test_local_shuffle_global_hash_require.groovy  | 42 ++++++++++++++++++++++
 4 files changed, 76 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index a737e6106b5..cbf2dcd6511 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -349,8 +349,17 @@ public class HashJoinNode extends JoinNodeBase {
             // instance mapping as the cross-fragment exchange. LOCAL hash has 
a different
             // modulus (per-BE instance count vs total instance count) and 
would cause
             // join mismatches (DORIS-26101).
-            buildSideRequire = probeSideRequire
-                    = LocalExchangeTypeRequire.requireGlobalExecutionHash();
+            //
+            // Exception: serial source (use_serial_exchange=true + pooling). 
The serial
+            // exchange sends to a single BE so shuffle_idx_to_instance_idx 
has only one
+            // entry — GLOBAL hash would route data to non-existent indices 
(DORIS-26120).
+            // Fall back to generic requireHash() which resolves to LOCAL, 
matching BE's
+            // _use_serial_source behavior.
+            boolean serialSource = fragment != null
+                    && 
fragment.useSerialSource(translatorContext.getConnectContext());
+            buildSideRequire = probeSideRequire = serialSource
+                    ? LocalExchangeTypeRequire.requireHash()
+                    : LocalExchangeTypeRequire.requireGlobalExecutionHash();
             outputType = null; // derived from probeResult.second below
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
index af9338fa74f..17fdf35bbd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
@@ -231,8 +231,14 @@ public abstract class SetOperationNode extends PlanNode {
                 // PARTITIONED intersect/except: all children enter via global 
hash
                 // exchange. Require GLOBAL so any inserted exchange matches 
the
                 // cross-fragment instance mapping (same fix as HashJoinNode 
DORIS-26101).
-                requireChild = 
LocalExchangeTypeRequire.requireGlobalExecutionHash();
-                outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
+                // Exception: serial source → fall back to LOCAL (DORIS-26120).
+                boolean serialSource = fragment != null
+                        && 
fragment.useSerialSource(translatorContext.getConnectContext());
+                requireChild = serialSource
+                        ? LocalExchangeTypeRequire.requireHash()
+                        : 
LocalExchangeTypeRequire.requireGlobalExecutionHash();
+                outputType = AddLocalExchange.resolveExchangeType(
+                        requireChild, translatorContext, this, firstChild);
             }
         }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
index 188941bdb3a..7288ba49ca2 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
@@ -251,6 +251,21 @@ public class LocalShuffleNodeCoverageTest {
                 "no exchange should be inserted when child already provides 
GLOBAL hash");
         Assertions.assertSame(buildGlobal, partitionedSatisfied.getChild(1));
 
+        // DORIS-26120: PARTITIONED join with serial source falls back to 
LOCAL hash
+        // because GLOBAL shuffle_idx_to_instance_idx is incomplete for serial 
exchange.
+        TrackingScanNode probeSerial = new TrackingScanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
+        TrackingPlanNode buildSerial = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
+        HashJoinNode serialPartitioned = new HashJoinNode(nextPlanNodeId(), 
probeSerial, buildSerial,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), 
null, null, false);
+        serialPartitioned.setDistributionMode(DistributionMode.PARTITIONED);
+        serialPartitioned.fragment = Mockito.mock(PlanFragment.class);
+        
Mockito.when(serialPartitioned.fragment.useSerialSource(Mockito.any())).thenReturn(true);
+        Pair<PlanNode, LocalExchangeType> serialPartOutput = 
serialPartitioned.enforceAndDeriveLocalExchange(
+                ctx, null, LocalExchangeTypeRequire.requireHash());
+        
Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, 
serialPartOutput.second);
+        assertChildLocalExchangeType(serialPartitioned, 0, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        assertChildLocalExchangeType(serialPartitioned, 1, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+
         TrackingPlanNode probe3 = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
         TrackingPlanNode build3 = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
         HashJoinNode nullAwareJoin = new HashJoinNode(nextPlanNodeId(), 
probe3, build3,
diff --git 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy
 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy
index 82b3a57e21f..f2aff5525b5 100644
--- 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy
+++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_global_hash_require.groovy
@@ -365,4 +365,46 @@ suite("test_local_shuffle_global_hash_require") {
     assertEquals(23, ptopn_baseline.size())
     assertEquals(ptopn_baseline, ptopn_fe,
         "DORIS-26103: UNION ALL -> PartitionTopN -> INTERSECT")
+
+    // ============================================================
+    // DORIS-26120: serial exchange + shuffle join → GLOBAL hash
+    // shuffle_idx_to_instance_idx incomplete → Rows mismatched.
+    // Fix: fall back to LOCAL hash when fragment uses serial source.
+    // ============================================================
+    sql "DROP TABLE IF EXISTS ls_serial_fact"
+    sql "DROP TABLE IF EXISTS ls_serial_dim"
+    sql """CREATE TABLE ls_serial_fact (pk INT NOT NULL, g INT NOT NULL)
+           ENGINE=OLAP DUPLICATE KEY(pk,g) DISTRIBUTED BY HASH(pk) BUCKETS 1
+           PROPERTIES ("replication_num"="1")"""
+    sql """CREATE TABLE ls_serial_dim (g INT NOT NULL)
+           ENGINE=OLAP DUPLICATE KEY(g) DISTRIBUTED BY HASH(g) BUCKETS 1
+           PROPERTIES ("replication_num"="1")"""
+    sql "INSERT INTO ls_serial_fact VALUES (1, 1)"
+    sql "INSERT INTO ls_serial_dim VALUES (1)"
+
+    def serial_baseline = sql """SELECT /*+SET_VAR(
+        enable_sql_cache=false,
+        enable_local_shuffle=false,
+        enable_local_shuffle_planner=false,
+        use_serial_exchange=true,
+        parallel_pipeline_task_num=4,
+        ignore_storage_data_distribution=true
+    )*/ a.g AS left_g, b.g AS right_g
+        FROM ls_serial_fact a JOIN [shuffle] ls_serial_dim b ON a.g = b.g
+        ORDER BY left_g, right_g"""
+
+    def serial_fe = sql """SELECT /*+SET_VAR(
+        enable_sql_cache=false,
+        enable_local_shuffle=true,
+        enable_local_shuffle_planner=true,
+        use_serial_exchange=true,
+        parallel_pipeline_task_num=4,
+        ignore_storage_data_distribution=true
+    )*/ a.g AS left_g, b.g AS right_g
+        FROM ls_serial_fact a JOIN [shuffle] ls_serial_dim b ON a.g = b.g
+        ORDER BY left_g, right_g"""
+
+    assertEquals(1, serial_baseline.size())
+    assertEquals(serial_baseline, serial_fe,
+        "DORIS-26120: serial exchange + shuffle join should not error")
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to