This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 396b16109f7 [Improvement](scan) Update scanner limit controller 
(#61617)
396b16109f7 is described below

commit 396b16109f7295fd46ed972752e013595f20621f
Author: Pxl <[email protected]>
AuthorDate: Wed Mar 25 15:07:45 2026 +0800

    [Improvement](scan) Update scanner limit controller (#61617)
    
    This pull request introduces a shared atomic limit mechanism in the
    scanner execution context to more accurately enforce SQL `LIMIT`
    constraints across multiple concurrent scanners. The changes ensure that
    the row limit is respected globally, preventing over-scanning and
    improving resource efficiency. Key updates include the introduction of a
    thread-safe quota system, modifications to scanner scheduling and task
    pulling logic, and enhancements to debugging output.
    
    **Shared Limit Enforcement and Quota Management:**
    
    * Added a new atomic member `_remaining_limit` to `ScannerContext`,
    initialized with the SQL `LIMIT` value and decremented atomically as
    rows are scanned. Introduced the `acquire_limit_quota()` method for
    scanners to claim their share of the remaining quota.
    
[[1]](diffhunk://#diff-0c9a817d45d8130ea3211189e1321d1275e22fd4a9a3fac2bd707b1cfeefa5e5R74)
    
[[2]](diffhunk://#diff-0c9a817d45d8130ea3211189e1321d1275e22fd4a9a3fac2bd707b1cfeefa5e5R99)
    
[[3]](diffhunk://#diff-3049f42cade971254aae07ced700d9a10b2505b03da743efea3270e63bd88dceR224-R229)
    * Modified the scanner scheduling logic in
    `ScannerScheduler::_scanner_scan` to check and acquire quota before
    processing blocks, ensuring that scanners stop or truncate blocks when
    the shared limit is exhausted.
    
[[1]](diffhunk://#diff-ecdf52f3fb33b9018cc1aff92085e470087071b25d79efed8a849a289215d05fR235-R239)
    
[[2]](diffhunk://#diff-ecdf52f3fb33b9018cc1aff92085e470087071b25d79efed8a849a289215d05fR276-R292)
    * Updated the pending scanner task pulling logic to avoid scheduling new
    scanners when the shared limit is depleted.
    
    **Scanner Completion and Task Management:**
    
    * Enhanced the logic for marking scanner context as finished: now,
    completion is triggered either when all scanners are done or when the
    shared limit is exhausted and no scanners are running.
    
    **Debugging and Observability:**
    
    * Improved the `debug_string()` output in `ScannerContext` to include
    the current value of `remaining_limit`, aiding in troubleshooting and
    monitoring.
    
    **Performance Optimization:**
    
    * Clarified and preserved the per-scanner small-limit optimization,
    ensuring that when the limit is smaller than the batch size, scanners
    return early to avoid unnecessary data processing.
---
 be/src/exec/operator/scan_operator.cpp     |  3 +-
 be/src/exec/operator/scan_operator.h       |  4 +++
 be/src/exec/scan/scanner_context.cpp       | 44 ++++++++++++++++++++++++++----
 be/src/exec/scan/scanner_context.h         |  8 +++++-
 be/src/exec/scan/scanner_scheduler.cpp     | 32 ++++++++++++++++------
 be/test/exec/scan/scanner_context_test.cpp | 29 ++++++++++----------
 6 files changed, 91 insertions(+), 29 deletions(-)

diff --git a/be/src/exec/operator/scan_operator.cpp 
b/be/src/exec/operator/scan_operator.cpp
index 0125172458e..4cb194f3115 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -1000,7 +1000,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx.store(ScannerContext::create_shared(state(), this, 
p._output_tuple_desc,
                                                      
p.output_row_descriptor(), scanners, p.limit(),
-                                                     _scan_dependency
+                                                     _scan_dependency, 
&p._shared_scan_limit
 #ifdef BE_TEST
                                                      ,
                                                      
max_scanners_concurrency(state())
@@ -1158,6 +1158,7 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* 
pool, const TPlanNode&
     if (tnode.__isset.push_down_count) {
         _push_down_count = tnode.push_down_count;
     }
+    _shared_scan_limit.store(this->_limit, std::memory_order_relaxed);
 }
 
 template <typename LocalStateType>
diff --git a/be/src/exec/operator/scan_operator.h 
b/be/src/exec/operator/scan_operator.h
index 957eed9f164..f9b162be62d 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -424,6 +424,10 @@ protected:
     // If sort info is set, push limit to each scanner;
     int64_t _limit_per_scanner = -1;
 
+    // Shared remaining limit across all parallel instances and their scanners.
+    // Initialized to _limit (SQL LIMIT); -1 means no limit.
+    std::atomic<int64_t> _shared_scan_limit {-1};
+
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
 
     TPushAggOp::type _push_down_agg_type;
diff --git a/be/src/exec/scan/scanner_context.cpp 
b/be/src/exec/scan/scanner_context.cpp
index 2ab29805cd9..d62b6880ecd 100644
--- a/be/src/exec/scan/scanner_context.cpp
+++ b/be/src/exec/scan/scanner_context.cpp
@@ -56,7 +56,8 @@ ScannerContext::ScannerContext(RuntimeState* state, 
ScanLocalStateBase* local_st
                                const TupleDescriptor* output_tuple_desc,
                                const RowDescriptor* output_row_descriptor,
                                const 
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                               int64_t limit_, std::shared_ptr<Dependency> 
dependency
+                               int64_t limit_, std::shared_ptr<Dependency> 
dependency,
+                               std::atomic<int64_t>* shared_scan_limit
 #ifdef BE_TEST
                                ,
                                int num_parallel_instances
@@ -71,6 +72,7 @@ ScannerContext::ScannerContext(RuntimeState* state, 
ScanLocalStateBase* local_st
           _output_row_descriptor(output_row_descriptor),
           _batch_size(state->batch_size()),
           limit(limit_),
+          _shared_scan_limit(shared_scan_limit),
           _all_scanners(scanners.begin(), scanners.end()),
 #ifndef BE_TEST
           _scanner_scheduler(local_state->scan_scheduler(state)),
@@ -103,6 +105,27 @@ ScannerContext::ScannerContext(RuntimeState* state, 
ScanLocalStateBase* local_st
     }
 }
 
+int64_t ScannerContext::acquire_limit_quota(int64_t desired) {
+    DCHECK(desired > 0);
+    int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
+    while (true) {
+        if (remaining < 0) {
+            // No limit set, grant all desired rows.
+            return desired;
+        }
+        if (remaining == 0) {
+            return 0;
+        }
+        int64_t granted = std::min(desired, remaining);
+        if (_shared_scan_limit->compare_exchange_weak(remaining, remaining - 
granted,
+                                                      
std::memory_order_acq_rel,
+                                                      
std::memory_order_acquire)) {
+            return granted;
+        }
+        // CAS failed, `remaining` is updated to current value, retry.
+    }
+}
+
 // After init function call, should not access _parent
 Status ScannerContext::init() {
 #ifndef BE_TEST
@@ -331,7 +354,12 @@ Status ScannerContext::get_block_from_queue(RuntimeState* 
state, Block* block, b
         }
     }
 
-    if (_num_finished_scanners == _all_scanners.size() && 
_tasks_queue.empty()) {
+    // Mark finished when either:
+    // (1) all scanners completed normally, or
+    // (2) shared limit exhausted and no scanners are still running.
+    if (_tasks_queue.empty() && (_num_finished_scanners == 
_all_scanners.size() ||
+                                 
(_shared_scan_limit->load(std::memory_order_acquire) == 0 &&
+                                  _num_scheduled_scanners == 0))) {
         _set_scanner_done();
         _is_finished = true;
     }
@@ -440,11 +468,12 @@ std::string ScannerContext::debug_string() {
     return fmt::format(
             "id: {}, total scanners: {}, pending tasks: {},"
             " _should_stop: {}, _is_finished: {}, free blocks: {},"
-            " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
+            " limit: {}, remaining_limit: {}, _num_running_scanners: {}, 
_max_thread_num: {},"
             " _max_bytes_in_queue: {}, query_id: {}",
             ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, 
_is_finished,
-            _free_blocks.size_approx(), limit, _num_scheduled_scanners, 
_max_scan_concurrency,
-            _max_bytes_in_queue, print_id(_query_id));
+            _free_blocks.size_approx(), limit, 
_shared_scan_limit->load(std::memory_order_relaxed),
+            _num_scheduled_scanners, _max_scan_concurrency, 
_max_bytes_in_queue,
+            print_id(_query_id));
 }
 
 void ScannerContext::_set_scanner_done() {
@@ -607,6 +636,11 @@ std::shared_ptr<ScanTask> 
ScannerContext::_pull_next_scan_task(
     }
 
     if (!_pending_scanners.empty()) {
+        // If shared limit quota is exhausted, do not submit new scanners from 
pending queue.
+        int64_t remaining = 
_shared_scan_limit->load(std::memory_order_acquire);
+        if (remaining == 0) {
+            return nullptr;
+        }
         std::shared_ptr<ScanTask> next_scan_task;
         next_scan_task = _pending_scanners.top();
         _pending_scanners.pop();
diff --git a/be/src/exec/scan/scanner_context.h 
b/be/src/exec/scan/scanner_context.h
index 553408ebc96..1aea5da03c9 100644
--- a/be/src/exec/scan/scanner_context.h
+++ b/be/src/exec/scan/scanner_context.h
@@ -115,7 +115,7 @@ public:
                    const TupleDescriptor* output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
                    const std::list<std::shared_ptr<ScannerDelegate>>& 
scanners, int64_t limit_,
-                   std::shared_ptr<Dependency> dependency
+                   std::shared_ptr<Dependency> dependency, 
std::atomic<int64_t>* shared_scan_limit
 #ifdef BE_TEST
                    ,
                    int num_parallel_instances
@@ -221,6 +221,12 @@ protected:
     int _batch_size;
     // The limit from SQL's limit clause
     int64_t limit;
+    // Points to the shared remaining limit on ScanOperatorX, shared across all
+    // parallel instances and their scanners. -1 means no limit.
+    std::atomic<int64_t>* _shared_scan_limit = nullptr;
+    // Atomically acquire up to `desired` rows. Returns actual granted count 
(0 = exhausted).
+    int64_t acquire_limit_quota(int64_t desired);
+    int64_t remaining_limit() const { return 
_shared_scan_limit->load(std::memory_order_acquire); }
 
     int64_t _max_bytes_in_queue = 0;
     // Using stack so that we can resubmit scanner in a LIFO order, maybe more 
cache friendly
diff --git a/be/src/exec/scan/scanner_scheduler.cpp 
b/be/src/exec/scan/scanner_scheduler.cpp
index 3dfa1fdf4cd..9961407bdbb 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -232,6 +232,11 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     eos = true;
                     break;
                 }
+                // If shared limit quota is exhausted, stop scanning.
+                if (ctx->remaining_limit() == 0) {
+                    eos = true;
+                    break;
+                }
                 if (max_run_time_watch.elapsed_time() >
                     config::doris_scanner_max_run_time_ms * 1e6) {
                     break;
@@ -268,6 +273,23 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                 // Check column type only after block is read successfully.
                 // Or it may cause a crash when the block is not normal.
                 _make_sure_virtual_col_is_materialized(scanner, 
free_block.get());
+
+                // Shared limit quota: acquire rows from the context's shared 
pool.
+                // Discard or truncate the block if quota is exhausted.
+                if (free_block->rows() > 0) {
+                    int64_t block_rows = free_block->rows();
+                    int64_t granted = ctx->acquire_limit_quota(block_rows);
+                    if (granted == 0) {
+                        // No quota remaining, discard this block and mark eos.
+                        ctx->return_free_block(std::move(free_block));
+                        eos = true;
+                        break;
+                    } else if (granted < block_rows) {
+                        // Partial quota: truncate block to granted rows and 
mark eos.
+                        free_block->set_num_rows(granted);
+                        eos = true;
+                    }
+                }
                 // Projection will truncate useless columns, makes block size 
change.
                 auto free_block_bytes = free_block->allocated_bytes();
                 raw_bytes_read += free_block_bytes;
@@ -298,15 +320,9 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
                     
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
                 }
 
+                // Per-scanner small-limit optimization: if limit is small (< 
batch_size),
+                // return immediately instead of accumulating to 
raw_bytes_threshold.
                 if (limit > 0 && limit < ctx->batch_size()) {
-                    // If this scanner has limit, and less than batch size,
-                    // return immediately and no need to wait 
raw_bytes_threshold.
-                    // This can save time that each scanner may only return a 
small number of rows,
-                    // but rows are enough from all scanners.
-                    // If not break, the query like "select * from tbl where 
id=1 limit 10"
-                    // may scan a lot data when the "id=1"'s filter ratio is 
high.
-                    // If limit is larger than batch size, this rule is 
skipped,
-                    // to avoid user specify a large limit and causing too 
much small blocks.
                     break;
                 }
 
diff --git a/be/test/exec/scan/scanner_context_test.cpp 
b/be/test/exec/scan/scanner_context_test.cpp
index 5b741bcae68..18e815f7fe8 100644
--- a/be/test/exec/scan/scanner_context_test.cpp
+++ b/be/test/exec/scan/scanner_context_test.cpp
@@ -120,6 +120,7 @@ private:
     std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
std::make_shared<CgroupV2CpuCtl>(1);
     std::unique_ptr<ScannerScheduler> scan_scheduler =
             std::make_unique<ThreadPoolSimplifiedScanScheduler>("ForTest", 
cgroup_cpu_ctl);
+    std::atomic<int64_t> shared_limit {-1};
 };
 
 TEST_F(ScannerContextTest, test_init) {
@@ -148,7 +149,7 @@ TEST_F(ScannerContextTest, test_init) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scan_operator->_should_run_serial = false;
 
@@ -208,7 +209,7 @@ TEST_F(ScannerContextTest, test_serial_run) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scan_operator->_should_run_serial = true;
 
@@ -266,7 +267,7 @@ TEST_F(ScannerContextTest, test_max_column_reader_num) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scan_operator->_should_run_serial = false;
 
@@ -316,7 +317,7 @@ TEST_F(ScannerContextTest, test_push_back_scan_task) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scanner_context->_num_scheduled_scanners = 11;
 
@@ -353,7 +354,7 @@ TEST_F(ScannerContextTest, get_margin) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     std::mutex transfer_mutex;
     std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -449,7 +450,7 @@ TEST_F(ScannerContextTest, pull_next_scan_task) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     std::mutex transfer_mutex;
     std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -525,7 +526,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     std::mutex transfer_mutex;
     std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -557,7 +558,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
 
     scanner_context = ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scanner_context->_scanner_scheduler = scheduler.get();
 
@@ -578,7 +579,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
 
     scanner_context = ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scanner_context->_scanner_scheduler = scheduler.get();
 
@@ -594,7 +595,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
 
     scanner_context = ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     scanner_context->_scanner_scheduler = scheduler.get();
 
@@ -647,7 +648,7 @@ TEST_F(ScannerContextTest, scan_queue_mem_limit) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
 
     std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
             std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
@@ -687,7 +688,7 @@ TEST_F(ScannerContextTest, get_free_block) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
     scanner_context->_newly_create_free_blocks_num = 
newly_create_free_blocks_num.get();
     scanner_context->_newly_create_free_blocks_num->set(0L);
     scanner_context->_scanner_memory_used_counter = 
scanner_memory_used_counter.get();
@@ -740,7 +741,7 @@ TEST_F(ScannerContextTest, return_free_block) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
     scanner_context->_newly_create_free_blocks_num = 
newly_create_free_blocks_num.get();
     scanner_context->_scanner_memory_used_counter = 
scanner_memory_used_counter.get();
     scanner_context->_max_bytes_in_queue = 200;
@@ -784,7 +785,7 @@ TEST_F(ScannerContextTest, get_block_from_queue) {
 
     std::shared_ptr<ScannerContext> scanner_context = 
ScannerContext::create_shared(
             state.get(), olap_scan_local_state.get(), output_tuple_desc, 
output_row_descriptor,
-            scanners, limit, scan_dependency, parallel_tasks);
+            scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
     scanner_context->_newly_create_free_blocks_num = 
newly_create_free_blocks_num.get();
     scanner_context->_scanner_memory_used_counter = 
scanner_memory_used_counter.get();
     scanner_context->_max_bytes_in_queue = 200;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to