This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 396b16109f7 [Improvement](scan) Update scanner limit controller
(#61617)
396b16109f7 is described below
commit 396b16109f7295fd46ed972752e013595f20621f
Author: Pxl <[email protected]>
AuthorDate: Wed Mar 25 15:07:45 2026 +0800
[Improvement](scan) Update scanner limit controller (#61617)
This pull request introduces a shared atomic limit mechanism in the
scanner execution context to more accurately enforce SQL `LIMIT`
constraints across multiple concurrent scanners. The changes ensure that
the row limit is respected globally, preventing over-scanning and
improving resource efficiency. Key updates include the introduction of a
thread-safe quota system, modifications to scanner scheduling and task
pulling logic, and enhancements to debugging output.
**Shared Limit Enforcement and Quota Management:**
* Added a new atomic member `_remaining_limit` to `ScannerContext`,
initialized with the SQL `LIMIT` value and decremented atomically as
rows are scanned. Introduced the `acquire_limit_quota()` method for
scanners to claim their share of the remaining quota.
[[1]](diffhunk://#diff-0c9a817d45d8130ea3211189e1321d1275e22fd4a9a3fac2bd707b1cfeefa5e5R74)
[[2]](diffhunk://#diff-0c9a817d45d8130ea3211189e1321d1275e22fd4a9a3fac2bd707b1cfeefa5e5R99)
[[3]](diffhunk://#diff-3049f42cade971254aae07ced700d9a10b2505b03da743efea3270e63bd88dceR224-R229)
* Modified the scanner scheduling logic in
`ScannerScheduler::_scanner_scan` to check and acquire quota before
processing blocks, ensuring that scanners stop or truncate blocks when
the shared limit is exhausted.
[[1]](diffhunk://#diff-ecdf52f3fb33b9018cc1aff92085e470087071b25d79efed8a849a289215d05fR235-R239)
[[2]](diffhunk://#diff-ecdf52f3fb33b9018cc1aff92085e470087071b25d79efed8a849a289215d05fR276-R292)
* Updated the pending scanner task pulling logic to avoid scheduling new
scanners when the shared limit is depleted.
**Scanner Completion and Task Management:**
* Enhanced the logic for marking scanner context as finished: now,
completion is triggered either when all scanners are done or when the
shared limit is exhausted and no scanners are running.
**Debugging and Observability:**
* Improved the `debug_string()` output in `ScannerContext` to include
the current value of `remaining_limit`, aiding in troubleshooting and
monitoring.
**Performance Optimization:**
* Clarified and preserved the per-scanner small-limit optimization,
ensuring that when the limit is smaller than the batch size, scanners
return early to avoid unnecessary data processing.
---
be/src/exec/operator/scan_operator.cpp | 3 +-
be/src/exec/operator/scan_operator.h | 4 +++
be/src/exec/scan/scanner_context.cpp | 44 ++++++++++++++++++++++++++----
be/src/exec/scan/scanner_context.h | 8 +++++-
be/src/exec/scan/scanner_scheduler.cpp | 32 ++++++++++++++++------
be/test/exec/scan/scanner_context_test.cpp | 29 ++++++++++----------
6 files changed, 91 insertions(+), 29 deletions(-)
diff --git a/be/src/exec/operator/scan_operator.cpp
b/be/src/exec/operator/scan_operator.cpp
index 0125172458e..4cb194f3115 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -1000,7 +1000,7 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx.store(ScannerContext::create_shared(state(), this,
p._output_tuple_desc,
p.output_row_descriptor(), scanners, p.limit(),
- _scan_dependency
+                                             _scan_dependency, &p._shared_scan_limit
#ifdef BE_TEST
,
max_scanners_concurrency(state())
@@ -1158,6 +1158,7 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool*
pool, const TPlanNode&
if (tnode.__isset.push_down_count) {
_push_down_count = tnode.push_down_count;
}
+ _shared_scan_limit.store(this->_limit, std::memory_order_relaxed);
}
template <typename LocalStateType>
diff --git a/be/src/exec/operator/scan_operator.h
b/be/src/exec/operator/scan_operator.h
index 957eed9f164..f9b162be62d 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -424,6 +424,10 @@ protected:
// If sort info is set, push limit to each scanner;
int64_t _limit_per_scanner = -1;
+ // Shared remaining limit across all parallel instances and their scanners.
+ // Initialized to _limit (SQL LIMIT); -1 means no limit.
+ std::atomic<int64_t> _shared_scan_limit {-1};
+
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
TPushAggOp::type _push_down_agg_type;
diff --git a/be/src/exec/scan/scanner_context.cpp
b/be/src/exec/scan/scanner_context.cpp
index 2ab29805cd9..d62b6880ecd 100644
--- a/be/src/exec/scan/scanner_context.cpp
+++ b/be/src/exec/scan/scanner_context.cpp
@@ -56,7 +56,8 @@ ScannerContext::ScannerContext(RuntimeState* state,
ScanLocalStateBase* local_st
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
-                               int64_t limit_, std::shared_ptr<Dependency> dependency
+                               int64_t limit_, std::shared_ptr<Dependency> dependency,
+                               std::atomic<int64_t>* shared_scan_limit
#ifdef BE_TEST
,
int num_parallel_instances
@@ -71,6 +72,7 @@ ScannerContext::ScannerContext(RuntimeState* state,
ScanLocalStateBase* local_st
_output_row_descriptor(output_row_descriptor),
_batch_size(state->batch_size()),
limit(limit_),
+ _shared_scan_limit(shared_scan_limit),
_all_scanners(scanners.begin(), scanners.end()),
#ifndef BE_TEST
_scanner_scheduler(local_state->scan_scheduler(state)),
@@ -103,6 +105,27 @@ ScannerContext::ScannerContext(RuntimeState* state,
ScanLocalStateBase* local_st
}
}
+int64_t ScannerContext::acquire_limit_quota(int64_t desired) {
+ DCHECK(desired > 0);
+ int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
+ while (true) {
+ if (remaining < 0) {
+ // No limit set, grant all desired rows.
+ return desired;
+ }
+ if (remaining == 0) {
+ return 0;
+ }
+ int64_t granted = std::min(desired, remaining);
+        if (_shared_scan_limit->compare_exchange_weak(remaining, remaining - granted,
+                                                      std::memory_order_acq_rel,
+                                                      std::memory_order_acquire)) {
+ return granted;
+ }
+ // CAS failed, `remaining` is updated to current value, retry.
+ }
+}
+
// After init function call, should not access _parent
Status ScannerContext::init() {
#ifndef BE_TEST
@@ -331,7 +354,12 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, Block* block, b
}
}
-    if (_num_finished_scanners == _all_scanners.size() && _tasks_queue.empty()) {
+    // Mark finished when either:
+    // (1) all scanners completed normally, or
+    // (2) shared limit exhausted and no scanners are still running.
+    if (_tasks_queue.empty() && (_num_finished_scanners == _all_scanners.size() ||
+                                 (_shared_scan_limit->load(std::memory_order_acquire) == 0 &&
+                                  _num_scheduled_scanners == 0))) {
_set_scanner_done();
_is_finished = true;
}
@@ -440,11 +468,12 @@ std::string ScannerContext::debug_string() {
return fmt::format(
"id: {}, total scanners: {}, pending tasks: {},"
" _should_stop: {}, _is_finished: {}, free blocks: {},"
- " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
+            " limit: {}, remaining_limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
             " _max_bytes_in_queue: {}, query_id: {}",
             ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished,
-            _free_blocks.size_approx(), limit, _num_scheduled_scanners, _max_scan_concurrency,
-            _max_bytes_in_queue, print_id(_query_id));
+            _free_blocks.size_approx(), limit, _shared_scan_limit->load(std::memory_order_relaxed),
+            _num_scheduled_scanners, _max_scan_concurrency, _max_bytes_in_queue,
+            print_id(_query_id));
}
void ScannerContext::_set_scanner_done() {
@@ -607,6 +636,11 @@ std::shared_ptr<ScanTask>
ScannerContext::_pull_next_scan_task(
}
if (!_pending_scanners.empty()) {
+        // If shared limit quota is exhausted, do not submit new scanners from pending queue.
+        int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
+ if (remaining == 0) {
+ return nullptr;
+ }
std::shared_ptr<ScanTask> next_scan_task;
next_scan_task = _pending_scanners.top();
_pending_scanners.pop();
diff --git a/be/src/exec/scan/scanner_context.h
b/be/src/exec/scan/scanner_context.h
index 553408ebc96..1aea5da03c9 100644
--- a/be/src/exec/scan/scanner_context.h
+++ b/be/src/exec/scan/scanner_context.h
@@ -115,7 +115,7 @@ public:
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<ScannerDelegate>>&
scanners, int64_t limit_,
-                   std::shared_ptr<Dependency> dependency
+                   std::shared_ptr<Dependency> dependency, std::atomic<int64_t>* shared_scan_limit
#ifdef BE_TEST
,
int num_parallel_instances
@@ -221,6 +221,12 @@ protected:
int _batch_size;
// The limit from SQL's limit clause
int64_t limit;
+ // Points to the shared remaining limit on ScanOperatorX, shared across all
+ // parallel instances and their scanners. -1 means no limit.
+ std::atomic<int64_t>* _shared_scan_limit = nullptr;
+    // Atomically acquire up to `desired` rows. Returns actual granted count (0 = exhausted).
+    int64_t acquire_limit_quota(int64_t desired);
+    int64_t remaining_limit() const { return _shared_scan_limit->load(std::memory_order_acquire); }
int64_t _max_bytes_in_queue = 0;
// Using stack so that we can resubmit scanner in a LIFO order, maybe more
cache friendly
diff --git a/be/src/exec/scan/scanner_scheduler.cpp
b/be/src/exec/scan/scanner_scheduler.cpp
index 3dfa1fdf4cd..9961407bdbb 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -232,6 +232,11 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
eos = true;
break;
}
+ // If shared limit quota is exhausted, stop scanning.
+ if (ctx->remaining_limit() == 0) {
+ eos = true;
+ break;
+ }
if (max_run_time_watch.elapsed_time() >
config::doris_scanner_max_run_time_ms * 1e6) {
break;
@@ -268,6 +273,23 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
// Check column type only after block is read successfully.
// Or it may cause a crash when the block is not normal.
_make_sure_virtual_col_is_materialized(scanner,
free_block.get());
+
+                    // Shared limit quota: acquire rows from the context's shared pool.
+ // Discard or truncate the block if quota is exhausted.
+ if (free_block->rows() > 0) {
+ int64_t block_rows = free_block->rows();
+ int64_t granted = ctx->acquire_limit_quota(block_rows);
+ if (granted == 0) {
+ // No quota remaining, discard this block and mark eos.
+ ctx->return_free_block(std::move(free_block));
+ eos = true;
+ break;
+ } else if (granted < block_rows) {
+                        // Partial quota: truncate block to granted rows and mark eos.
+ free_block->set_num_rows(granted);
+ eos = true;
+ }
+ }
// Projection will truncate useless columns, makes block size
change.
auto free_block_bytes = free_block->allocated_bytes();
raw_bytes_read += free_block_bytes;
@@ -298,15 +320,9 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
+                    // Per-scanner small-limit optimization: if limit is small (< batch_size),
+                    // return immediately instead of accumulating to raw_bytes_threshold.
if (limit > 0 && limit < ctx->batch_size()) {
- // If this scanner has limit, and less than batch size,
- // return immediately and no need to wait
raw_bytes_threshold.
- // This can save time that each scanner may only return a
small number of rows,
- // but rows are enough from all scanners.
- // If not break, the query like "select * from tbl where
id=1 limit 10"
- // may scan a lot data when the "id=1"'s filter ratio is
high.
- // If limit is larger than batch size, this rule is
skipped,
- // to avoid user specify a large limit and causing too
much small blocks.
break;
}
diff --git a/be/test/exec/scan/scanner_context_test.cpp
b/be/test/exec/scan/scanner_context_test.cpp
index 5b741bcae68..18e815f7fe8 100644
--- a/be/test/exec/scan/scanner_context_test.cpp
+++ b/be/test/exec/scan/scanner_context_test.cpp
@@ -120,6 +120,7 @@ private:
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl =
std::make_shared<CgroupV2CpuCtl>(1);
std::unique_ptr<ScannerScheduler> scan_scheduler =
std::make_unique<ThreadPoolSimplifiedScanScheduler>("ForTest",
cgroup_cpu_ctl);
+ std::atomic<int64_t> shared_limit {-1};
};
TEST_F(ScannerContextTest, test_init) {
@@ -148,7 +149,7 @@ TEST_F(ScannerContextTest, test_init) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scan_operator->_should_run_serial = false;
@@ -208,7 +209,7 @@ TEST_F(ScannerContextTest, test_serial_run) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scan_operator->_should_run_serial = true;
@@ -266,7 +267,7 @@ TEST_F(ScannerContextTest, test_max_column_reader_num) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scan_operator->_should_run_serial = false;
@@ -316,7 +317,7 @@ TEST_F(ScannerContextTest, test_push_back_scan_task) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_num_scheduled_scanners = 11;
@@ -353,7 +354,7 @@ TEST_F(ScannerContextTest, get_margin) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
std::mutex transfer_mutex;
std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -449,7 +450,7 @@ TEST_F(ScannerContextTest, pull_next_scan_task) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
std::mutex transfer_mutex;
std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -525,7 +526,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
std::mutex transfer_mutex;
std::unique_lock<std::mutex> transfer_lock(transfer_mutex);
@@ -557,7 +558,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_scanner_scheduler = scheduler.get();
@@ -578,7 +579,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_scanner_scheduler = scheduler.get();
@@ -594,7 +595,7 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
scanner_context = ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_scanner_scheduler = scheduler.get();
@@ -647,7 +648,7 @@ TEST_F(ScannerContextTest, scan_queue_mem_limit) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
std::make_unique<MockSimplifiedScanScheduler>(cgroup_cpu_ctl);
@@ -687,7 +688,7 @@ TEST_F(ScannerContextTest, get_free_block) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_newly_create_free_blocks_num =
newly_create_free_blocks_num.get();
scanner_context->_newly_create_free_blocks_num->set(0L);
scanner_context->_scanner_memory_used_counter =
scanner_memory_used_counter.get();
@@ -740,7 +741,7 @@ TEST_F(ScannerContextTest, return_free_block) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_newly_create_free_blocks_num =
newly_create_free_blocks_num.get();
scanner_context->_scanner_memory_used_counter =
scanner_memory_used_counter.get();
scanner_context->_max_bytes_in_queue = 200;
@@ -784,7 +785,7 @@ TEST_F(ScannerContextTest, get_block_from_queue) {
std::shared_ptr<ScannerContext> scanner_context =
ScannerContext::create_shared(
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
- scanners, limit, scan_dependency, parallel_tasks);
+ scanners, limit, scan_dependency, &shared_limit, parallel_tasks);
scanner_context->_newly_create_free_blocks_num =
newly_create_free_blocks_num.get();
scanner_context->_scanner_memory_used_counter =
scanner_memory_used_counter.get();
scanner_context->_max_bytes_in_queue = 200;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]