github-actions[bot] commented on code in PR #61713:
URL: https://github.com/apache/doris/pull/61713#discussion_r3008380905
##########
be/src/exec/scan/olap_scanner.cpp:
##########
@@ -491,20 +491,34 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.enable_mor_value_predicate_pushdown = true;
}
-    // order by table keys optimization for topn
-    // will only read head/tail of data file since it's already sorted by keys
-    if (olap_scan_node.__isset.sort_info && !olap_scan_node.sort_info.is_asc_order.empty()) {
-        _limit = _local_state->limit_per_scanner();
-        _tablet_reader_params.read_orderby_key = true;
-        if (!olap_scan_node.sort_info.is_asc_order[0]) {
-            _tablet_reader_params.read_orderby_key_reverse = true;
-        }
-        _tablet_reader_params.read_orderby_key_num_prefix_columns =
-                olap_scan_node.sort_info.is_asc_order.size();
-        _tablet_reader_params.read_orderby_key_limit = _limit;
-
-        if (_tablet_reader_params.read_orderby_key_limit > 0 &&
-            olap_scan_local_state->_storage_no_merge()) {
+    // Skip topn / general-limit storage-layer optimizations when runtime
+    // filters have not all arrived. Late-arriving filters would re-populate
+    // _conjuncts at the scanner level while the storage layer has already
+    // committed to a row budget counted before those filters, causing the
+    // scan to return fewer rows than the limit requires.
+    if (_applied_rf_num == _total_rf_num) {
+        // order by table keys optimization for topn
Review Comment:
**[High] `_applied_rf_num` is never updated — this guard disables topn/limit
pushdown for ALL queries with runtime filters.**
`_applied_rf_num` is initialized to `0` at `scanner.h:215` and is never
modified anywhere in the codebase (`grep -rn '_applied_rf_num =' be/src/`
confirms this). This means this condition evaluates to `0 == _total_rf_num`,
which is `true` only when there are **zero** runtime filters.
Consequence: This PR wraps the **existing** topn (ORDER BY key LIMIT)
optimization with this guard, so topn is now disabled for any query involving
JOINs that produce runtime filters. This is a significant performance
regression for the topn path.
The correctness concern (late-arriving RF conjuncts causing incorrect limit
counting at the storage layer) is valid, but the fix is too conservative. Two
possible alternatives:
1. Update `_applied_rf_num` in `try_append_late_arrival_runtime_filter()`
when new RFs arrive, so the guard fires correctly.
2. Only apply this guard for the general limit pushdown path, not the topn
path (topn already existed without this guard and may have tolerated the issue).
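Alternative 1 can be illustrated with a minimal standalone sketch. The struct and member names below mirror the Doris scanner for readability but this is not Doris code, and `on_runtime_filter_applied()` is a hypothetical hook standing in for the update that would happen in `try_append_late_arrival_runtime_filter()`:

```cpp
#include <cassert>

// Standalone sketch of the guard's arithmetic; ScannerSketch and its
// members are illustrative stand-ins, not real Doris APIs.
struct ScannerSketch {
    int _applied_rf_num = 0; // starts at 0, as at scanner.h:215
    int _total_rf_num = 0;

    // The guard added by this PR: pushdown only when all RFs are in.
    bool storage_limit_pushdown_allowed() const {
        return _applied_rf_num == _total_rf_num;
    }

    // Alternative 1: bump the counter whenever a late-arriving RF is applied.
    void on_runtime_filter_applied() { ++_applied_rf_num; }
};
```

Without the counter update, a query with `_total_rf_num == 2` keeps the guard false for its whole lifetime; with it, the guard passes once both filters have been applied, which is the behavior the comment in the hunk describes.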
##########
be/src/exec/scan/olap_scanner.cpp:
##########
@@ -491,20 +491,34 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.enable_mor_value_predicate_pushdown = true;
}
-    // order by table keys optimization for topn
-    // will only read head/tail of data file since it's already sorted by keys
-    if (olap_scan_node.__isset.sort_info && !olap_scan_node.sort_info.is_asc_order.empty()) {
-        _limit = _local_state->limit_per_scanner();
-        _tablet_reader_params.read_orderby_key = true;
-        if (!olap_scan_node.sort_info.is_asc_order[0]) {
-            _tablet_reader_params.read_orderby_key_reverse = true;
-        }
-        _tablet_reader_params.read_orderby_key_num_prefix_columns =
-                olap_scan_node.sort_info.is_asc_order.size();
-        _tablet_reader_params.read_orderby_key_limit = _limit;
-
-        if (_tablet_reader_params.read_orderby_key_limit > 0 &&
-            olap_scan_local_state->_storage_no_merge()) {
+    // Skip topn / general-limit storage-layer optimizations when runtime
+    // filters have not all arrived. Late-arriving filters would re-populate
+    // _conjuncts at the scanner level while the storage layer has already
+    // committed to a row budget counted before those filters, causing the
+    // scan to return fewer rows than the limit requires.
+    if (_applied_rf_num == _total_rf_num) {
+        // order by table keys optimization for topn
+        // will only read head/tail of data file since it's already sorted by keys
+        if (olap_scan_node.__isset.sort_info &&
+            !olap_scan_node.sort_info.is_asc_order.empty()) {
+            _limit = _local_state->limit_per_scanner();
+            _tablet_reader_params.read_orderby_key = true;
+            if (!olap_scan_node.sort_info.is_asc_order[0]) {
+                _tablet_reader_params.read_orderby_key_reverse = true;
+            }
+            _tablet_reader_params.read_orderby_key_num_prefix_columns =
+                    olap_scan_node.sort_info.is_asc_order.size();
+            _tablet_reader_params.read_orderby_key_limit = _limit;
+
+            if (_tablet_reader_params.read_orderby_key_limit > 0 &&
+                olap_scan_local_state->_storage_no_merge()) {
+                _tablet_reader_params.filter_block_conjuncts = _conjuncts;
+                _conjuncts.clear();
+            }
+        } else if (_limit > 0 && olap_scan_local_state->_storage_no_merge()) {
+            // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW
Review Comment:
**[Info] `_limit` here is the operator-level SQL LIMIT, not
`limit_per_scanner`.**
In the topn branch above (line 504), `_limit` is overwritten with
`_local_state->limit_per_scanner()`. But in this `else if` branch, `_limit`
retains its original value from the Scanner constructor, which is the
operator-level `p._limit` (the SQL `LIMIT N` value, or -1 if no LIMIT).
This is correct: each scanner independently stops after the full SQL LIMIT
rows, and the operator-level `reached_limit()` + `_shared_scan_limit` enforces
the global total. But it might be worth adding a comment clarifying this is the
full SQL limit, not a per-scanner quota, to prevent future confusion.
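The distinction can be made concrete with a small sketch. All names here are illustrative, not Doris APIs: each scanner independently caps its output at the full SQL LIMIT, and correctness of the global total is left to the operator layer, so a scanner only needs to never return *more* than LIMIT rows:

```cpp
#include <algorithm>
#include <cassert>

// Sketch of the limit semantics discussed above; names are illustrative.
struct LimitSketch {
    long sql_limit;          // operator-level LIMIT N (-1 means no LIMIT)
    long rows_returned = 0;  // rows this scanner has emitted so far

    // Rows to keep from a block of `n` rows under the full SQL limit.
    long take(long n) {
        if (sql_limit < 0) { // no LIMIT: pass everything through
            rows_returned += n;
            return n;
        }
        long kept = std::min(n, std::max(0L, sql_limit - rows_returned));
        rows_returned += kept;
        return kept;
    }
};
```

Under this model, every scanner may return up to the full LIMIT, which is wasteful but never wrong; a per-scanner quota would instead risk returning too few rows when the quota splits unevenly across tablets.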
##########
be/src/storage/iterator/vcollect_iterator.cpp:
##########
@@ -248,8 +255,39 @@ Status VCollectIterator::next(Block* block) {
         return _topn_next(block);
     }
+    // Fast path: if general limit already reached, return EOF immediately
+    if (_general_read_limit > 0 && _general_rows_returned >= _general_read_limit) {
+        return Status::Error<END_OF_FILE>("");
+    }
+
     if (LIKELY(_inner_iter)) {
-        return _inner_iter->next(block);
+        auto st = _inner_iter->next(block);
+        if (UNLIKELY(!st.ok())) {
+            return st;
+        }
+
+        // Apply filter_block_conjuncts that were moved from Scanner::_conjuncts.
+        // This must happen BEFORE limit counting so that _general_rows_returned
+        // reflects post-filter rows (same pattern as _topn_next).
+        if (!_reader->_reader_context.filter_block_conjuncts.empty()) {
+            RETURN_IF_ERROR(VExprContext::filter_block(
Review Comment:
**[Note] `filter_block_conjuncts` is applied unconditionally whenever
non-empty, independent of `_general_read_limit`.**
This is correct behavior: if `_merge` was forced true by `force_merge`
(overlapping data for topn), `_general_read_limit` would be `-1` but
`filter_block_conjuncts` may still be non-empty (from the topn path). The
filter should still be applied. And the scanner-level limit check
(`_num_rows_return >= _limit`) still works because `_conjuncts` was cleared so
`_filter_output_block` is a no-op.
However, note that this changes the existing non-topn, non-limit path: previously, `VCollectIterator::next(Block*)` simply returned `_inner_iter->next(block)`; now it also checks whether `filter_block_conjuncts` is empty on every call, even when nothing was pushed down. The cost is negligible (an `.empty()` check on a vector), but it is worth being aware of.
##########
be/src/storage/iterator/vcollect_iterator.cpp:
##########
@@ -248,8 +255,39 @@ Status VCollectIterator::next(Block* block) {
         return _topn_next(block);
     }
+    // Fast path: if general limit already reached, return EOF immediately
+    if (_general_read_limit > 0 && _general_rows_returned >= _general_read_limit) {
+        return Status::Error<END_OF_FILE>("");
+    }
+
     if (LIKELY(_inner_iter)) {
-        return _inner_iter->next(block);
+        auto st = _inner_iter->next(block);
+        if (UNLIKELY(!st.ok())) {
+            return st;
+        }
+
+        // Apply filter_block_conjuncts that were moved from Scanner::_conjuncts.
+        // This must happen BEFORE limit counting so that _general_rows_returned
+        // reflects post-filter rows (same pattern as _topn_next).
+        if (!_reader->_reader_context.filter_block_conjuncts.empty()) {
+            RETURN_IF_ERROR(VExprContext::filter_block(
+                    _reader->_reader_context.filter_block_conjuncts, block, block->columns()));
+        }
+
+        // Enforce general read limit: truncate block if needed
+        if (_general_read_limit > 0) {
+            _general_rows_returned += block->rows();
+            if (_general_rows_returned > _general_read_limit) {
Review Comment:
**[Note] After `filter_block`, if all rows are filtered out (`block->rows()
== 0`), this adds 0 to `_general_rows_returned` and returns OK with an empty
block.**
This is correct: the scanner's loop in `Scanner::get_block()`
(scanner.cpp:159) re-loops when `block->rows() == 0 && !(*eof)`, so it will
call back into the storage layer for more data. The `_general_rows_returned` is
unchanged, so limit tracking remains accurate.
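The order of operations in this hunk (filter first, count post-filter rows, truncate past the limit, let an all-filtered block fall through as empty) can be sketched as a standalone simulation. Names are our own, not Doris code, and `keep` stands in for the pushed-down `filter_block_conjuncts`:

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Standalone sketch of the general-limit path: filter the block BEFORE
// limit counting, then truncate the tail once the limit is exceeded.
struct GeneralLimitIter {
    long limit;              // -1 means no general read limit
    long rows_returned = 0;

    std::vector<int> next(std::vector<int> block,
                          const std::function<bool(int)>& keep) {
        // 1. Apply the pushed-down conjuncts before any counting.
        std::vector<int> out;
        for (int v : block) {
            if (keep(v)) out.push_back(v);
        }
        // 2. Count post-filter rows; truncate once the limit is exceeded.
        if (limit > 0) {
            rows_returned += static_cast<long>(out.size());
            if (rows_returned > limit) {
                long excess = rows_returned - limit;
                out.resize(out.size() - static_cast<size_t>(excess));
                rows_returned = limit;
            }
        }
        return out;
    }
};
```

An all-filtered block returns empty with the counter untouched, mirroring the behavior described above: the caller's loop asks for another block, and limit accounting stays accurate.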
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]