This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 880e9e0d8b1 [fix](scan) Respect byte budget when merging scan blocks (#63296)
880e9e0d8b1 is described below

commit 880e9e0d8b1d8927b049acb39d8844bda0ed165f
Author: Jerry Hu <[email protected]>
AuthorDate: Fri May 22 23:19:13 2026 +0800

    [fix](scan) Respect byte budget when merging scan blocks (#63296)
    
    ## Summary
    
    Fix block merging in the scanner scheduler so that the adaptive-batch-size
    byte budget (`preferred_block_size_bytes()`) is respected when multiple
    scanned blocks are stitched into a single cached block.
    
    ## Root Cause
    
    The scheduler's merge path only checked the combined row count against
    `batch_size()`. When adaptive batch sizing produced several blocks that were
    each individually within budget, the scheduler could still merge them into
    a much larger block, because the merge check ignored `preferred_block_size_bytes()`.
    
    ## Changes
    
    - Capture `preferred_block_size_bytes()` for the scan task.
    - Merge into the last cached block only when the row budget is satisfied
    and, if `config::enable_adaptive_batch_size` is enabled, the byte budget is
    satisfied as well.
    - Keep the existing empty-block behavior: if either the last cached block or
    the new block has zero rows, they are always merged, so eos/filtered-empty
    blocks are not emitted as separate blocks.
    - Keep using `allocated_bytes()` for memory accounting, and use `bytes()`
    (the actual data size) for the adaptive byte-budget check.
    
    ## Validation
    
    - `git diff --check -- be/src/exec/scan/scanner_scheduler.cpp`
    - `ninja -C be/ut_build_ASAN src/exec/CMakeFiles/Exec.dir/scan/scanner_scheduler.cpp.o`
    
    Note: `./run-be-ut.sh --run --filter=ScannerContextTest.*` was started
    earlier but was stopped after it triggered a broad ASAN unit-test build, so
    the filtered tests were not run; the changed object file had already
    compiled successfully.
---
 be/src/exec/scan/scanner_scheduler.cpp | 31 +++++++++++++++++++++++++++----
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/scan/scanner_scheduler.cpp 
b/be/src/exec/scan/scanner_scheduler.cpp
index fae4659361f..123e0515eb2 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -220,8 +220,10 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
 
             size_t raw_bytes_read = 0;
             bool first_read = true; int64_t limit = scanner->limit();
-            // If the first block is full, then it is true. Or the first block 
+ second block > batch_size
+            // If the first block is full, then it is true. Or the first block 
+ second block
+            // exceeds the row/byte budget.
             bool has_first_full_block = false;
+            const size_t preferred_block_size_bytes = 
state->preferred_block_size_bytes();
 
             // During low memory mode, every scan task will return at most 2 
block to reduce memory usage.
             while (!eos && raw_bytes_read < raw_bytes_threshold &&
@@ -272,9 +274,30 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                 // Projection will truncate useless columns, makes block size 
change.
                 auto free_block_bytes = free_block->allocated_bytes();
                 raw_bytes_read += free_block_bytes;
-                if (!scan_task->cached_blocks.empty() &&
-                    scan_task->cached_blocks.back().first->rows() + 
free_block->rows() <=
-                            ctx->batch_size()) {
+                const auto can_merge_to_last_block = [&]() {
+                    if (scan_task->cached_blocks.empty()) {
+                        return false;
+                    }
+
+                    const auto* last_block = 
scan_task->cached_blocks.back().first.get();
+                    if (last_block->rows() == 0 || free_block->rows() == 0) {
+                        return true;
+                    }
+
+                    const bool within_row_budget =
+                            last_block->rows() + free_block->rows() <= 
ctx->batch_size();
+                    if (!within_row_budget) {
+                        return false;
+                    }
+
+                    if (!config::enable_adaptive_batch_size) {
+                        return true;
+                    }
+
+                    return last_block->bytes() + free_block->bytes() <= 
preferred_block_size_bytes;
+                }();
+
+                if (can_merge_to_last_block) {
                     size_t block_size = 
scan_task->cached_blocks.back().first->allocated_bytes();
                     MutableBlock 
mutable_block(scan_task->cached_blocks.back().first.get());
                     status = mutable_block.merge(*free_block);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to