This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize_pr
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0abc26eb08f3d45e015aa712baba970b809bf2f1
Author: 924060929 <[email protected]>
AuthorDate: Wed Jun 24 17:47:33 2026 +0800

    [test](local shuffle) regression tests for bucket upgrade + RF + count(distinct)
    
    - test_local_shuffle_bucket_upgrade: make assertions independent of BE count
      (pin bucket_shuffle_downgrade_ratio=0, remove environment-sensitive
      ratio=1.5 cases)
    - Forced runtime-filter (RF) correctness case: 4-arm test (upgrade+RF vs
      bucket+RF vs shuffle+RF vs upgrade-no-RF) ensuring the force_local_merge
      signal keeps RF correct under bucket-to-hash upgrade
    - Pin DORIS-25413 regression: count(distinct) under serial exchange,
      verifying the FE planner's require framework inserts a hash local
      exchange (LE) for non-streaming dedup
---
 .../test_local_shuffle_bucket_upgrade.groovy       | 35 ++++++++++------
 .../test_local_shuffle_rqg_bugs.groovy             | 49 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 12 deletions(-)

diff --git 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
index 0d9c21fe7d9..0e9c8ad85b4 100644
--- 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
+++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
@@ -28,10 +28,13 @@
  * Shape notes (verified against a live cluster):
  *  - LocalExchangeNodes only appear in EXPLAIN DISTRIBUTED PLAN (plain EXPLAIN
  *    renders the tree before AddLocalExchange runs).
- *  - Nereids bucket-shuffle downgrade: bucket shuffle only forms when
- *    totalBucketNum >= totalInstanceNum * 0.8, so BUCKETS 13 with
- *    parallel_pipeline_task_num=16 on 1 BE (13 >= 12.8) keeps the bucket join,
- *    and ratio=1.1 (16 > 13*1.1) enables the upgrade while default 1.5 does 
not.
+ *  - Nereids bucket-shuffle downgrade (bucket shuffle only forms when
+ *    totalBucketNum >= totalInstanceNum * bucket_shuffle_downgrade_ratio) 
depends on
+ *    the alive BE count, so the suite pins bucket_shuffle_downgrade_ratio=0 
to keep
+ *    the bucket join forming in any environment. ratio=1.1 fires on any 
machine
+ *    with >= 5 cores: min(task_num=16, cores) / min(buckets=4, cores) is 4/4 = 1.0
+ *    at 4 cores (not > 1.1), and 5/4 = 1.25 > 1.1 at 5+ cores.  Bucket counts are kept low
+ *    (4/3/3) to avoid tripping the cores-aware gate on small machines.
  *  - The aggregation above must NOT group by the bucket key: a colocate agg
  *    requires bucket distribution of the join output and correctly blocks the
  *    upgrade via the parentRequire gate.
@@ -51,6 +54,7 @@ suite("test_local_shuffle_bucket_upgrade") {
             parallel_pipeline_task_num=16,
             parallel_exchange_instance_num=8,
             query_timeout=600,
+            bucket_shuffle_downgrade_ratio=0,
             local_shuffle_bucket_upgrade_ratio=${ratio},
             enable_local_shuffle=${ls_on},
             enable_local_shuffle_planner=${ls_on}
@@ -61,13 +65,13 @@ suite("test_local_shuffle_bucket_upgrade") {
     sql "DROP TABLE IF EXISTS lsbu_probe"
     sql "DROP TABLE IF EXISTS lsbu_probe2"
     sql """CREATE TABLE lsbu_fact (k INT, v BIGINT)
-           ENGINE=OLAP DUPLICATE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 13
+           ENGINE=OLAP DUPLICATE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 4
            PROPERTIES ("replication_num"="1")"""
     sql """CREATE TABLE lsbu_probe (pk INT, k INT, w BIGINT)
-           ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 7
+           ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 3
            PROPERTIES ("replication_num"="1")"""
     sql """CREATE TABLE lsbu_probe2 (pk INT, k INT, w BIGINT)
-           ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5
+           ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 3
            PROPERTIES ("replication_num"="1")"""
     sql """INSERT INTO lsbu_fact
            SELECT CAST(number%50 AS INT), number*10+1
@@ -107,11 +111,6 @@ suite("test_local_shuffle_bucket_upgrade") {
     
assertFalse(ratioOnePlan.toString().contains("LOCAL_EXECUTION_HASH_SHUFFLE"),
         "ratio=1 must keep the upgrade off (<=1 disables)")
 
-    // default ratio 1.5 does not fire here: 16 < 13*1.5 (gate respects the 
threshold)
-    def ratioDefaultPlan = sql "EXPLAIN DISTRIBUTED PLAN 
${singleJoin(hints('true', '1.5'))}"
-    
assertFalse(ratioDefaultPlan.toString().contains("LOCAL_EXECUTION_HASH_SHUFFLE"),
-        "ratio=1.5 with 16 instances vs 13 buckets (16 < 19.5) must not 
upgrade")
-
     // Note: whether a group-by-bucket-key agg blocks the upgrade depends on 
the agg
     // shape the optimizer picks (a colocate one-phase agg requires bucket 
distribution
     // and blocks it; a two-phase agg does not). That parentRequire gate is 
covered
@@ -154,6 +153,18 @@ suite("test_local_shuffle_bucket_upgrade") {
     assertTrue(stackedUpgradedText.contains("LOCAL_EXECUTION_HASH_SHUFFLE"),
         "ratio=1.1 must upgrade the stacked bucket chain to LOCAL hash")
 
+    // Forced-RF killer case: with the upgrade, the join build is hash-sliced; 
the
+    // per-instance IN/MIN_MAX partial filters MUST be merged before 
application
+    // (TRuntimeFilterDesc.force_local_merge). Before that fix this query 
silently
+    // lost up to 96% of its rows.
+    def rfHints = { ratio ->
+        hints('true', ratio).replace(")*/",
+            ", enable_runtime_filter_prune=false, 
runtime_filter_type='IN,MIN_MAX')*/")
+    }
+    def single_up_rf = sql "SELECT ${rfHints('1.1')} p.pk % 10 AS g, COUNT(*) 
c, SUM(f.v) sv, SUM(p.w) sw FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k 
GROUP BY g ORDER BY g"
+    assertEquals(single_baseline, single_up_rf,
+        "upgraded bucket join with forced IN/MIN_MAX runtime filters must stay 
correct")
+
     def stacked_baseline = sql stackedJoin(hints('false', '0'))
     def stacked_bucket = sql stackedJoin(hints('true', '0'))
     def stacked_upgraded = sql stackedJoin(hints('true', '1.1'))
diff --git 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
index e369dc0f11e..31cc5720581 100644
--- 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
+++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_rqg_bugs.groovy
@@ -1160,6 +1160,55 @@ suite("test_local_shuffle_rqg_bugs") {
         assertTrue(false, "Bug 20: Serial exchange + agg hang: ${t.message}")
     }
 
+    // ============================================================
+    //  DORIS-25413: count(distinct)+std + RIGHT JOIN returns inflated 
distinct count
+    //  when use_serial_exchange=true + enable_local_exchange_before_agg=false.
+    //  Root cause (BE-planned): AggSink early-return ignored that the serial 
exchange
+    //  child breaks the HASH(s) invariant via PASSTHROUGH fan-out; fixed 
upstream by
+    //  child_breaks_local_key_distribution (#63766). The FE planner fixes it
+    //  structurally: requires are semantic, a hash LE is inserted instead of
+    //  PASSTHROUGH. This case pins both paths.
+    // ============================================================
+    try {
+        logger.info("DORIS-25413: count(distinct) under serial exchange")
+        sql "DROP TABLE IF EXISTS rqg_25413_t1"
+        sql "DROP TABLE IF EXISTS rqg_25413_t2"
+        sql """CREATE TABLE rqg_25413_t1 (pk INT NOT NULL, s VARCHAR(64) NOT 
NULL, d DECIMAL(10,2) NOT NULL)
+               ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5
+               PROPERTIES ("replication_num"="1")"""
+        sql """CREATE TABLE rqg_25413_t2 (pk INT NOT NULL, dt DATETIME NOT 
NULL)
+               ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5
+               PROPERTIES ("replication_num"="1")"""
+        sql """INSERT INTO rqg_25413_t1
+               SELECT CAST(number AS INT), concat('s', CAST(number % 29 AS 
INT)),
+                      CAST(number * 13 % 1000 AS DECIMAL(10,2))
+               FROM numbers("number"="200")"""
+        sql """INSERT INTO rqg_25413_t2
+               SELECT CAST(number AS INT),
+                      date_add('2000-01-01 00:00:00', INTERVAL CAST(number % 
3000 AS INT) DAY)
+               FROM numbers("number"="200")"""
+
+        def q25413 = { vars -> """
+            SELECT /*+SET_VAR(${vars})*/
+                count(distinct t1.s) AS cnt_distinct, std(t1.d) AS std_val
+            FROM rqg_25413_t1 t1
+            RIGHT JOIN rqg_25413_t2 t2 ON t1.pk = t2.pk
+            WHERE t2.dt < '2005-01-01 00:00:00'
+        """ }
+        def base25413 = "enable_sql_cache=false, 
enable_local_exchange_before_agg=false, parallel_pipeline_task_num=4"
+        def expected25413 = sql q25413(base25413)
+        for (planner in ['false', 'true']) {
+            def actual = sql q25413(
+                "${base25413}, experimental_use_serial_exchange=true, 
enable_local_shuffle_planner=${planner}")
+            assertEquals(expected25413, actual,
+                "DORIS-25413 planner=${planner}: distinct count must not be 
inflated under serial exchange")
+        }
+        logger.info("DORIS-25413: PASSED")
+    } catch (Throwable t) {
+        logger.error("DORIS-25413 FAILED: ${t.message}")
+        assertTrue(false, "DORIS-25413: ${t.message}")
+    }
+
     // ============================================================
     //  Bug 21: Multi-distinct COUNT on many-bucket table → COREDUMP
     //  RQG build 186737/186929/186952: AggSinkOperatorX::sink → 
set_ready_to_read


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to