lide-reed opened a new issue, #64172:
URL: https://github.com/apache/doris/issues/64172

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/doris/issues?q=is%3Aissue) and found no similar issues.
   
   
   ### Version
   
   Reproduced on the current `master`. The same buggy code path
   (`PartitionSorter::_read_row_rank`) is present in 2.1 / 3.x / 4.x as well.
   The reproduction below was verified on 4.1.1.
   
   ### What's Wrong?
   
   A window query that filters on `dense_rank()` (or `rank()`), such as
   
   ```sql
   SELECT ... FROM (
       SELECT *, dense_rank() OVER (PARTITION BY p ORDER BY o DESC) AS rk
       FROM t
   ) WHERE rk = 1;
   ```
   
   is rewritten by Nereids into a `VPartitionTopN` node with
   `partition limit = 1` and `partition topn phase: TWO_PHASE_LOCAL_PTOPN`.
   When the `dense_rank = 1` group of a partition contains **more rows than
   `batch_size`**, the query **silently returns only ~`batch_size` rows per
   pipeline instance** instead of the full group. No error is raised — the
   result is just wrong.
   
   `PartitionSorter::_read_row_rank()` declares EOS via a `Defer`
   (`be/src/exec/sort/partition_sorter.cpp`):
   
   ```cpp
   Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
       ...
       size_t merged_rows = 0;
   
       Defer defer {[&]() {
           if (merged_rows == 0 || _get_enough_data()) {   // <-- BUG
               *eos = true;
           }
       }};
   
       while (queue.is_valid() && merged_rows < batch_size) {
           ...
           for (...; merged_rows < batch_size; ...) {
               bool cmp_res = _previous_row->impl &&
                              _previous_row->compare_two_rows(current->impl);
               if (!cmp_res) {
                   if (_get_enough_data()) {
                       return Status::OK();
                   }
                   *_previous_row = *current;
                   _output_distinct_rows++;
               }
               // emit row
               ...
               merged_rows++;
               _output_total_rows++;
               ...
           }
       }
       return Status::OK();
   }
   ```
   
   For `DENSE_RANK`, `_get_enough_data()` is
   (`be/src/exec/sort/partition_sorter.h`):
   
   ```cpp
   bool _get_enough_data() const {
       if (_top_n_algorithm == TopNAlgorithm::DENSE_RANK) {
           return _output_distinct_rows >= _partition_inner_limit;
       } else {
           return _output_total_rows >= _partition_inner_limit;
       }
   }
   ```
   
   With `partition_inner_limit = 1` (i.e. `WHERE rk = 1`),
   `_output_distinct_rows` becomes `1` right after the **first** row of the
   `dense_rank = 1` group is emitted, so `_get_enough_data()` returns `true`
   immediately. The rest of that group has **not** been emitted yet — it is
   supposed to be drained across multiple `get_next()` calls.
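The counter arithmetic can be checked with a minimal standalone model. `Counters`, `first_enough_row`, the local `TopNAlgorithm` enum and the `int` sort keys are hypothetical simplifications, not Doris code. The sketch assumes the rows of one partition arrive already sorted and that two rows belong to the same group exactly when their keys are equal.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Local stand-in for Doris' TopNAlgorithm (assumption: only these values matter here).
enum class TopNAlgorithm { ROW_NUMBER, RANK, DENSE_RANK };

// Hypothetical model of the two PartitionSorter counters.
struct Counters {
    TopNAlgorithm algo;
    std::size_t limit;              // models _partition_inner_limit
    std::size_t distinct_rows = 0;  // models _output_distinct_rows
    std::size_t total_rows = 0;     // models _output_total_rows

    // Same condition as _get_enough_data() in partition_sorter.h.
    bool enough() const {
        return algo == TopNAlgorithm::DENSE_RANK ? distinct_rows >= limit
                                                 : total_rows >= limit;
    }
};

// Emits the rows of one sorted partition one at a time and returns how many
// rows had been emitted when enough() first became true (0 if it never did).
// It does not stop at group boundaries; it only shows when the condition flips.
std::size_t first_enough_row(const std::vector<int>& keys, TopNAlgorithm algo,
                             std::size_t limit) {
    assert(limit > 0 && "a partition limit of 0 is not modelled");
    Counters c{algo, limit};
    for (std::size_t i = 0; i < keys.size(); ++i) {
        if (i == 0 || keys[i] != keys[i - 1]) {
            ++c.distinct_rows;  // a new sort-key value opens a new group
        }
        ++c.total_rows;  // the row is emitted
        if (c.enough()) {
            return c.total_rows;
        }
    }
    return 0;
}
```

For a partition whose top group has 5 tied rows followed by one lower row (`{2, 2, 2, 2, 2, 1}`), `first_enough_row(keys, TopNAlgorithm::DENSE_RANK, 1)` returns 1. The stop condition already holds while 4 rows of the group are still unread. With `limit = 2` it returns 6, which is the first row of the second group.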
   
   But when the inner loop exits because `merged_rows` reached `batch_size`,
   the `Defer` still sets `*eos = true`, because `_get_enough_data()` is
   already `true`. The source operator then treats the sorter as fully
   drained
   (`be/src/exec/operator/partition_sort_source_operator.cpp`):
   
   ```cpp
   if (local_state._sort_idx < sorter_size) {
       RETURN_IF_ERROR(
               sorters[local_state._sort_idx]->get_next(state, output_block, &current_eos));
   }
   if (current_eos) {
       local_state._sort_idx++;   // advance to next sorter, dropping the rest
       ...
   }
   ```
   
   so it advances `_sort_idx` to the next sorter and **permanently drops all
   remaining rows of the current `dense_rank = 1` group**.
   
   Net effect: each pipeline instance emits at most `batch_size` rows of the
   `dense_rank = 1` group, so the observed row count is roughly
   `batch_size * number_of_pipeline_instances`, regardless of the group's
   true size.
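The interaction between the `Defer` and the source operator can be replayed in a small standalone model. `ModelSorter`, `read_buggy` and `drain_all` are hypothetical names. The model is a simplification built on assumptions: each sorter holds one already-sorted partition of `int` keys, and only the control flow quoted above is kept (no blocks, no merge queue, no partial sort).

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Local stand-in for Doris' TopNAlgorithm.
enum class TopNAlgorithm { ROW_NUMBER, RANK, DENSE_RANK };

// Hypothetical model of one PartitionSorter on the _read_row_rank() path.
struct ModelSorter {
    std::vector<int> keys;  // one partition, already sorted by the window ORDER BY
    TopNAlgorithm algo;     // RANK or DENSE_RANK
    std::size_t limit;      // models _partition_inner_limit
    std::size_t pos = 0;    // next unread row (stands in for the merge queue)
    std::size_t distinct_rows = 0;
    std::size_t total_rows = 0;

    bool enough() const {  // same condition as _get_enough_data()
        return algo == TopNAlgorithm::DENSE_RANK ? distinct_rows >= limit
                                                 : total_rows >= limit;
    }

    // Mirrors the current _read_row_rank(); returns the rows emitted by this call.
    std::size_t read_buggy(std::size_t batch_size, bool* eos) {
        std::size_t merged_rows = 0;
        while (pos < keys.size() && merged_rows < batch_size) {
            bool same_group = pos > 0 && keys[pos] == keys[pos - 1];
            if (!same_group) {
                if (enough()) break;  // boundary of the next group: stop here
                ++distinct_rows;
            }
            ++merged_rows;
            ++total_rows;
            ++pos;
        }
        // The Defer: enough() is treated as "nothing left to read".
        if (merged_rows == 0 || enough()) *eos = true;
        return merged_rows;
    }
};

// Mirrors the source operator: call get_next() until eos, then advance to the
// next sorter (_sort_idx++) without ever revisiting the previous one.
std::size_t drain_all(std::vector<ModelSorter> sorters, std::size_t batch_size) {
    assert(batch_size > 0);
    std::size_t rows_out = 0;
    for (ModelSorter& s : sorters) {
        bool eos = false;
        while (!eos) rows_out += s.read_buggy(batch_size, &eos);
    }
    return rows_out;
}
```

Feeding it the repro's shape gives 256, 1024, 4096 and 10000 rows for `batch_size` 256, 1024, 4096 and 16384, which is `min(batch_size, group_size)`. The repro shape here means one sorter with 10000 equal keys followed by 3 smaller keys, using `DENSE_RANK` with limit 1.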
   
   `row_number()` is **not** affected: its `_get_enough_data()` uses
   `_output_total_rows >= _partition_inner_limit`, which correctly becomes
   true only after exactly `partition_inner_limit` rows are emitted.
   
   ### What You Expected?
   
   The query should return the **entire** `dense_rank = 1` group (all rows
   that share the top sort-key value within each partition), independent of
   `batch_size` or the number of pipeline instances.
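As a reference for the expected semantics, the number of rows that `WHERE rk <= limit` should keep in one partition can be computed directly from the sorted sort keys. `expected_rows` and the local `TopNAlgorithm` enum are hypothetical. The sketch assumes the keys are already in the window's `ORDER BY ... DESC` order and that ties mean exact key equality.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Local stand-in for Doris' TopNAlgorithm.
enum class TopNAlgorithm { ROW_NUMBER, RANK, DENSE_RANK };

// Hypothetical oracle: counts the rows of one partition whose window-function
// value is <= limit.
std::size_t expected_rows(const std::vector<int>& keys, TopNAlgorithm algo,
                          std::size_t limit) {
    // Precondition: descending order, as in ORDER BY ts DESC.
    assert(std::is_sorted(keys.rbegin(), keys.rend()));
    std::size_t count = 0;
    std::size_t dense = 0;  // dense_rank() of the current row
    std::size_t rank = 0;   // rank() of the current row
    for (std::size_t i = 0; i < keys.size(); ++i) {
        if (i == 0 || keys[i] != keys[i - 1]) {
            ++dense;       // dense_rank: +1 per distinct key
            rank = i + 1;  // rank: 1 + number of rows before this group
        }
        std::size_t value = algo == TopNAlgorithm::DENSE_RANK ? dense
                            : algo == TopNAlgorithm::RANK     ? rank
                                                              : i + 1;  // row_number()
        if (value <= limit) ++count;
    }
    return count;
}
```

The repro partition has 10000 rows at the top `ts` and 3 rows below it. For that partition, `expected_rows(keys, TopNAlgorithm::DENSE_RANK, 1)` is 10000 and does not depend on any batch size. `RANK` with limit 1 also gives 10000, while `ROW_NUMBER` gives 1.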
   
   ### How to Reproduce?
   
   Single BE, single tablet, single partition value — no special data layout
   needed (the bug is deterministic):
   
   ```sql
   DROP DATABASE IF EXISTS bug_repro;
   CREATE DATABASE bug_repro;
   USE bug_repro;
   
   CREATE TABLE t_min (
       ts  DATE   NOT NULL,
       pk  BIGINT NOT NULL,
       val INT
   )
   DUPLICATE KEY(ts, pk)
   DISTRIBUTED BY HASH(pk) BUCKETS 1
   PROPERTIES ("replication_num" = "1");
   
   -- dense_rank = 1 group (ts = 2024-01-02): 10000 rows, far larger than batch_size.
   -- Total rows (10003) < PARTITION_SORT_ROWS_THRESHOLD (20000 in release build),
   -- so partial sort is not triggered and the buggy count equals batch_size exactly.
   INSERT INTO t_min
   SELECT DATE '2024-01-02', 1, number FROM numbers("number" = "10000");
   
   -- dense_rank = 2 group (ts = 2024-01-01): 3 rows (just to create a 2nd distinct ts).
   INSERT INTO t_min
   SELECT DATE '2024-01-01', 1, number FROM numbers("number" = "3");
   
   -- Single instance for a clean, deterministic count.
   SET parallel_pipeline_task_num = 1;
   
   SELECT /*+ SET_VAR(batch_size =   256) */ ts, COUNT(*) cnt FROM (
     SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
   ) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
   
   SELECT /*+ SET_VAR(batch_size =  1024) */ ts, COUNT(*) cnt FROM (
     SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
   ) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
   
   SELECT /*+ SET_VAR(batch_size =  4096) */ ts, COUNT(*) cnt FROM (
     SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
   ) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
   
   SELECT /*+ SET_VAR(batch_size = 16384) */ ts, COUNT(*) cnt FROM (
     SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
   ) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
   ```
   
   **Actual output (buggy):**
   
   ```
   batch_size = 256    -> ts=2024-01-02, cnt=  256
   batch_size = 1024   -> ts=2024-01-02, cnt= 1024
   batch_size = 4096   -> ts=2024-01-02, cnt= 4096
   batch_size = 16384  -> ts=2024-01-02, cnt=10000   (>= group size, so nothing dropped)
   ```
   
   `cnt` is exactly `min(batch_size, group_size)` — the row count tracks
   `batch_size`, which is clearly wrong.
   
   **Expected output (correct):** always
   
   ```
   ts=2024-01-02, cnt=10000
   ```
   
   #### Confirm via EXPLAIN and the golden path
   
   ```sql
   EXPLAIN
   SELECT /*+ SET_VAR(batch_size = 1024) */ ts, COUNT(*) cnt FROM (
     SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
   ) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
   -- contains:
   --   VPartitionTopN
   --     functions: dense_rank
   --     partition limit: 1
   --     partition topn phase: TWO_PHASE_LOCAL_PTOPN
   
   -- Disabling partition top-n routes through plain VAnalytic and is always correct:
   SELECT /*+ SET_VAR(enable_partition_topn = false, batch_size = 256) */
          ts, COUNT(*) cnt FROM (
     SELECT t.*, dense_rank() OVER (PARTITION BY pk ORDER BY ts DESC) dr FROM t_min t
   ) x WHERE dr = 1 GROUP BY ts ORDER BY ts;
   -- ts=2024-01-02, cnt=10000  (stable for any batch_size)
   ```
   
   ### Anything Else?
   
   Suggested fix: the `Defer` must not treat `_get_enough_data() == true` as
   "input exhausted". Introduce a `bool finished` flag that is set to `true`
   only when (a) the loop hits the boundary of the next distinct group with
   the limit already satisfied (the `if (_get_enough_data()) return;` branch),
   or (b) the input queue is fully drained. The `Defer` then signals EOS only
   when `finished` is true (or when no rows were emitted at all):
   
   ```cpp
   Status PartitionSorter::_read_row_rank(Block* output_block, bool* eos, int batch_size) {
       ...
       size_t merged_rows = 0;
       bool finished = false;                          // NEW
   
       Defer defer {[&]() {
           if (merged_rows == 0 || finished) {         // CHANGED: was _get_enough_data()
               *eos = true;
           }
       }};
   
       while (queue.is_valid() && merged_rows < batch_size) {
           ...
           for (...; merged_rows < batch_size; ...) {
               ...
               if (!cmp_res) {
                   if (_get_enough_data()) {
                       finished = true;                // NEW: real group boundary reached
                       return Status::OK();
                   }
                   *_previous_row = *current;
                   _output_distinct_rows++;
               }
               ...
           }
       }
   
       if (!queue.is_valid()) {                         // NEW: input fully drained
           finished = true;
       }
       return Status::OK();
   }
   ```
   
   With this fix the same group is drained across multiple `get_next()` calls
   and the full `dense_rank = 1` group is returned for any `batch_size`.
   Verified on a 3-BE cluster: `batch_size` values of 256, 1024, 2048, 10240
   and 20480 all return the correct, stable row count after the fix.
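A similar standalone model can check the proposed `finished` flag. `FixedModelSorter`, `read_fixed` and `drain_fixed` are hypothetical names. The model keeps only the control flow of the patched function, and assumes one already-sorted partition of `int` keys with no blocks and no partial sort.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Local stand-in for Doris' TopNAlgorithm.
enum class TopNAlgorithm { ROW_NUMBER, RANK, DENSE_RANK };

// Hypothetical model of one PartitionSorter with the proposed `finished` flag.
struct FixedModelSorter {
    std::vector<int> keys;  // one partition, already sorted by the window ORDER BY
    TopNAlgorithm algo;     // RANK or DENSE_RANK
    std::size_t limit;      // models _partition_inner_limit
    std::size_t pos = 0;    // next unread row (stands in for the merge queue)
    std::size_t distinct_rows = 0;
    std::size_t total_rows = 0;

    bool enough() const {  // same condition as _get_enough_data()
        return algo == TopNAlgorithm::DENSE_RANK ? distinct_rows >= limit
                                                 : total_rows >= limit;
    }

    // Mirrors the patched _read_row_rank(); returns the rows emitted by this call.
    std::size_t read_fixed(std::size_t batch_size, bool* eos) {
        std::size_t merged_rows = 0;
        bool finished = false;
        while (pos < keys.size() && merged_rows < batch_size) {
            bool same_group = pos > 0 && keys[pos] == keys[pos - 1];
            if (!same_group) {
                if (enough()) {
                    finished = true;  // real group boundary with the limit satisfied
                    break;
                }
                ++distinct_rows;
            }
            ++merged_rows;
            ++total_rows;
            ++pos;
        }
        if (pos == keys.size()) finished = true;  // input fully drained
        // The Defer: a full batch alone no longer means eos.
        if (merged_rows == 0 || finished) *eos = true;
        return merged_rows;
    }
};

// Calls read_fixed() until eos, as the source operator does for one sorter.
std::size_t drain_fixed(FixedModelSorter s, std::size_t batch_size) {
    assert(batch_size > 0);
    std::size_t rows_out = 0;
    bool eos = false;
    while (!eos) rows_out += s.read_fixed(batch_size, &eos);
    return rows_out;
}
```

The repro's shape is 10000 tied rows followed by 3 lower rows. On that shape, draining with `DENSE_RANK` and limit 1 returns 10000 rows for each `batch_size` of 256, 1024, 4096 and 16384. `RANK` with limit 1 also returns 10000, and `DENSE_RANK` with limit 2 returns all 10003 rows.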
   
   I'm happy to submit a PR with the fix and BE unit tests covering both
   `DENSE_RANK` and `RANK` with a first group larger than `batch_size`.
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

