This is an automated email from the ASF dual-hosted git repository.
BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 28cc2bf11d5 [Improvement](scan) support push down limit to segment
iterator (#62222)
28cc2bf11d5 is described below
commit 28cc2bf11d5b9a3d01997eaaf918d28486480803
Author: Pxl <[email protected]>
AuthorDate: Mon May 11 14:10:54 2026 +0800
[Improvement](scan) support push down limit to segment iterator (#62222)
This pull request pushes LIMIT down to the segment iterator and tightens
LIMIT and predicate pushdown in OLAP scans. The main changes are a
shared LIMIT counter that scanners check and update, per-expression
control over common-expression pushdown, and validation of residual
predicates. Together they let LIMIT queries stop scanning early while
LIMIT and TopN queries still return correct results.
**Improvements to LIMIT handling and scanner coordination:**
- Introduced a shared atomic counter (`_shared_scan_limit`) for LIMIT
queries, allowing all scanners to collectively respect the global row
limit. Scanners now check and update this counter to stop early when the
LIMIT is reached, preventing over-scanning and improving efficiency.
TopN scans bypass this mechanism as required.
[[1]](diffhunk://#diff-7fe2376680f9ab835053a0a93c7f9458e6d526acfc180318be0ad936e0c2dedeR50-R55)
[[2]](diffhunk://#diff-7fe2376680f9ab835053a0a93c7f9458e6d526acfc180318be0ad936e0c2dedeR135-R143)
[[3]](diffhunk://#diff-7fe2376680f9ab835053a0a93c7f9458e6d526acfc180318be0ad936e0c2dedeR177-R183)
[[4]](diffhunk://#diff-7fe2376680f9ab835053a0a93c7f9458e6d526acfc180318be0ad936e0c2dedeR196)
[[5]](diffhunk://#diff-9c777f60e1d076777f042a0b702967bd5864bcfe9ee1eee9b456bb67c6fdf817R266-R269)
[[6]](diffhunk://#diff-a45108106b12759815ac5991d56a9d6ead7ffb9c915fb0cf7b6f0b8123b30262R71)
[[7]](diffhunk://#diff-a45108106b12759815ac5991d56a9d6ead7ffb9c915fb0cf7b6f0b8123b30262R239)
[[8]](diffhunk://#diff-a45108106b12759815ac5991d56a9d6ead7ffb9c915fb0cf7b6f0b8123b30262L283-R285)
[[9]](diffhunk://#diff-3c4794a864169735a628d8fcb1de986523a9516aad318aa0a893913f4ce031d4R1043-R1050)
- Adjusted scanner and scan local state interfaces to support the new
shared LIMIT mechanism, including new virtual methods and atomic counter
management.
[[1]](diffhunk://#diff-a45108106b12759815ac5991d56a9d6ead7ffb9c915fb0cf7b6f0b8123b30262R71)
[[2]](diffhunk://#diff-a45108106b12759815ac5991d56a9d6ead7ffb9c915fb0cf7b6f0b8123b30262R239)
[[3]](diffhunk://#diff-a45108106b12759815ac5991d56a9d6ead7ffb9c915fb0cf7b6f0b8123b30262L283-R285)
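The shared-counter behavior described above can be sketched roughly as follows. This is a minimal illustration, not the Doris implementation: `SketchScanner`, `limit_exhausted`, and `publish_rows` are hypothetical names, and the sketch assumes the operator above the scanners trims any rows produced past the LIMIT.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a scanner; not the Doris Scanner class.
struct SketchScanner {
    // Null means "no shared LIMIT" (TopN path or a query without LIMIT).
    std::atomic<int64_t>* shared_limit = nullptr;

    // True when the scanners together have already produced enough rows.
    bool limit_exhausted() const {
        return shared_limit != nullptr &&
               shared_limit->load(std::memory_order_acquire) <= 0;
    }

    // Publishes `rows` produced rows to the shared counter and reports
    // whether this scanner should stop. The counter may go negative when
    // several scanners subtract concurrently; the caller is assumed to
    // make the final cut.
    bool publish_rows(int64_t rows) {
        assert(rows >= 0);
        if (shared_limit == nullptr) {
            return false;
        }
        if (rows > 0) {
            shared_limit->fetch_sub(rows, std::memory_order_acq_rel);
        }
        return limit_exhausted();
    }
};
```

In this sketch a scanner would call `limit_exhausted()` before reading a block and `publish_rows()` after producing one. Leaving the pointer null is how a TopN scanner opts out.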
**Predicate pushdown and validation enhancements:**
- Refactored `_should_push_down_common_expr` to require the expression
as an argument and added logic to ensure only eligible expressions are
pushed down, considering storage merge requirements and key columns.
[[1]](diffhunk://#diff-780cdfbcbb1d30adb63c61521e6b681cad347bebeaa5332a781d05bb51543527L46-R46)
[[2]](diffhunk://#diff-235cc51f4698ebf0ebe796e7253912907bb7c1caf4efcac92538ac1ff2eac171L104-R107)
[[3]](diffhunk://#diff-3ddc75656071d9c0e6b0be450e152a1c94559f7e70ea820e7f0c80a7078e3292L465-R547)
[[4]](diffhunk://#diff-3c4794a864169735a628d8fcb1de986523a9516aad318aa0a893913f4ce031d4L313-R313)
- Added a validation step (`validate_residual_scan_conjuncts`) to
prevent unsupported residual predicates (like SEARCH or disabled MATCH)
and to ensure correctness when using COUNT_ON_INDEX pushdown.
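As a rough illustration of the per-expression pushdown rule, the sketch below walks a tiny expression tree and decides whether it may be pushed down. `SketchExpr`, `slot`, `literal`, `call`, and `should_push_down` are all hypothetical names, not Doris types, and the real check also handles virtual slot references, which are omitted here.

```cpp
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical miniature expression node; not Doris's VExpr.
struct SketchExpr {
    std::string slot_name;  // non-empty only for a column reference
    std::vector<std::shared_ptr<SketchExpr>> children;
};
using SketchExprPtr = std::shared_ptr<SketchExpr>;

inline SketchExprPtr slot(const std::string& name) {
    auto e = std::make_shared<SketchExpr>();
    e->slot_name = name;
    return e;
}

inline SketchExprPtr literal() { return std::make_shared<SketchExpr>(); }

inline SketchExprPtr call(std::vector<SketchExprPtr> children) {
    auto e = std::make_shared<SketchExpr>();
    e->children = std::move(children);
    return e;
}

// True if the tree references at least one column.
inline bool references_any_slot(const SketchExprPtr& e) {
    if (!e->slot_name.empty()) return true;
    for (const auto& c : e->children) {
        if (references_any_slot(c)) return true;
    }
    return false;
}

// True if the tree references a column that is not a key column.
inline bool references_non_key_slot(const SketchExprPtr& e,
                                    const std::unordered_set<std::string>& keys) {
    if (!e->slot_name.empty()) return keys.count(e->slot_name) == 0;
    for (const auto& c : e->children) {
        if (references_non_key_slot(c, keys)) return true;
    }
    return false;
}

// Without storage merge any slot-based expression qualifies; with merge
// only key-only expressions do, so filtering never sees pre-merge values.
inline bool should_push_down(const SketchExprPtr& e, bool storage_no_merge,
                             const std::unordered_set<std::string>& keys) {
    if (!references_any_slot(e)) return false;
    if (storage_no_merge) return true;
    return !references_non_key_slot(e, keys);
}
```

With `id` as the only key column, a predicate on `id` qualifies on a merging table, while a predicate on `metric` qualifies only when storage needs no merge.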
**OLAP scan and segment limit logic improvements:**
- Improved the logic for pushing down segment limits and TopN
optimizations, so that they are enabled only when all necessary
conditions are met (e.g., no residual predicates, no runtime filters,
storage does not require merging). The code now checks these invariants
with `DORIS_CHECK` and disables the shared LIMIT for key TopN scans.
[[1]](diffhunk://#diff-4d3f2b2a64e1dac3bc076994d3cd9a6c569f7a8c6ad5bc69425188ffccb4266bL446-R502)
[[2]](diffhunk://#diff-3ddc75656071d9c0e6b0be450e152a1c94559f7e70ea820e7f0c80a7078e3292R1155-R1156)
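The eligibility conditions listed above can be written as a single predicate. The sketch below is illustrative only: `SegmentLimitInputs` and its field names are hypothetical, chosen to mirror the conditions named in this description.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical bundle of the inputs that decide segment LIMIT pushdown.
struct SegmentLimitInputs {
    int64_t limit;                   // <= 0 means no usable LIMIT
    std::size_t residual_conjuncts;  // predicates still evaluated above storage
    int64_t runtime_filters;         // late filters could add residual predicates
    bool segment_limit_enabled;      // session switch for the optimization
    bool storage_no_merge;           // storage returns rows without merging
};

// Segment LIMIT is either fully disabled or enabled only after every
// row-filtering predicate has moved into the storage layer.
inline bool can_push_down_segment_limit(const SegmentLimitInputs& in) {
    return in.limit > 0 && in.runtime_filters == 0 &&
           in.residual_conjuncts == 0 && in.segment_limit_enabled &&
           in.storage_no_merge;
}
```

Any single failing condition disables the pushdown entirely, because a row budget counted before a remaining filter runs could make the scan return fewer rows than the LIMIT requires.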
**Code cleanup and maintenance:**
- Removed unused or obsolete logic related to filter block conjuncts and
the old limit quota acquisition method.
[[1]](diffhunk://#diff-4d3f2b2a64e1dac3bc076994d3cd9a6c569f7a8c6ad5bc69425188ffccb4266bL90)
[[2]](diffhunk://#diff-0c9a817d45d8130ea3211189e1321d1275e22fd4a9a3fac2bd707b1cfeefa5e5L115-L135)
**Dependency and include updates:**
- Added missing includes for expression types to support new logic.
```sql
CREATE DATABASE IF NOT EXISTS lm_bench;
USE lm_bench;
DROP TABLE IF EXISTS lm_wide_fact_10m;
CREATE TABLE lm_wide_fact_10m (
id BIGINT NOT NULL,
filter_key INT NOT NULL,
sort_key BIGINT NOT NULL,
metric BIGINT NOT NULL,
payload_a VARCHAR(1024) NOT NULL,
payload_b VARCHAR(1024) NOT NULL,
long_payload_a VARCHAR(4096) NOT NULL,
long_payload_b VARCHAR(4096) NOT NULL
)
ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 32
PROPERTIES (
"replication_num" = "1"
);
INSERT INTO lm_wide_fact_10m
SELECT
number AS id,
CAST(number % 1000000 AS INT) AS filter_key,
(number * 2654435761) % 1000000007 AS sort_key,
(number * 1103515245 + 12345) % 2147483647 AS metric,
REPEAT(CONCAT('payload_a_', CAST(number AS STRING), '_',
CAST((number * 1315423911) % 1000000007 AS STRING), '_'), 16) AS payload_a,
REPEAT(CONCAT('payload_b_', CAST(number AS STRING), '_',
CAST((number * 2654435761) % 1000000009 AS STRING), '_'), 16) AS payload_b,
REPEAT(CONCAT('long_payload_a_', CAST(number AS STRING), '_',
CAST((number * 48271) % 2147483647 AS STRING), '_'), 64) AS long_payload_a,
REPEAT(CONCAT('long_payload_b_', CAST(number AS STRING), '_',
CAST((number * 69621) % 2147483629 AS STRING), '_'), 64) AS long_payload_b
FROM numbers("number" = "10000000");
ANALYZE TABLE lm_wide_fact_10m WITH SYNC;
-- ============================================================
-- Case 1: TopN next + LIMIT + high-filter-rate pushed-down predicate
-- Expected observation:
-- filter_key is a narrow pushed-down predicate and keeps about 1% of rows.
-- TopN works on narrow metric/sort_key, while long strings are final output.
-- ============================================================
SELECT
id,
filter_key,
metric,
sort_key,
payload_a,
payload_b,
long_payload_a,
long_payload_b
FROM lm_wide_fact_10m
WHERE filter_key < 10000
ORDER BY metric DESC, sort_key ASC
LIMIT 10;
-- Result: 0.12 sec -> 0.10 sec
-- ============================================================
-- Case 2: Normal scan + LIMIT
-- Expected observation:
-- No ORDER BY and no predicate. This isolates ordinary scan limit behavior
-- while still returning long string columns.
-- ============================================================
SELECT
id,
filter_key,
metric,
sort_key,
payload_a,
payload_b,
long_payload_a,
long_payload_b
FROM lm_wide_fact_10m
LIMIT 10;
-- Result: 0.06 sec -> 0.01 sec
-- ============================================================
-- Case 3: Normal scan + LIMIT + high-filter-rate pushed-down predicate
-- Expected observation:
-- No ORDER BY. This isolates scan predicate + limit behavior with long
-- string result columns and a predicate that filters about 99% of rows.
-- ============================================================
SELECT
id,
filter_key,
metric,
sort_key,
payload_a,
payload_b,
long_payload_a,
long_payload_b
FROM lm_wide_fact_10m
WHERE filter_key < 10000
LIMIT 10;
-- Result: 0.07 sec -> 0.02 sec
```
---------
Co-authored-by: Copilot <[email protected]>
---
be/src/exec/operator/mock_scan_operator.h | 2 +-
be/src/exec/operator/olap_scan_operator.cpp | 87 +++++++-
be/src/exec/operator/olap_scan_operator.h | 5 +-
be/src/exec/operator/scan_operator.cpp | 11 +-
be/src/exec/operator/scan_operator.h | 4 +-
be/src/exec/scan/olap_scanner.cpp | 77 ++++---
be/src/exec/scan/scanner.cpp | 23 ++
be/src/exec/scan/scanner.h | 4 +
be/src/exec/scan/scanner_context.cpp | 30 +--
be/src/exec/scan/scanner_context.h | 3 -
be/src/exec/scan/scanner_scheduler.cpp | 31 +--
be/src/exprs/vexpr.cpp | 11 -
be/src/exprs/vexpr.h | 2 -
be/src/runtime/runtime_state.h | 12 +-
be/src/storage/iterator/vcollect_iterator.cpp | 91 +++-----
be/src/storage/iterator/vcollect_iterator.h | 14 +-
be/src/storage/iterators.h | 7 +-
be/src/storage/rowset/beta_rowset_reader.cpp | 2 +-
be/src/storage/rowset/rowset_reader_context.h | 5 +-
be/src/storage/segment/segment_iterator.cpp | 88 ++++++--
be/src/storage/segment/segment_iterator.h | 7 +-
be/src/storage/tablet/tablet_reader.cpp | 1 -
be/src/storage/tablet/tablet_reader.h | 7 +-
.../compaction/collection_statistics_test.cpp | 2 +-
.../segment/segment_iterator_limit_opt_test.cpp | 207 ++++++++++++++++++
.../java/org/apache/doris/qe/SessionVariable.java | 12 ++
gensrc/thrift/PaloInternalService.thrift | 5 +-
.../query_p0/limit/test_unified_limit_pushdown.out | 43 ++++
.../data/search/test_search_slash_in_term.out | 5 +
.../predefine/test_predefine_type_index.out | 8 +
.../limit/test_general_limit_pushdown.groovy | 12 +-
.../limit/test_unified_limit_pushdown.groovy | 233 +++++++++++++++++++++
.../suites/search/test_search_slash_in_term.groovy | 7 +
.../predefine/test_predefine_type_index.groovy | 14 +-
34 files changed, 849 insertions(+), 223 deletions(-)
diff --git a/be/src/exec/operator/mock_scan_operator.h
b/be/src/exec/operator/mock_scan_operator.h
index 8800dc97860..3250a4219b3 100644
--- a/be/src/exec/operator/mock_scan_operator.h
+++ b/be/src/exec/operator/mock_scan_operator.h
@@ -43,7 +43,7 @@ private:
return PushDownType::ACCEPTABLE;
}
- bool _should_push_down_common_expr() override { return true; }
+ bool _should_push_down_common_expr(const VExprSPtr&) override { return
true; }
PushDownType _should_push_down_topn_filter() const override { return
PushDownType::ACCEPTABLE; }
PushDownType _should_push_down_is_null_predicate(VectorizedFnCall*
fn_call) const override {
diff --git a/be/src/exec/operator/olap_scan_operator.cpp
b/be/src/exec/operator/olap_scan_operator.cpp
index c1113ba9a3e..d0365919d6d 100644
--- a/be/src/exec/operator/olap_scan_operator.cpp
+++ b/be/src/exec/operator/olap_scan_operator.cpp
@@ -37,6 +37,7 @@
#include "exprs/vectorized_fn_call.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
+#include "exprs/virtual_slot_ref.h"
#include "exprs/vslot_ref.h"
#include "io/cache/block_file_cache_profile.h"
#include "runtime/query_cache/query_cache.h"
@@ -407,12 +408,62 @@ Status OlapScanLocalState::_init_profile() {
return Status::OK();
}
+static bool contains_expr_node_type(const VExprSPtr& expr, TExprNodeType::type
node_type) {
+ DORIS_CHECK(expr != nullptr);
+ if (expr->node_type() == node_type) {
+ return true;
+ }
+ if (expr->is_rf_wrapper() && contains_expr_node_type(expr->get_impl(),
node_type)) {
+ return true;
+ }
+ return std::ranges::any_of(expr->children(), [node_type](const auto&
child) {
+ return contains_expr_node_type(child, node_type);
+ });
+}
+
+static Status validate_residual_scan_conjuncts(RuntimeState* state,
+ TPushAggOp::type
push_down_agg_type,
+ const VExprContextSPtrs&
conjuncts) {
+ for (const auto& conjunct : conjuncts) {
+ const auto& root = conjunct->root();
+ if (contains_expr_node_type(root, TExprNodeType::SEARCH_EXPR)) {
+ return Status::InvalidArgument(
+ "SEARCH expression remains as a residual scan predicate. A
valid search() "
+ "must bind at least one indexed field and be evaluated in
SegmentIterator. "
+ "enable_segment_limit_pushdown only controls
SegmentIterator LIMIT pushdown "
+ "and cannot make residual SEARCH executable.");
+ }
+ if (!state->query_options().enable_match_without_inverted_index &&
+ contains_expr_node_type(root, TExprNodeType::MATCH_PRED)) {
+ return Status::InvalidArgument(
+ "MATCH expression remains as a residual scan predicate and
would fall back to "
+ "a disabled slow path because
enable_match_without_inverted_index is false. "
+ "enable_segment_limit_pushdown only controls
SegmentIterator LIMIT pushdown "
+ "and cannot make residual MATCH executable. Set "
+ "enable_match_without_inverted_index=true to allow slow
MATCH execution.");
+ }
+ }
+
+ if (push_down_agg_type == TPushAggOp::COUNT_ON_INDEX &&
!conjuncts.empty()) {
+ return Status::InvalidArgument(
+ "COUNT_ON_INDEX pushdown cannot be used with residual scan
predicates. "
+ "Residual predicates must be evaluated before COUNT_ON_INDEX
counts rows; "
+ "otherwise the query may return incorrect results. "
+ "enable_segment_limit_pushdown only controls SegmentIterator
LIMIT pushdown and "
+ "does not make COUNT_ON_INDEX safe with residual predicates.
Set "
+ "enable_count_on_index_pushdown=false to disable
COUNT_ON_INDEX pushdown.");
+ }
+ return Status::OK();
+}
+
Status OlapScanLocalState::_process_conjuncts(RuntimeState* state) {
SCOPED_TIMER(_process_conjunct_timer);
RETURN_IF_ERROR(ScanLocalState::_process_conjuncts(state));
if (ScanLocalState::_eos) {
return Status::OK();
}
+ auto& p = _parent->cast<OlapScanOperatorX>();
+ RETURN_IF_ERROR(validate_residual_scan_conjuncts(state,
p._push_down_agg_type, _conjuncts));
RETURN_IF_ERROR(_build_key_ranges_and_filters());
return Status::OK();
}
@@ -472,8 +523,38 @@ Status
OlapScanLocalState::_should_push_down_function_filter(VectorizedFnCall* f
return Status::OK();
}
-bool OlapScanLocalState::_should_push_down_common_expr() {
- return state()->enable_common_expr_pushdown() && _storage_no_merge();
+bool OlapScanLocalState::_should_push_down_common_expr(const VExprSPtr& expr) {
+ // SegmentIterator common exprs must eventually act on at least one scan
slot.
+ if (!_check_expr_storage_filter(expr,
ExprStorageFilterCheckMode::HAS_SEGMENT_EVALUABLE_EXPR)) {
+ return false;
+ }
+
+ // DUP and UNIQUE-MOW/MOR-as-DUP do not need storage aggregation/merge, so
any slot-based common
+ // expression can be evaluated together with SegmentIterator lazy
materialization.
+ if (_storage_no_merge()) {
+ return true;
+ }
+
+ // AGG and UNIQUE-MOR may still merge value columns above SegmentIterator.
Push only key-column
+ // expressions so filtering does not observe pre-merge values.
+ return !_check_expr_storage_filter(expr,
ExprStorageFilterCheckMode::HAS_NON_KEY_SLOT);
+}
+
+bool OlapScanLocalState::_check_expr_storage_filter(const VExprSPtr& expr,
+ ExprStorageFilterCheckMode
mode) {
+ if (expr->is_slot_ref()) {
+ const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
+ return mode == ExprStorageFilterCheckMode::HAS_SEGMENT_EVALUABLE_EXPR
||
+ !_is_key_column(slot_ref->expr_name());
+ }
+ if (expr->is_virtual_slot_ref()) {
+ // Treat virtual slot ref as non-key because it may depend on non-key
source columns.
+ return true;
+ }
+
+ return std::ranges::any_of(expr->children(), [this, mode](const auto&
child) {
+ return _check_expr_storage_filter(child, mode);
+ });
}
bool OlapScanLocalState::_storage_no_merge() {
@@ -1081,6 +1162,8 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool,
const TPlanNode& tnode, i
_cache_param(param) {
_output_tuple_id = tnode.olap_scan_node.tuple_id;
if (_olap_scan_node.__isset.sort_info &&
_olap_scan_node.__isset.sort_limit) {
+ DORIS_CHECK(_limit < 0);
+ DORIS_CHECK(_olap_scan_node.sort_limit > 0);
_limit_per_scanner = _olap_scan_node.sort_limit;
}
DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
diff --git a/be/src/exec/operator/olap_scan_operator.h
b/be/src/exec/operator/olap_scan_operator.h
index cd82c7ab08e..38d3ae64634 100644
--- a/be/src/exec/operator/olap_scan_operator.h
+++ b/be/src/exec/operator/olap_scan_operator.h
@@ -101,7 +101,10 @@ private:
VectorizedFnCall* fn_call, VExprContext* expr_ctx, Field&
constant_val,
const std::set<std::string> fn_name) const override;
- bool _should_push_down_common_expr() override;
+ bool _should_push_down_common_expr(const VExprSPtr& expr) override;
+
+ enum class ExprStorageFilterCheckMode { HAS_SEGMENT_EVALUABLE_EXPR,
HAS_NON_KEY_SLOT };
+ bool _check_expr_storage_filter(const VExprSPtr& expr,
ExprStorageFilterCheckMode mode);
bool _storage_no_merge() override;
diff --git a/be/src/exec/operator/scan_operator.cpp
b/be/src/exec/operator/scan_operator.cpp
index e734b65ac24..7f72b4be146 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -310,8 +310,7 @@ Status
ScanLocalState<Derived>::_normalize_conjuncts(RuntimeState* state) {
RETURN_IF_ERROR(_normalize_predicate(conjunct.get(),
conjunct->root(), new_root));
if (new_root) {
conjunct->set_root(new_root);
- if (_should_push_down_common_expr() &&
- VExpr::is_acting_on_a_slot(*(conjunct->root()))) {
+ if (_should_push_down_common_expr(conjunct->root())) {
_common_expr_ctxs_push_down.emplace_back(conjunct);
it = _conjuncts.erase(it);
continue;
@@ -1041,6 +1040,14 @@ int64_t ScanLocalState<Derived>::limit_per_scanner() {
return _parent->cast<typename Derived::Parent>()._limit_per_scanner;
}
+template <typename Derived>
+std::atomic<int64_t>* ScanLocalState<Derived>::shared_scan_limit_ptr() {
+ auto* p = &_parent->cast<typename Derived::Parent>()._shared_scan_limit;
+ // -1 means "no SQL LIMIT" — return nullptr so callers naturally skip
+ // all limit logic.
+ return p->load(std::memory_order_relaxed) < 0 ? nullptr : p;
+}
+
template <typename Derived>
Status ScanLocalState<Derived>::_init_profile() {
// 1. counters for scan node
diff --git a/be/src/exec/operator/scan_operator.h
b/be/src/exec/operator/scan_operator.h
index 591bdcac5ee..f2ba87d4291 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -68,6 +68,7 @@ public:
[[nodiscard]] virtual const TupleDescriptor* output_tuple_desc() const = 0;
virtual int64_t limit_per_scanner() = 0;
+ virtual std::atomic<int64_t>* shared_scan_limit_ptr() = 0;
virtual void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>&
scan_ranges) = 0;
@@ -235,6 +236,7 @@ class ScanLocalState : public ScanLocalStateBase {
[[nodiscard]] const TupleDescriptor* output_tuple_desc() const override;
int64_t limit_per_scanner() override;
+ std::atomic<int64_t>* shared_scan_limit_ptr() override;
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges)
override {}
@@ -280,7 +282,7 @@ protected:
Status _init_profile() override;
virtual Status _process_conjuncts(RuntimeState* state) { return
_normalize_conjuncts(state); }
- virtual bool _should_push_down_common_expr() { return false; }
+ virtual bool _should_push_down_common_expr(const VExprSPtr&) { return
false; }
virtual bool _storage_no_merge() { return false; }
virtual bool _is_key_column(const std::string& col_name) { return false; }
diff --git a/be/src/exec/scan/olap_scanner.cpp
b/be/src/exec/scan/olap_scanner.cpp
index a7949535e37..562f6c7a474 100644
--- a/be/src/exec/scan/olap_scanner.cpp
+++ b/be/src/exec/scan/olap_scanner.cpp
@@ -87,7 +87,6 @@ OlapScanner::OlapScanner(ScanLocalStateBase* parent,
OlapScanner::Params&& param
.remaining_conjunct_roots {},
.common_expr_ctxs_push_down {},
.topn_filter_source_node_ids {},
- .filter_block_conjuncts {},
.key_group_cluster_key_idxes {},
.virtual_column_exprs {},
.vir_cid_to_idx_in_block {},
@@ -443,17 +442,31 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.enable_mor_value_predicate_pushdown = true;
}
- // Skip topn / general-limit storage-layer optimizations when runtime
- // filters exist. Late-arriving filters would re-populate _conjuncts
- // at the scanner level while the storage layer has already committed
- // to a row budget counted before those filters, causing the scan to
- // return fewer rows than the limit requires.
- if (_total_rf_num == 0) {
- // order by table keys optimization for topn
- // will only read head/tail of data file since it's already sorted
by keys
- if (olap_scan_node.__isset.sort_info &&
- !olap_scan_node.sort_info.is_asc_order.empty()) {
- _limit = _local_state->limit_per_scanner();
+ const bool has_key_topn =
+ olap_scan_node.__isset.sort_info &&
!olap_scan_node.sort_info.is_asc_order.empty();
+ if (has_key_topn) {
+ _limit = _local_state->limit_per_scanner();
+ }
+
+ const bool no_runtime_filters = _total_rf_num == 0;
+ const bool segment_limit_enabled =
_state->enable_segment_limit_pushdown();
+ const bool storage_no_merge =
olap_scan_local_state->_storage_no_merge();
+
+ if (_limit > 0 && no_runtime_filters && segment_limit_enabled &&
storage_no_merge) {
+ for (const auto& conjunct : _conjuncts) {
+ DORIS_CHECK(!olap_scan_local_state->_check_expr_storage_filter(
+ conjunct->root(),
OlapScanLocalState::ExprStorageFilterCheckMode::
+ HAS_SEGMENT_EVALUABLE_EXPR));
+ }
+ }
+
+ // Segment LIMIT has only two legal states: completely disabled, or
enabled after every
+ // row-filtering conjunct has become a storage predicate or
SegmentIterator common expr.
+ const bool can_push_down_segment_limit = _limit > 0 &&
no_runtime_filters &&
+ _conjuncts.empty() &&
segment_limit_enabled &&
+ storage_no_merge;
+ if (can_push_down_segment_limit) {
+ if (has_key_topn) {
_tablet_reader_params.read_orderby_key = true;
if (!olap_scan_node.sort_info.is_asc_order[0]) {
_tablet_reader_params.read_orderby_key_reverse = true;
@@ -461,28 +474,32 @@ Status OlapScanner::_init_tablet_reader_params(
_tablet_reader_params.read_orderby_key_num_prefix_columns =
olap_scan_node.sort_info.is_asc_order.size();
_tablet_reader_params.read_orderby_key_limit = _limit;
-
- if (_tablet_reader_params.read_orderby_key_limit > 0 &&
- olap_scan_local_state->_storage_no_merge()) {
- _tablet_reader_params.filter_block_conjuncts = _conjuncts;
- _conjuncts.clear();
- }
- } else if (_limit > 0 &&
olap_scan_local_state->_storage_no_merge()) {
- // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW
- // (non-merge path). Only when topn optimization is NOT active.
- // NOTE: _limit is the global query limit (TPlanNode.limit),
not a
- // per-scanner budget. With N scanners each scanner may read
up to
- // _limit rows, so up to N * _limit rows are read in total
before
- // the _shared_scan_limit coordinator stops them. This is
- // acceptable because _shared_scan_limit guarantees
correctness,
- // and the over-read is bounded by (N-1) * _limit which is
small
- // for typical LIMIT values.
+ } else {
_tablet_reader_params.general_read_limit = _limit;
- _tablet_reader_params.filter_block_conjuncts = _conjuncts;
- _conjuncts.clear();
}
}
+ if (_tablet_reader_params.read_orderby_key_limit > 0 ||
+ _tablet_reader_params.general_read_limit > 0) {
+ DORIS_CHECK(can_push_down_segment_limit);
+ DORIS_CHECK(_conjuncts.empty());
+ }
+
+ // A key TopN scan cannot share the plain LIMIT early-stop counter. If
+ // storage TopN is pushed down, each scanner must produce its full
local
+ // candidates. If it is not pushed down for any reason, the upper TopN
+ // still needs all rows from the scan.
+ if (has_key_topn) {
+ _shared_scan_limit = nullptr;
+ if (_tablet_reader_params.read_orderby_key_limit == 0) {
+ _limit = -1;
+ }
+ }
+ // Note: _shared_scan_limit is intentionally not pushed into the
+ // storage layer. SegmentIterator's _process_eof() is irreversible,
+ // so a concurrently-decremented atomic could reach 0 while a segment
+ // still has data needed by other scanners.
+
// set push down topn filter
_tablet_reader_params.topn_filter_source_node_ids =
olap_scan_local_state->get_topn_filter_source_node_ids(_state,
true);
diff --git a/be/src/exec/scan/scanner.cpp b/be/src/exec/scan/scanner.cpp
index 97f12d1195c..ae1ed96e000 100644
--- a/be/src/exec/scan/scanner.cpp
+++ b/be/src/exec/scan/scanner.cpp
@@ -47,6 +47,12 @@ Scanner::Scanner(RuntimeState* state, ScanLocalStateBase*
local_state, int64_t l
}
Status Scanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
+ // All scanners share a remaining-limit counter so a LIMIT query can
+ // stop once enough rows have been collected across scanners.
+ // Key TopN scans have no ordinary scan LIMIT, so each scanner can
+ // independently produce its full local top-N.
+ _shared_scan_limit = _local_state->shared_scan_limit_ptr();
+
if (!conjuncts.empty()) {
_conjuncts.resize(conjuncts.size());
for (size_t i = 0; i != conjuncts.size(); ++i) {
@@ -126,6 +132,15 @@ Status Scanner::get_block_after_projects(RuntimeState*
state, Block* block, bool
Status Scanner::get_block(RuntimeState* state, Block* block, bool* eof) {
// only empty block should be here
DCHECK(block->rows() == 0);
+
+ // Stop early if other scanners have already collected enough rows
+ // for the SQL LIMIT. Skipped when _shared_scan_limit is null (topn
+ // path or no LIMIT).
+ if (_shared_scan_limit &&
_shared_scan_limit->load(std::memory_order_acquire) <= 0) {
+ *eof = true;
+ return Status::OK();
+ }
+
// scanner running time
SCOPED_RAW_TIMER(&_per_scanner_timer);
int64_t rows_read_threshold = _num_rows_read +
config::doris_scanner_row_num;
@@ -159,6 +174,13 @@ Status Scanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
}
// record rows return (after filter) for _limit check
_num_rows_return += block->rows();
+ // Publish progress to the shared counter so peer scanners can
+ // observe it. The counter may go negative when several scanners
+ // subtract concurrently; that is harmless because the operator's
+ // reached_limit() makes the final cut.
+ if (_shared_scan_limit && block->rows() > 0) {
+ _shared_scan_limit->fetch_sub(block->rows(),
std::memory_order_acq_rel);
+ }
} while (!_should_stop && !state->is_cancelled() && block->rows() == 0
&& !(*eof) &&
_num_rows_read < rows_read_threshold);
}
@@ -171,6 +193,7 @@ Status Scanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
// set eof to true if per scanner limit is reached
// currently for query: ORDER BY key LIMIT n
*eof = *eof || (_limit > 0 && _num_rows_return >= _limit);
+ *eof = *eof || (_shared_scan_limit &&
_shared_scan_limit->load(std::memory_order_acquire) <= 0);
return Status::OK();
}
diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h
index bd56dc0f08f..c14f6ee2048 100644
--- a/be/src/exec/scan/scanner.h
+++ b/be/src/exec/scan/scanner.h
@@ -274,6 +274,10 @@ protected:
int64_t _projection_timer = 0;
bool _should_stop = false;
+
+ // Cached pointer to ScanOperator's remaining-limit counter. Null when
+ // this scanner is on the topn path or the query has no LIMIT.
+ std::atomic<int64_t>* _shared_scan_limit = nullptr;
};
using ScannerSPtr = std::shared_ptr<Scanner>;
diff --git a/be/src/exec/scan/scanner_context.cpp
b/be/src/exec/scan/scanner_context.cpp
index 8ca4ff8cc1a..b461b289be1 100644
--- a/be/src/exec/scan/scanner_context.cpp
+++ b/be/src/exec/scan/scanner_context.cpp
@@ -112,27 +112,6 @@ ScannerContext::ScannerContext(RuntimeState* state,
ScanLocalStateBase* local_st
DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
-int64_t ScannerContext::acquire_limit_quota(int64_t desired) {
- DCHECK(desired > 0);
- int64_t remaining = _shared_scan_limit->load(std::memory_order_acquire);
- while (true) {
- if (remaining < 0) {
- // No limit set, grant all desired rows.
- return desired;
- }
- if (remaining == 0) {
- return 0;
- }
- int64_t granted = std::min(desired, remaining);
- if (_shared_scan_limit->compare_exchange_weak(remaining, remaining -
granted,
-
std::memory_order_acq_rel,
-
std::memory_order_acquire)) {
- return granted;
- }
- // CAS failed, `remaining` is updated to current value, retry.
- }
-}
-
void ScannerContext::_adjust_scan_mem_limit(int64_t old_value, int64_t
new_value) {
if (!_enable_adaptive_scanners) {
return;
@@ -421,9 +400,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState*
state, Block* block, b
}
}
- // Mark finished when either:
- // (1) all scanners completed normally, or
- // (2) shared limit exhausted and no scanners are still running.
if (_completed_tasks.empty() &&
(_num_finished_scanners == _all_scanners.size() ||
(_shared_scan_limit->load(std::memory_order_acquire) == 0 &&
_in_flight_tasks_num == 0))) {
@@ -757,9 +733,9 @@ std::shared_ptr<ScanTask>
ScannerContext::_pull_next_scan_task(
}
if (!_pending_tasks.empty()) {
- // If shared limit quota is exhausted, do not submit new scanners from
pending queue.
- int64_t remaining =
_shared_scan_limit->load(std::memory_order_acquire);
- if (remaining == 0) {
+ // Skip submitting more pending scanners once the LIMIT budget is
+ // exhausted; they would only open and immediately EOF.
+ if (_shared_scan_limit->load(std::memory_order_acquire) == 0) {
return nullptr;
}
std::shared_ptr<ScanTask> next_scan_task;
diff --git a/be/src/exec/scan/scanner_context.h
b/be/src/exec/scan/scanner_context.h
index d3b61c5f8c0..d96557ac5e5 100644
--- a/be/src/exec/scan/scanner_context.h
+++ b/be/src/exec/scan/scanner_context.h
@@ -288,9 +288,6 @@ protected:
// Points to the shared remaining limit on ScanOperatorX, shared across all
// parallel instances and their scanners. -1 means no limit.
std::atomic<int64_t>* _shared_scan_limit = nullptr;
- // Atomically acquire up to `desired` rows. Returns actual granted count
(0 = exhausted).
- int64_t acquire_limit_quota(int64_t desired);
- int64_t remaining_limit() const { return
_shared_scan_limit->load(std::memory_order_acquire); }
int64_t _max_bytes_in_queue = 0;
// _transfer_lock protects _completed_tasks, _pending_tasks, and all other
shared state
diff --git a/be/src/exec/scan/scanner_scheduler.cpp
b/be/src/exec/scan/scanner_scheduler.cpp
index b61f26e2c88..4845c79a2b9 100644
--- a/be/src/exec/scan/scanner_scheduler.cpp
+++ b/be/src/exec/scan/scanner_scheduler.cpp
@@ -211,9 +211,8 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
bool first_read = true;
- int64_t limit = scanner->limit(); if (UNLIKELY(ctx->done())) {
- eos = true;
- } else if (ctx->remaining_limit() == 0) { eos = true; } else if
(!eos) {
+ int64_t limit = scanner->limit();
+ if (UNLIKELY(ctx->done())) { eos = true; } else if (!eos) {
do {
DEFER_RELEASE_RESERVED();
BlockUPtr free_block;
@@ -248,22 +247,6 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
// Or it may cause a crash when the block is not normal.
_make_sure_virtual_col_is_materialized(scanner,
free_block.get());
- // Shared limit quota: acquire rows from the context's
shared pool.
- // Discard or truncate the block if quota is exhausted.
- if (free_block->rows() > 0) {
- int64_t block_rows = free_block->rows();
- int64_t granted = ctx->acquire_limit_quota(block_rows);
- if (granted == 0) {
- // No quota remaining, discard this block and mark
eos.
- ctx->return_free_block(std::move(free_block));
- eos = true;
- break;
- } else if (granted < block_rows) {
- // Partial quota: truncate block to granted rows
and mark eos.
- free_block->set_num_rows(granted);
- eos = true;
- }
- }
// Projection will truncate useless columns, makes block
size change.
auto free_block_bytes = free_block->allocated_bytes();
ctx->reestimated_block_mem_bytes(cast_set<int64_t>(free_block_bytes));
@@ -289,11 +272,11 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
} while (false);
}
- if (UNLIKELY(!status.ok())) {
-
scan_task->set_status(status);
- eos = true;
- },
- status);
+ if (UNLIKELY(!status.ok())) {
+ scan_task->set_status(status);
+ eos = true;
+ },
+ status);
if (UNLIKELY(!status.ok())) {
scan_task->set_status(status);
diff --git a/be/src/exprs/vexpr.cpp b/be/src/exprs/vexpr.cpp
index 8affbb0e874..384bb9ca60b 100644
--- a/be/src/exprs/vexpr.cpp
+++ b/be/src/exprs/vexpr.cpp
@@ -339,17 +339,6 @@ TExprNode create_texpr_node_from(const Field& field, const
PrimitiveType& type,
namespace doris {
-bool VExpr::is_acting_on_a_slot(const VExpr& expr) {
- const auto& children = expr.children();
-
- auto is_a_slot = std::any_of(children.begin(), children.end(),
- [](const auto& child) { return
is_acting_on_a_slot(*child); });
-
- return is_a_slot ? true
- : (expr.node_type() == TExprNodeType::SLOT_REF ||
- expr.node_type() == TExprNodeType::VIRTUAL_SLOT_REF);
-}
-
VExpr::VExpr(const TExprNode& node)
: _node_type(node.node_type),
_opcode(node.__isset.opcode ? node.opcode :
TExprOpcode::INVALID_OPCODE) {
diff --git a/be/src/exprs/vexpr.h b/be/src/exprs/vexpr.h
index b28054461e5..591fbe3d743 100644
--- a/be/src/exprs/vexpr.h
+++ b/be/src/exprs/vexpr.h
@@ -91,8 +91,6 @@ public:
return block->columns() - 1;
}
- static bool is_acting_on_a_slot(const VExpr& expr);
-
VExpr(const TExprNode& node);
VExpr(const VExpr& vexpr);
VExpr(DataTypePtr type, bool is_slotref);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 905b1ffa611..619b842eef7 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -243,17 +243,11 @@ public:
return _query_options.__isset.enable_insert_strict &&
_query_options.enable_insert_strict;
}
- bool enable_common_expr_pushdown() const {
- return _query_options.__isset.enable_common_expr_pushdown &&
- _query_options.enable_common_expr_pushdown;
+ bool enable_segment_limit_pushdown() const {
+ return !_query_options.__isset.enable_segment_limit_pushdown ||
+ _query_options.enable_segment_limit_pushdown;
}
- bool enable_common_expr_pushdown_for_inverted_index() const {
- return enable_common_expr_pushdown() &&
- _query_options.__isset.enable_common_expr_pushdown_for_inverted_index &&
- _query_options.enable_common_expr_pushdown_for_inverted_index;
- };
-
bool mysql_row_binary_format() const {
return _query_options.__isset.mysql_row_binary_format &&
_query_options.mysql_row_binary_format;
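
Note on the accessor change above: the removed enable_common_expr_pushdown() used the `__isset.x && x` pattern, so an unset field read as false, while the new enable_segment_limit_pushdown() uses `!__isset.x || x`, so an unset field reads as true. A small sketch of the two default behaviours, modelling a Thrift optional bool with std::optional; the names OptionalFlag, default_off, default_on and verify_flag_semantics are hypothetical.

```cpp
#include <cassert>
#include <optional>

// Hypothetical model of a Thrift optional bool: nullopt means "field not set".
using OptionalFlag = std::optional<bool>;

// `__isset.x && x` pattern: an unset field reads as false.
bool default_off(const OptionalFlag& flag) {
    return flag.has_value() && *flag;
}

// `!__isset.x || x` pattern: an unset field reads as true.
bool default_on(const OptionalFlag& flag) {
    return !flag.has_value() || *flag;
}

// Usage check: the two patterns only differ when the field is unset.
void verify_flag_semantics() {
    assert(!default_off(std::nullopt));
    assert(default_on(std::nullopt));
    assert(default_off(true) && default_on(true));
    assert(!default_off(false) && !default_on(false));
}
```

The default-on form presumably keeps the pushdown enabled when the options come from a sender that does not set the new field, which matches the `= true` default on the Thrift field added later in this commit.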
diff --git a/be/src/storage/iterator/vcollect_iterator.cpp
b/be/src/storage/iterator/vcollect_iterator.cpp
index 1d70e61eee6..51bb501d0fb 100644
--- a/be/src/storage/iterator/vcollect_iterator.cpp
+++ b/be/src/storage/iterator/vcollect_iterator.cpp
@@ -37,7 +37,6 @@
#include "core/data_type/data_type.h"
#include "core/data_type/data_type_number.h" // IWYU pragma: keep
#include "core/field.h"
-#include "exprs/vexpr_context.h"
#include "io/io_common.h"
#include "runtime/query_context.h"
#include "runtime/runtime_predicate.h"
@@ -84,31 +83,31 @@ void VCollectIterator::init(TabletReader* reader, bool
ori_data_overlapping, boo
}
_is_reverse = is_reverse;
- // use topn_next opt only for DUP_KEYS and UNIQUE_KEYS with MOW
- if (_reader->_reader_context.read_orderby_key_limit > 0 &&
- (_reader->_tablet->keys_type() == KeysType::DUP_KEYS ||
- (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
- _reader->_tablet->enable_unique_key_merge_on_write()))) {
+ // TopN limit. Whether we reach here is decided by the scanner;
+ // it only sets read_orderby_key_limit on the no-merge tablet types.
+ if (_reader->_reader_context.read_orderby_key_limit > 0) {
_topn_limit = _reader->_reader_context.read_orderby_key_limit;
- } else {
- _topn_limit = 0;
+ DORIS_CHECK(_reader->_reader_context.general_read_limit <= 0);
}
-
- // General limit pushdown: only for non-merge path (DUP_KEYS or
UNIQUE_KEYS with MOW).
- // The scanner already guards this with _storage_no_merge(), but we also
check !_merge
- // here because _merge can be forced true by overlapping data
(force_merge), in which
- // case limit pushdown is not safe.
- if (!_merge && _reader->_reader_context.general_read_limit > 0) {
- _general_read_limit = _reader->_reader_context.general_read_limit;
+ // General limit is forwarded to SegmentIterator, which applies it after
+ // predicate/common-expr filtering and before lazy-reading non-predicate
columns.
+ if (_reader->_reader_context.general_read_limit > 0) {
+ _general_read_limit =
static_cast<size_t>(_reader->_reader_context.general_read_limit);
}
}
Status VCollectIterator::add_child(const RowSetSplits& rs_splits) {
+ // Forward the local row budget to SegmentIterator. SegmentIterator applies
+ // it after its own predicate/common-expr filtering, so LIMIT and pushed
+ // conjuncts stay in the same layer.
if (use_topn_next()) {
rs_splits.rs_reader->set_topn_limit(_topn_limit);
_rs_splits.push_back(rs_splits);
return Status::OK();
}
+ if (_general_read_limit > 0) {
+ rs_splits.rs_reader->set_topn_limit(_general_read_limit);
+ }
_children.push_back(std::make_unique<Level0Iterator>(rs_splits.rs_reader,
_reader));
return Status::OK();
@@ -119,6 +118,7 @@ Status VCollectIterator::add_child(const RowSetSplits&
rs_splits) {
// then merged with the base rowset.
Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>&
rs_readers) {
if (use_topn_next()) {
+ DORIS_CHECK(_children.empty());
return Status::OK();
}
@@ -255,42 +255,11 @@ Status VCollectIterator::next(Block* block) {
return _topn_next(block);
}
- // Fast path: if general limit already reached, return EOF immediately
- if (_general_read_limit > 0 && _general_rows_returned >=
_general_read_limit) {
- return Status::Error<END_OF_FILE>("");
- }
-
if (LIKELY(_inner_iter)) {
auto st = _inner_iter->next(block);
if (UNLIKELY(!st.ok())) {
return st;
}
-
- // Apply filter_block_conjuncts that were moved from
Scanner::_conjuncts.
- // This must happen BEFORE limit counting so that
_general_rows_returned
- // reflects post-filter rows (same pattern as _topn_next).
- // Intentionally not gated by _general_read_limit > 0:
- // filter_block_conjuncts is only populated when the general-limit or
- // topn branches move conjuncts into the storage layer (topn takes
- // the _topn_next path and never reaches here).
- if (!_reader->_reader_context.filter_block_conjuncts.empty()) {
- RETURN_IF_ERROR(VExprContext::filter_block(
- _reader->_reader_context.filter_block_conjuncts, block,
block->columns()));
- }
-
- // Enforce general read limit: truncate block if needed
- if (_general_read_limit > 0) {
- _general_rows_returned += block->rows();
- if (_general_rows_returned > _general_read_limit) {
- // Truncate block to return exactly the remaining rows needed
- int64_t excess = _general_rows_returned - _general_read_limit;
- int64_t keep = block->rows() - excess;
- DCHECK_GT(keep, 0);
- block->set_num_rows(keep);
- _general_rows_returned = _general_read_limit;
- }
- }
-
return Status::OK();
} else {
return Status::Error<END_OF_FILE>("");
@@ -298,20 +267,16 @@ Status VCollectIterator::next(Block* block) {
}
Status VCollectIterator::_topn_next(Block* block) {
+ DORIS_CHECK(block != nullptr);
+ DORIS_CHECK(use_topn_next());
+ DORIS_CHECK(_reader->_reader_context.read_orderby_key_columns != nullptr);
+ DORIS_CHECK(!_reader->_reader_context.read_orderby_key_columns->empty());
+
if (_topn_eof) {
return Status::Error<END_OF_FILE>("");
}
auto clone_block = block->clone_empty();
- /*
- select id, "${tR2}",
- l2_distance_approximate
- from ann_index_only_scan
- where l2_distance_approximate < 10
- order by id
- limit 20;
- where id is the orderby key column.
- */
// Initialize virtual slot columns by schema (avoid runtime type checks):
// use _reader_context.vir_col_idx_to_type to construct real columns for
those positions.
if (!_reader->_reader_context.vir_col_idx_to_type.empty()) {
@@ -325,13 +290,11 @@ Status VCollectIterator::_topn_next(Block* block) {
}
MutableBlock mutable_block =
MutableBlock::build_mutable_block(&clone_block);
- if (!_reader->_reader_context.read_orderby_key_columns) {
- return Status::Error<ErrorCode::INTERNAL_ERROR>(
- "read_orderby_key_columns should not be nullptr");
- }
-
- size_t first_sort_column_idx =
(*_reader->_reader_context.read_orderby_key_columns)[0];
const std::vector<uint32_t>* sort_columns =
_reader->_reader_context.read_orderby_key_columns;
+ for (auto column_idx : *sort_columns) {
+ DORIS_CHECK(column_idx < clone_block.columns());
+ }
+ size_t first_sort_column_idx = (*sort_columns)[0];
BlockRowPosComparator row_pos_comparator(&mutable_block, sort_columns,
_reader->_reader_context.read_orderby_key_reverse);
@@ -364,10 +327,6 @@ Status VCollectIterator::_topn_next(Block* block) {
}
}
- // filter block
- RETURN_IF_ERROR(VExprContext::filter_block(
- _reader->_reader_context.filter_block_conjuncts, block,
block->columns()));
-
// update read rows
read_rows += block->rows();
@@ -395,6 +354,8 @@ Status VCollectIterator::_topn_next(Block* block) {
int res = 0;
for (auto k : *sort_columns) {
+ DORIS_CHECK(k < block->columns());
+ DORIS_CHECK(k < mutable_block.columns());
DCHECK(block->get_by_position(k).type->equals(
*mutable_block.get_datatype_by_position(k)));
res = block->get_by_position(k).column->compare_at(
diff --git a/be/src/storage/iterator/vcollect_iterator.h
b/be/src/storage/iterator/vcollect_iterator.h
index 634b36c979d..c297f9f3750 100644
--- a/be/src/storage/iterator/vcollect_iterator.h
+++ b/be/src/storage/iterator/vcollect_iterator.h
@@ -92,7 +92,8 @@ public:
inline bool use_topn_next() const { return _topn_limit > 0; }
private:
- // next for topn query
+ // Scanner-local TopN merge. SegmentIterator has already applied all
pushed filters and its
+ // per-segment row budget; this path only keeps the best local rows across
rowsets.
Status _topn_next(Block* block);
class BlockRowPosComparator {
@@ -356,17 +357,10 @@ private:
// for topn next
size_t _topn_limit = 0;
bool _topn_eof = false;
- // For chunked topN output when result exceeds byte budget.
- Block _topn_result_block;
- size_t _topn_result_offset = 0;
+ // for forwarding general LIMIT to SegmentIterator
+ size_t _general_read_limit = 0;
std::vector<RowSetSplits> _rs_splits;
- // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW (non-merge
path).
- // When > 0, VCollectIterator will stop reading after returning this many
rows.
- int64_t _general_read_limit = -1;
- // Number of rows already returned to the caller.
- int64_t _general_rows_returned = 0;
-
// Hold reader point to access read params, such as fetch conditions.
TabletReader* _reader = nullptr;
diff --git a/be/src/storage/iterators.h b/be/src/storage/iterators.h
index 1c9b5518743..24de1b89315 100644
--- a/be/src/storage/iterators.h
+++ b/be/src/storage/iterators.h
@@ -136,7 +136,12 @@ public:
// slots that cast may be eliminated in storage layer
std::map<std::string, DataTypePtr> target_cast_type_for_variants;
RowRanges row_ranges;
- size_t topn_limit = 0;
+
+ // Per-segment row budget pushed down from the scanner (topn or general
+ // limit). SegmentIterator applies it after predicate/common-expr
filtering;
+ // _can_opt_limit_reads() only decides whether the pre-filter read can also
+ // be capped. 0 disables the optimization.
+ size_t read_limit = 0;
std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
diff --git a/be/src/storage/rowset/beta_rowset_reader.cpp
b/be/src/storage/rowset/beta_rowset_reader.cpp
index 61170b37226..e3987eefbc4 100644
--- a/be/src/storage/rowset/beta_rowset_reader.cpp
+++ b/be/src/storage/rowset/beta_rowset_reader.cpp
@@ -114,7 +114,7 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.rowset_id = _rowset->rowset_id();
_read_options.version = _rowset->version();
_read_options.tablet_id = _rowset->rowset_meta()->tablet_id();
- _read_options.topn_limit = _topn_limit;
+ _read_options.read_limit = _topn_limit;
if (_read_context->lower_bound_keys != nullptr) {
for (int i = 0; i < _read_context->lower_bound_keys->size(); ++i) {
_read_options.key_ranges.emplace_back(&_read_context->lower_bound_keys->at(i),
diff --git a/be/src/storage/rowset/rowset_reader_context.h
b/be/src/storage/rowset/rowset_reader_context.h
index 6332ab51fc2..f78774aef86 100644
--- a/be/src/storage/rowset/rowset_reader_context.h
+++ b/be/src/storage/rowset/rowset_reader_context.h
@@ -51,8 +51,6 @@ struct RowsetReaderContext {
std::vector<uint32_t>* read_orderby_key_columns = nullptr;
// limit of rows for read_orderby_key
size_t read_orderby_key_limit = 0;
- // filter_block arguments
- VExprContextSPtrs filter_block_conjuncts;
// projection columns: the set of columns rowset reader should return
const std::vector<uint32_t>* return_columns = nullptr;
TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
@@ -110,8 +108,7 @@ struct RowsetReaderContext {
// When true, push down value predicates for MOR tables
bool enable_mor_value_predicate_pushdown = false;
- // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW.
- // Propagated from ReaderParams.general_read_limit.
+ // General LIMIT budget forwarded to SegmentIterator.
int64_t general_read_limit = -1;
};
diff --git a/be/src/storage/segment/segment_iterator.cpp
b/be/src/storage/segment/segment_iterator.cpp
index 47992c592da..df63306f92b 100644
--- a/be/src/storage/segment/segment_iterator.cpp
+++ b/be/src/storage/segment/segment_iterator.cpp
@@ -1463,12 +1463,6 @@ Status SegmentIterator::_apply_inverted_index() {
*/
bool
SegmentIterator::_check_all_conditions_passed_inverted_index_for_column(ColumnId
cid,
bool default_return) {
- // If common_expr_pushdown is disabled, we cannot guarantee that all
conditions are processed by the inverted index.
- // Consider a scenario where there is a column predicate and an expression
involving the same column in the SQL query,
- // such as 'a < 0' and 'abs(a) > 1'. This could potentially lead to errors.
- if (_opts.runtime_state &&
!_opts.runtime_state->query_options().enable_common_expr_pushdown) {
- return false;
- }
auto pred_it = _column_predicate_index_exec_status.find(cid);
if (pred_it != _column_predicate_index_exec_status.end()) {
const auto& pred_map = pred_it->second;
@@ -2489,6 +2483,53 @@ uint16_t
SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro
return selected_size;
}
+static void shrink_materialized_block_columns(Block* block, size_t rows) {
+ for (auto& entry : *block) {
+ if (entry.column && entry.column->size() > rows) {
+ entry.column = entry.column->shrink(rows);
+ }
+ }
+}
+
+static void slice_materialized_block_columns(Block* block, size_t offset,
size_t rows,
+ size_t original_rows) {
+ for (auto& entry : *block) {
+ if (!entry.column || entry.column->size() == 0) {
+ continue;
+ }
+ DORIS_CHECK(entry.column->size() == original_rows);
+ entry.column = entry.column->cut(offset, rows);
+ }
+}
+
+Status SegmentIterator::_apply_read_limit_to_selected_rows(Block* block,
uint16_t& selected_size) {
+ if (_opts.read_limit == 0) {
+ return Status::OK();
+ }
+ DORIS_CHECK(_rows_returned <= _opts.read_limit);
+ size_t remaining = _opts.read_limit - _rows_returned;
+ if (remaining == 0) {
+ selected_size = 0;
+ shrink_materialized_block_columns(block, 0);
+ return Status::OK();
+ }
+ if (selected_size > remaining) {
+ if (_opts.read_orderby_key_reverse) {
+ const auto original_size = selected_size;
+ const auto offset = original_size - remaining;
+ for (size_t i = 0; i < remaining; ++i) {
+ _sel_rowid_idx[i] = _sel_rowid_idx[offset + i];
+ }
+ selected_size = cast_set<uint16_t>(remaining);
+ slice_materialized_block_columns(block, offset, remaining,
original_size);
+ return Status::OK();
+ }
+ selected_size = cast_set<uint16_t>(remaining);
+ shrink_materialized_block_columns(block, selected_size);
+ }
+ return Status::OK();
+}
+
Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>&
read_column_ids,
std::vector<rowid_t>&
rowid_vector,
uint16_t* sel_rowid_idx,
size_t select_size,
@@ -2683,17 +2724,28 @@ Status SegmentIterator::_next_batch_internal(Block*
block) {
SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
+ if (_opts.read_limit > 0 && _rows_returned >= _opts.read_limit) {
+ return _process_eof(block);
+ }
+
// If the row bitmap size is smaller than nrows_read_limit, there's no
need to reserve that many column rows.
uint32_t nrows_read_limit =
std::min(cast_set<uint32_t>(_row_bitmap.cardinality()),
_opts.block_row_max);
- if (_can_opt_topn_reads()) {
- nrows_read_limit = std::min(static_cast<uint32_t>(_opts.topn_limit),
nrows_read_limit);
+ if (_can_opt_limit_reads()) {
+ // No SegmentIterator-side conjunct remains to be evaluated, so LIMIT
is equivalent before
+ // and after filtering. Cap the first read directly; this is the
no-conjunct fast path that
+ // avoids reading rows past the pushed-down local LIMIT.
+ size_t cap = (_opts.read_limit > _rows_returned) ? (_opts.read_limit -
_rows_returned) : 0;
+ if (cap < nrows_read_limit) {
+ nrows_read_limit = static_cast<uint32_t>(cap);
+ }
}
DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
if (nrows_read_limit != 1) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
- "topn opt 1 execute failed: nrows_read_limit={},
_opts.topn_limit={}",
- nrows_read_limit, _opts.topn_limit);
+ "topn opt 1 execute failed: nrows_read_limit={}, "
+ "_opts.read_limit={}",
+ nrows_read_limit, _opts.read_limit);
}
})
@@ -2769,6 +2821,8 @@ Status SegmentIterator::_next_batch_internal(Block*
block) {
RETURN_IF_ERROR(_process_common_expr(_sel_rowid_idx.data(),
_selected_size, block));
}
+ RETURN_IF_ERROR(_apply_read_limit_to_selected_rows(block,
_selected_size));
+
// step4: read non_predicate column
if (_selected_size > 0) {
if (!_non_predicate_columns.empty()) {
@@ -2807,6 +2861,9 @@ Status SegmentIterator::_next_batch_internal(Block*
block) {
RETURN_IF_ERROR(_materialization_of_virtual_column(block));
// shrink char_type suffix zero data
block->shrink_char_type_column_suffix_zero(_char_type_idx);
+ if (_opts.read_limit > 0) {
+ _rows_returned += block->rows();
+ }
return _check_output_block(block);
}
@@ -3240,8 +3297,15 @@ bool SegmentIterator::_has_delete_predicate(ColumnId
cid) {
return delete_columns_set.contains(cid);
}
-bool SegmentIterator::_can_opt_topn_reads() {
- if (_opts.topn_limit <= 0) {
+bool SegmentIterator::_can_opt_limit_reads() {
+ if (_opts.read_limit == 0) {
+ return false;
+ }
+
+ // If SegmentIterator still needs to evaluate predicates/common exprs,
LIMIT must be applied to
+ // post-filter rows by _apply_read_limit_to_selected_rows(); capping the
raw read here could
+ // return fewer rows than the query LIMIT.
+ if (_is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval) {
return false;
}
diff --git a/be/src/storage/segment/segment_iterator.h
b/be/src/storage/segment/segment_iterator.h
index 3852cf8743d..4fff2de3318 100644
--- a/be/src/storage/segment/segment_iterator.h
+++ b/be/src/storage/segment/segment_iterator.h
@@ -225,6 +225,7 @@ private:
uint32_t nrows_read_limit);
uint16_t _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx,
uint16_t selected_size);
uint16_t _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx,
uint16_t selected_size);
+ Status _apply_read_limit_to_selected_rows(Block* block, uint16_t&
selected_size);
void _collect_runtime_filter_predicate();
Status _output_non_pred_columns(Block* block);
[[nodiscard]] Status _read_columns_by_rowids(std::vector<ColumnId>&
read_column_ids,
@@ -320,7 +321,7 @@ private:
bool _has_delete_predicate(ColumnId cid);
- bool _can_opt_topn_reads();
+ bool _can_opt_limit_reads();
void _initialize_predicate_results();
bool _check_all_conditions_passed_inverted_index_for_column(ColumnId cid,
@@ -440,6 +441,10 @@ private:
uint16_t _selected_size;
std::vector<uint16_t> _sel_rowid_idx;
+ // Rows already produced by this iterator. Used together with
+ // _opts.read_limit to compute the remaining per-batch budget.
+ size_t _rows_returned = 0;
+
std::unique_ptr<ObjectPool> _pool;
// used to collect filter information.
diff --git a/be/src/storage/tablet/tablet_reader.cpp
b/be/src/storage/tablet/tablet_reader.cpp
index 79a9d05599a..16b836f4f95 100644
--- a/be/src/storage/tablet/tablet_reader.cpp
+++ b/be/src/storage/tablet/tablet_reader.cpp
@@ -156,7 +156,6 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params) {
_reader_context.topn_filter_target_node_id =
read_params.topn_filter_target_node_id;
_reader_context.read_orderby_key_reverse =
read_params.read_orderby_key_reverse;
_reader_context.read_orderby_key_limit =
read_params.read_orderby_key_limit;
- _reader_context.filter_block_conjuncts =
read_params.filter_block_conjuncts;
_reader_context.return_columns = &_return_columns;
_reader_context.read_orderby_key_columns =
!_orderby_key_columns.empty() ? &_orderby_key_columns : nullptr;
diff --git a/be/src/storage/tablet/tablet_reader.h
b/be/src/storage/tablet/tablet_reader.h
index a26c8074036..d12fc6ccd6d 100644
--- a/be/src/storage/tablet/tablet_reader.h
+++ b/be/src/storage/tablet/tablet_reader.h
@@ -182,9 +182,6 @@ public:
size_t read_orderby_key_num_prefix_columns = 0;
// limit of rows for read_orderby_key
size_t read_orderby_key_limit = 0;
- // filter_block arguments
- VExprContextSPtrs filter_block_conjuncts;
-
// for vertical compaction
bool is_key_column_group = false;
std::vector<uint32_t> key_group_cluster_key_idxes;
@@ -214,9 +211,7 @@ public:
uint64_t condition_cache_digest = 0;
- // General limit pushdown for DUP_KEYS and UNIQUE_KEYS with MOW.
- // When > 0, the storage layer (VCollectIterator) will stop reading
- // after returning this many rows. -1 means no limit.
+ // General LIMIT budget forwarded to SegmentIterator. -1 means no
limit.
int64_t general_read_limit = -1;
};
diff --git a/be/test/storage/compaction/collection_statistics_test.cpp
b/be/test/storage/compaction/collection_statistics_test.cpp
index b21c2264bd9..b78355b316e 100644
--- a/be/test/storage/compaction/collection_statistics_test.cpp
+++ b/be/test/storage/compaction/collection_statistics_test.cpp
@@ -235,7 +235,7 @@ public:
RowsetReaderSharedPtr clone() override { return
std::make_shared<MockRowsetReader>(_rowset); }
- void set_topn_limit(size_t topn_limit) override {}
+ void set_topn_limit(size_t limit) override {}
private:
std::shared_ptr<MockRowset> _rowset;
diff --git a/be/test/storage/segment/segment_iterator_limit_opt_test.cpp
b/be/test/storage/segment/segment_iterator_limit_opt_test.cpp
new file mode 100644
index 00000000000..4d8d84e6ab8
--- /dev/null
+++ b/be/test/storage/segment/segment_iterator_limit_opt_test.cpp
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Use #define private public to access private members for white-box testing
+// of SegmentIterator::_can_opt_limit_reads() and its dependent state. Mirrors
+// the convention in segment_iterator_apply_index_expr_test.cpp.
+#include "core/block/block.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_number.h"
+#include "gtest/gtest.h"
+#include "storage/olap_common.h"
+#include "storage/predicate/block_column_predicate.h"
+#include "storage/predicate/null_predicate.h"
+#include "storage/tablet/tablet_schema.h"
+
+#if defined(__clang__)
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wkeyword-macro"
+#endif
+#define private public
+#define protected public
+#include "storage/segment/segment_iterator.h"
+#undef private
+#undef protected
+#if defined(__clang__)
+#pragma clang diagnostic pop
+#endif
+
+namespace doris::segment_v2 {
+
+static MutableColumnPtr make_int_column(size_t rows) {
+ auto column = ColumnVector<TYPE_INT>::create();
+ column->reserve(rows);
+ for (size_t i = 0; i < rows; ++i) {
+ column->insert_value(1);
+ }
+ return column;
+}
+
+static MutableColumnPtr make_int_sequence_column(size_t rows) {
+ auto column = ColumnVector<TYPE_INT>::create();
+ column->reserve(rows);
+ for (size_t i = 0; i < rows; ++i) {
+ column->insert_value(cast_set<int32_t>(i));
+ }
+ return column;
+}
+
+class SegmentIteratorLimitOptTest : public ::testing::Test {
+protected:
+ void SetUp() override {
+ TabletSchemaPB schema_pb;
+ schema_pb.set_keys_type(KeysType::DUP_KEYS);
+ auto* col = schema_pb.add_column();
+ col->set_unique_id(0);
+ col->set_name("k1");
+ col->set_type("INT");
+ col->set_is_key(true);
+ col->set_is_nullable(true);
+
+ _tablet_schema = std::make_shared<TabletSchema>();
+ _tablet_schema->init_from_pb(schema_pb);
+ _read_schema = std::make_shared<Schema>(_tablet_schema);
+ }
+
+ // Build a SegmentIterator with minimal opts for _can_opt_limit_reads()
testing.
+ // The segment pointer is null — only _opts and internal maps are accessed.
+ std::unique_ptr<SegmentIterator> make_iter() {
+ auto iter = std::make_unique<SegmentIterator>(nullptr, _read_schema);
+ iter->_opts.tablet_schema = _tablet_schema;
+ iter->_opts.stats = &_stats;
+ // delete_condition_predicates is default-constructed (empty)
+ return iter;
+ }
+
+ std::shared_ptr<TabletSchema> _tablet_schema;
+ SchemaSPtr _read_schema;
+ OlapReaderStatistics _stats;
+};
+
+// No limit at all → optimization disabled.
+TEST_F(SegmentIteratorLimitOptTest, no_limit_returns_false) {
+ auto iter = make_iter();
+ iter->_opts.read_limit = 0;
+ EXPECT_FALSE(iter->_can_opt_limit_reads());
+}
+
+// read_limit > 0, no predicates, no delete conditions → should return true.
+// All columns pass the index check vacuously because
+// _column_predicate_index_exec_status is empty and default_return=true.
+TEST_F(SegmentIteratorLimitOptTest, topn_limit_no_predicates) {
+ auto iter = make_iter();
+ iter->_opts.read_limit = 100;
+ EXPECT_TRUE(iter->_can_opt_limit_reads());
+}
+
+// If SegmentIterator still needs to evaluate a pushed conjunct, raw reads
cannot
+// be capped by LIMIT. The limit must be applied after filtering selected rows.
+TEST_F(SegmentIteratorLimitOptTest,
pushed_conjunct_requires_post_filter_limit) {
+ auto iter = make_iter();
+ iter->_opts.read_limit = 100;
+ iter->_is_need_expr_eval = true;
+ EXPECT_FALSE(iter->_can_opt_limit_reads());
+
+ iter = make_iter();
+ iter->_opts.read_limit = 100;
+ iter->_is_need_vec_eval = true;
+ EXPECT_FALSE(iter->_can_opt_limit_reads());
+
+ iter = make_iter();
+ iter->_opts.read_limit = 100;
+ iter->_is_need_short_eval = true;
+ EXPECT_FALSE(iter->_can_opt_limit_reads());
+}
+
+TEST_F(SegmentIteratorLimitOptTest, read_limit_shrinks_materialized_columns) {
+ auto iter = make_iter();
+ iter->_opts.read_limit = 20;
+ iter->_rows_returned = 0;
+
+ Block block;
+ block.insert({make_int_column(0), std::make_shared<DataTypeInt32>(),
"not_materialized"});
+ block.insert({make_int_column(100), std::make_shared<DataTypeInt32>(),
"delete_sign"});
+ uint16_t selected_size = 100;
+
+ ASSERT_TRUE(iter->_apply_read_limit_to_selected_rows(&block,
selected_size).ok());
+ EXPECT_EQ(20, selected_size);
+ EXPECT_EQ(0, block.get_by_position(0).column->size());
+ EXPECT_EQ(20, block.get_by_position(1).column->size());
+}
+
+TEST_F(SegmentIteratorLimitOptTest, reverse_read_limit_keeps_suffix_rows) {
+ auto iter = make_iter();
+ iter->_opts.read_limit = 20;
+ iter->_opts.read_orderby_key_reverse = true;
+ iter->_rows_returned = 0;
+ iter->_sel_rowid_idx.resize(100);
+ for (uint16_t i = 0; i < 100; ++i) {
+ iter->_sel_rowid_idx[i] = i;
+ }
+
+ Block block;
+ block.insert({make_int_sequence_column(100),
std::make_shared<DataTypeInt32>(), "k1"});
+ uint16_t selected_size = 100;
+
+ ASSERT_TRUE(iter->_apply_read_limit_to_selected_rows(&block,
selected_size).ok());
+ EXPECT_EQ(20, selected_size);
+ EXPECT_EQ(20, block.get_by_position(0).column->size());
+ EXPECT_EQ(80, assert_cast<const
ColumnVector<TYPE_INT>*>(block.get_by_position(0).column.get())
+ ->get_data()[0]);
+ EXPECT_EQ(99, assert_cast<const
ColumnVector<TYPE_INT>*>(block.get_by_position(0).column.get())
+ ->get_data()[19]);
+ EXPECT_EQ(80, iter->_sel_rowid_idx[0]);
+ EXPECT_EQ(99, iter->_sel_rowid_idx[19]);
+}
+
+// Has delete condition predicates → should return false even with limit set.
+TEST_F(SegmentIteratorLimitOptTest, delete_predicates_returns_false) {
+ auto iter = make_iter();
+ iter->_opts.read_limit = 100;
+
+ // Add a column predicate to delete_condition_predicates so
+ // num_of_column_predicate() > 0.
+ auto null_pred = NullPredicate::create_shared(0, "k1", true,
PrimitiveType::TYPE_INT);
+ auto single = SingleColumnBlockPredicate::create_unique(null_pred);
+ iter->_opts.delete_condition_predicates->add_column_predicate(std::move(single));
+
+ EXPECT_FALSE(iter->_can_opt_limit_reads());
+}
+
+// Column has a predicate that did NOT pass inverted index → should return
false.
+TEST_F(SegmentIteratorLimitOptTest, column_predicate_not_passed_index) {
+ auto iter = make_iter();
+ iter->_opts.read_limit = 100;
+
+ auto pred = NullPredicate::create_shared(0, "k1", true,
PrimitiveType::TYPE_INT);
+ iter->_column_predicate_index_exec_status[0][pred] = false;
+
+ EXPECT_FALSE(iter->_can_opt_limit_reads());
+}
+
+// Column has a predicate that passed inverted index → should return true.
+TEST_F(SegmentIteratorLimitOptTest, column_predicate_passed_index) {
+ auto iter = make_iter();
+ iter->_opts.read_limit = 100;
+
+ auto pred = NullPredicate::create_shared(0, "k1", true,
PrimitiveType::TYPE_INT);
+ iter->_column_predicate_index_exec_status[0][pred] = true;
+
+ EXPECT_TRUE(iter->_can_opt_limit_reads());
+}
+
+} // namespace doris::segment_v2
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 72670ec6f02..3884d42b80c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -445,6 +445,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_COMMON_EXPR_PUSHDOWN =
"enable_common_expr_pushdown";
+ public static final String ENABLE_SEGMENT_LIMIT_PUSHDOWN =
"enable_segment_limit_pushdown";
+
public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC =
"fragment_transmission_compression_codec";
public static final String ENABLE_LOCAL_EXCHANGE = "enable_local_exchange";
@@ -2098,9 +2100,15 @@ public class SessionVariable implements Serializable,
Writable {
@VarAttrDef.VarAttr(name = FORBID_UNKNOWN_COLUMN_STATS)
public boolean forbidUnknownColStats = false;
+ // Legacy session variable. BE treats common expr pushdown as enabled in
this branch.
@VarAttrDef.VarAttr(name = ENABLE_COMMON_EXPR_PUSHDOWN, fuzzy = true)
public boolean enableCommonExprPushdown = true;
+ @VarAttrDef.VarAttr(name = ENABLE_SEGMENT_LIMIT_PUSHDOWN, fuzzy = true,
needForward = true,
+ description = {"是否启用 SegmentIterator 层 LIMIT 下推。",
+ "Set whether to push down LIMIT into SegmentIterator."})
+ public boolean enableSegmentLimitPushdown = true;
+
@VarAttrDef.VarAttr(name = ENABLE_LOCAL_EXCHANGE, fuzzy = false, flag =
VarAttrDef.INVISIBLE,
varType = VariableAnnotation.DEPRECATED)
public boolean enableLocalExchange = true;
@@ -3706,6 +3714,9 @@ public class SessionVariable implements Serializable,
Writable {
this.parallelPipelineTaskNum = random.nextInt(8);
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
+ // Enable fuzzing after we clean up all cases of
+ // enable_common_expr_pushdown/enable_common_expr_pushdown_for_inverted_index.
+ // this.enableSegmentLimitPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
this.enableSharedExchangeSinkBuffer = random.nextBoolean();
this.useSerialExchange = random.nextBoolean();
@@ -5487,6 +5498,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableFunctionPushdown(enableFunctionPushdown);
tResult.setEnableCommonExprPushdown(enableCommonExprPushdown);
+ tResult.setEnableSegmentLimitPushdown(enableSegmentLimitPushdown);
tResult.setCheckOverflowForDecimal(checkOverflowForDecimal);
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec.trim().toLowerCase());
tResult.setEnableLocalExchange(enableLocalExchange);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index eca17033647..b8d2c2655cf 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -299,7 +299,7 @@ struct TQueryOptions {
// max rows of each sub-queue in DataQueue.
106: optional i64 data_queue_max_blocks = 0;
- // expr pushdown for index filter rows
+ // deprecated
107: optional bool enable_common_expr_pushdown_for_inverted_index = false;
108: optional i64 local_exchange_free_blocks_limit;
@@ -489,6 +489,9 @@ struct TQueryOptions {
// Default 8MB. Sent by FE session variable preferred_block_size_bytes.
218: optional i64 preferred_block_size_bytes = 8388608
+ // Push LIMIT into SegmentIterator when safe.
+ 219: optional bool enable_segment_limit_pushdown = true
+
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
diff --git a/regression-test/data/query_p0/limit/test_unified_limit_pushdown.out b/regression-test/data/query_p0/limit/test_unified_limit_pushdown.out
new file mode 100644
index 00000000000..386e4b44a93
--- /dev/null
+++ b/regression-test/data/query_p0/limit/test_unified_limit_pushdown.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !topn_asc --
+1 999
+10 990
+2 998
+3 997
+4 996
+5 995
+6 994
+7 993
+8 992
+9 991
+
+-- !topn_desc_filter --
+186 814
+187 813
+188 812
+189 811
+190 810
+191 809
+192 808
+193 807
+194 806
+195 805
+196 804
+197 803
+198 802
+199 801
+200 800
+
+-- !topn_multi_bucket --
+1 9999
+10 9990
+11 9989
+12 9988
+2 9998
+3 9997
+4 9996
+5 9995
+6 9994
+7 9993
+8 9992
+9 9991
diff --git a/regression-test/data/search/test_search_slash_in_term.out b/regression-test/data/search/test_search_slash_in_term.out
index 3b0d6acd7bf..58a4cce9add 100644
--- a/regression-test/data/search/test_search_slash_in_term.out
+++ b/regression-test/data/search/test_search_slash_in_term.out
@@ -1,4 +1,9 @@
-- This file is automatically generated. You should know what you did if you want to edit this
+-- !segment_limit_disabled_still_pushes_search --
+1
+2
+3
+
-- !slash_in_term --
1 AC/DC is a rock band
2 AC power supply
diff --git a/regression-test/data/variant_p0/predefine/test_predefine_type_index.out b/regression-test/data/variant_p0/predefine/test_predefine_type_index.out
index 2e0e9dce3ef..45ccfb1430e 100644
--- a/regression-test/data/variant_p0/predefine/test_predefine_type_index.out
+++ b/regression-test/data/variant_p0/predefine/test_predefine_type_index.out
@@ -13,6 +13,14 @@
4 {"path":{"decimal":100.100123456789,"int":100,"string":"world"}}
5 {"path":{"decimal":111.111111111111,"int":111,"string":"hello"}}
+-- !count_on_index_with_segment_limit_disabled --
+1
+
+-- !match_with_segment_limit_disabled --
+1
+3
+5
+
-- !sql --
1
diff --git a/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy b/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy
index 332d9870bce..eb2c37d5337 100644
--- a/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy
+++ b/regression-test/suites/query_p0/limit/test_general_limit_pushdown.groovy
@@ -16,8 +16,8 @@
// under the License.
// Test general limit pushdown to storage layer for DUP_KEYS and UNIQUE_KEYS (MOW).
-// This exercises the non-topn limit path where VCollectIterator enforces
-// general_read_limit with filter_block_conjuncts applied before counting.
+// This exercises the non-topn limit path where SegmentIterator applies
+// general_read_limit after pushed filters and before lazy materialization.
suite("test_general_limit_pushdown") {
@@ -49,8 +49,8 @@ suite("test_general_limit_pushdown") {
SELECT k1, k2 FROM dup_limit_pushdown LIMIT 10
"""
- // LIMIT with WHERE clause — filter_block_conjuncts must be applied before
- // limit counting, otherwise we may get fewer rows than requested.
+ // LIMIT with WHERE clause -- pushed filters must be applied before limit
+ // counting, otherwise we may get fewer rows than requested.
// k1 > 10 matches 40 rows, LIMIT 15 should return exactly 15.
order_qt_dup_filter_limit """
SELECT k1, k2 FROM dup_limit_pushdown WHERE k1 > 10 LIMIT 15
@@ -63,7 +63,7 @@ suite("test_general_limit_pushdown") {
"""
// LIMIT with complex predicate (function-based, may not push into storage predicates).
- // This exercises the filter_block_conjuncts path for predicates that remain as conjuncts.
+ // This exercises the SegmentIterator common-expr path for predicates that remain as conjuncts.
order_qt_dup_complex_filter_limit """
SELECT k1, k2 FROM dup_limit_pushdown WHERE abs(k1 - 25) < 10 LIMIT 8
"""
@@ -192,7 +192,7 @@ suite("test_general_limit_pushdown") {
// ---- MOW with DELETEs ----
// Verify __DORIS_DELETE_SIGN__ predicate (in _conjuncts) is correctly
- // handled after being moved to filter_block_conjuncts.
+ // handled after being pushed into SegmentIterator.
sql "DROP TABLE IF EXISTS mow_delete_limit_pushdown"
sql """
CREATE TABLE mow_delete_limit_pushdown (
diff --git a/regression-test/suites/query_p0/limit/test_unified_limit_pushdown.groovy b/regression-test/suites/query_p0/limit/test_unified_limit_pushdown.groovy
new file mode 100644
index 00000000000..69fedf42b0f
--- /dev/null
+++ b/regression-test/suites/query_p0/limit/test_unified_limit_pushdown.groovy
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test unified limit pushdown to SegmentIterator.
+// This exercises the code path where _can_opt_limit_reads() returns true
+// because all column predicates are evaluated by inverted index, allowing
+// the segment iterator to tighten nrows_read_limit and avoid reading
+// non-predicate columns past the pushed-down local read limit.
+
+suite("test_unified_limit_pushdown") {
+
+ // ---- DUP_KEYS table with inverted indexes ----
+ // Inverted indexes on all queried columns allow _can_opt_limit_reads()
+ // to return true, enabling the SegmentIterator limit optimization.
+ sql "DROP TABLE IF EXISTS dup_inv_limit"
+ sql """
+ CREATE TABLE dup_inv_limit (
+ k1 INT NOT NULL,
+ k2 INT NOT NULL,
+ v1 VARCHAR(100) NULL,
+ INDEX idx_k1 (k1) USING INVERTED,
+ INDEX idx_k2 (k2) USING INVERTED,
+ INDEX idx_v1 (v1) USING INVERTED
+ )
+ DUPLICATE KEY(k1, k2)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES ("replication_num" = "1");
+ """
+
+ // Insert 200 rows for sufficient data volume
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO dup_inv_limit VALUES ")
+ for (int i = 1; i <= 200; i++) {
+ if (i > 1) sb.append(",")
+ sb.append("(${i}, ${1000 - i}, 'val_${i}')")
+ }
+ sql sb.toString()
+
+ // Basic LIMIT — general_read_limit path with inverted index optimization.
+ def count1 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM dup_inv_limit LIMIT 10
+ ) t
+ """
+ assert count1[0][0] == 10 : "DUP inverted: basic LIMIT 10, got ${count1[0][0]}"
+
+ // LIMIT with WHERE predicate covered by inverted index.
+ // k1 > 100 matches 100 rows, LIMIT 15 should return exactly 15.
+ def count2 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM dup_inv_limit WHERE k1 > 100 LIMIT 15
+ ) t
+ """
+ assert count2[0][0] == 15 : "DUP inverted: WHERE k1>100 LIMIT 15, got ${count2[0][0]}"
+
+ // LIMIT with multiple predicates on indexed columns.
+ // k1 > 50 AND k2 < 980 → k1 in [51..200] AND k2 < 980 → k2=1000-k1 < 980 → k1 > 20
+ // intersection: k1 in [51..200], all 150 rows match. LIMIT 30.
+ def count3 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2, v1 FROM dup_inv_limit WHERE k1 > 50 AND k2 < 980 LIMIT 30
+ ) t
+ """
+ assert count3[0][0] == 30 : "DUP inverted: multi-pred LIMIT 30, got ${count3[0][0]}"
+
+ // LIMIT larger than matching rows — should return all matching rows.
+ // k1 > 195 matches 5 rows, LIMIT 20 should return all 5.
+ def count4 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM dup_inv_limit WHERE k1 > 195 LIMIT 20
+ ) t
+ """
+ assert count4[0][0] == 5 : "DUP inverted: LIMIT > matching, got ${count4[0][0]}"
+
+ // ---- MOW UNIQUE_KEYS with inverted indexes ----
+ sql "DROP TABLE IF EXISTS mow_inv_limit"
+ sql """
+ CREATE TABLE mow_inv_limit (
+ k1 INT NOT NULL,
+ k2 INT NOT NULL,
+ v1 VARCHAR(100) NULL,
+ INDEX idx_k1 (k1) USING INVERTED,
+ INDEX idx_k2 (k2) USING INVERTED
+ )
+ UNIQUE KEY(k1, k2)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write" = "true"
+ );
+ """
+ sb = new StringBuilder()
+ sb.append("INSERT INTO mow_inv_limit VALUES ")
+ for (int i = 1; i <= 200; i++) {
+ if (i > 1) sb.append(",")
+ sb.append("(${i}, ${1000 - i}, 'val_${i}')")
+ }
+ sql sb.toString()
+
+ def count5 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM mow_inv_limit WHERE k1 > 100 LIMIT 20
+ ) t
+ """
+ assert count5[0][0] == 20 : "MOW inverted: WHERE k1>100 LIMIT 20, got ${count5[0][0]}"
+
+ // MOW with DELETEs + inverted index
+ sql "DELETE FROM mow_inv_limit WHERE k1 <= 50"
+ def count6 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM mow_inv_limit LIMIT 25
+ ) t
+ """
+ assert count6[0][0] == 25 : "MOW inverted after delete: LIMIT 25, got ${count6[0][0]}"
+
+ def count7 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM mow_inv_limit WHERE k1 > 150 LIMIT 10
+ ) t
+ """
+ assert count7[0][0] == 10 : "MOW inverted delete+filter: LIMIT 10, got ${count7[0][0]}"
+
+ // ---- Multi-bucket table with inverted indexes ----
+ // Tests scanner-level shared limit coordination across multiple scanners.
+ sql "DROP TABLE IF EXISTS dup_multi_inv_limit"
+ sql """
+ CREATE TABLE dup_multi_inv_limit (
+ k1 INT NOT NULL,
+ k2 INT NOT NULL,
+ v1 VARCHAR(100) NULL,
+ INDEX idx_k1 (k1) USING INVERTED,
+ INDEX idx_k2 (k2) USING INVERTED
+ )
+ DUPLICATE KEY(k1, k2)
+ DISTRIBUTED BY HASH(k1) BUCKETS 8
+ PROPERTIES ("replication_num" = "1");
+ """
+ sb = new StringBuilder()
+ sb.append("INSERT INTO dup_multi_inv_limit VALUES ")
+ for (int i = 1; i <= 500; i++) {
+ if (i > 1) sb.append(",")
+ sb.append("(${i}, ${10000 - i}, 'val_${i}')")
+ }
+ sql sb.toString()
+
+ // With 8 buckets, up to 8 scanners may run. The scanner-level shared limit coordinates
+ // to return exactly LIMIT rows.
+ def count8 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM dup_multi_inv_limit LIMIT 20
+ ) t
+ """
+ assert count8[0][0] == 20 : "Multi-bucket inverted: LIMIT 20, got ${count8[0][0]}"
+
+ def count9 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM dup_multi_inv_limit WHERE k1 > 200 LIMIT 50
+ ) t
+ """
+ assert count9[0][0] == 50 : "Multi-bucket inverted filter: LIMIT 50, got ${count9[0][0]}"
+
+ // Small limit with many buckets — tests that each scanner correctly
+ // respects the shared limit even when some scanners have no data.
+ def count10 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM dup_multi_inv_limit LIMIT 3
+ ) t
+ """
+ assert count10[0][0] == 3 : "Multi-bucket inverted: LIMIT 3, got ${count10[0][0]}"
+
+ // ---- ORDER BY key LIMIT with inverted index (topn path) ----
+ // Tests that the topn_limit path also benefits from inverted index
+ // optimization in SegmentIterator. Use order_qt_* to validate the
+ // actual ordered rows, not just cardinality, so the optimization
+ // cannot silently return a wrong subset of the right size.
+ order_qt_topn_asc """
+ SELECT k1, k2 FROM dup_inv_limit ORDER BY k1 LIMIT 10
+ """
+
+ order_qt_topn_desc_filter """
+ SELECT k1, k2 FROM dup_inv_limit WHERE k1 > 100 ORDER BY k1 DESC LIMIT 15
+ """
+
+ // Also assert ORDER BY content for the multi-bucket case so that the
+ // multi-bucket path is row-validated, not just count-validated.
+ order_qt_topn_multi_bucket """
+ SELECT k1, k2 FROM dup_multi_inv_limit ORDER BY k1 LIMIT 12
+ """
+
+ def count11 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM dup_inv_limit ORDER BY k1 LIMIT 10
+ ) t
+ """
+ assert count11[0][0] == 10 : "DUP inverted ORDER BY LIMIT: got ${count11[0][0]}"
+
+ def count12 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM dup_inv_limit WHERE k1 > 100 ORDER BY k1 DESC LIMIT 15
+ ) t
+ """
+ assert count12[0][0] == 15 : "DUP inverted ORDER BY DESC with filter: got ${count12[0][0]}"
+
+ // ---- LIMIT + OFFSET with inverted index ----
+ def count13 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM dup_inv_limit LIMIT 10 OFFSET 5
+ ) t
+ """
+ assert count13[0][0] == 10 : "DUP inverted LIMIT+OFFSET: got ${count13[0][0]}"
+
+ // OFFSET near end: k1 > 195 matches 5 rows, OFFSET 2 LIMIT 10 should return 3
+ def count14 = sql """
+ SELECT COUNT(*) FROM (
+ SELECT k1, k2 FROM dup_inv_limit WHERE k1 > 195 LIMIT 10 OFFSET 2
+ ) t
+ """
+ assert count14[0][0] == 3 : "DUP inverted LIMIT+OFFSET near end: got ${count14[0][0]}"
+}
diff --git a/regression-test/suites/search/test_search_slash_in_term.groovy b/regression-test/suites/search/test_search_slash_in_term.groovy
index 0749929f252..fa625c8171f 100644
--- a/regression-test/suites/search/test_search_slash_in_term.groovy
+++ b/regression-test/suites/search/test_search_slash_in_term.groovy
@@ -61,6 +61,13 @@ suite("test_search_slash_in_term", "p0") {
// Wait for index building
Thread.sleep(3000)
+ order_qt_segment_limit_disabled_still_pushes_search """
+ SELECT /*+SET_VAR(enable_segment_limit_pushdown=false) */ id
+ FROM ${tableName}
+ WHERE search('title:AC/DC')
+ ORDER BY id
+ """
+
// ============ Test 1: Slash in term with field prefix ============
// title:AC/DC should parse as single term, standard analyzer tokenizes to "ac" and "dc"
// With default OR operator, matches rows containing "ac" or "dc" in title
diff --git a/regression-test/suites/variant_p0/predefine/test_predefine_type_index.groovy b/regression-test/suites/variant_p0/predefine/test_predefine_type_index.groovy
index 6fb0872b497..11b388fd4b1 100644
--- a/regression-test/suites/variant_p0/predefine/test_predefine_type_index.groovy
+++ b/regression-test/suites/variant_p0/predefine/test_predefine_type_index.groovy
@@ -49,6 +49,18 @@ suite("test_variant_predefine_index_type", "p0"){
sql """ set inverted_index_skip_threshold = 0 """
sql """ set enable_common_expr_pushdown = true """
sql """ set enable_match_without_inverted_index = false """
+
+ sql """ set enable_segment_limit_pushdown = false """
+ qt_count_on_index_with_segment_limit_disabled """
+ select count() from ${tableName} where cast(var['path']['int'] as int) = 789
+ """
+ sql """ set enable_count_on_index_pushdown = false """
+ qt_match_with_segment_limit_disabled """
+ select id from ${tableName} where var['path']['string'] match 'hello' order by id
+ """
+ sql """ set enable_count_on_index_pushdown = true """
+ sql """ set enable_segment_limit_pushdown = true """
+
qt_sql """ select count() from ${tableName} where cast(var['path']['int'] as int) = 789 """
qt_sql """ select count() from ${tableName} where cast(var['path']['decimal'] as DECIMAL(15, 12)) = 789.789123456789 """
qt_sql """ select count() from ${tableName} where var['path']['string'] match 'hello' """
@@ -111,4 +123,4 @@ suite("test_variant_predefine_index_type", "p0"){
qt_sql "select count() from objects where (array_contains(cast(overflow_properties['tags'] as array<string>), 'plastic'))"
qt_sql "select cast(overflow_properties['color'] as string) from objects where overflow_properties['color'] IS NOT NULL and id = 6 limit 1"
qt_sql "select overflow_properties['color'] from objects where overflow_properties['color'] IS NOT NULL and id = 6 limit 1"
-}
\ No newline at end of file
+}
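
Note on the multi-bucket tests above: they rely on the scanners of one scan node sharing a single LIMIT budget (the commit message names a shared atomic counter, `_shared_scan_limit`). The BE code is C++ and is not shown in this part of the diff, so the following is only a minimal Java sketch of the general idea, assuming each scanner claims rows from an atomic counter before emitting them. The class `SharedScanLimitSketch` and its methods `acquire` and `exhausted` are hypothetical names for illustration, not Doris APIs.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical illustration of a LIMIT budget shared by several scanners. */
final class SharedScanLimitSketch {
    // Rows that may still be returned across all scanners of the scan node.
    private final AtomicLong remaining;

    SharedScanLimitSketch(long limit) {
        this.remaining = new AtomicLong(Math.max(0, limit));
    }

    /**
     * Claims up to {@code wanted} rows from the shared budget.
     * Returns the number of rows the caller may emit, which is 0 once the
     * LIMIT has been consumed by this or any other scanner.
     */
    long acquire(long wanted) {
        if (wanted <= 0) {
            return 0;
        }
        while (true) {
            long current = remaining.get();
            if (current <= 0) {
                return 0;
            }
            long granted = Math.min(current, wanted);
            // Retry if another scanner changed the budget in the meantime.
            if (remaining.compareAndSet(current, current - granted)) {
                return granted;
            }
        }
    }

    /** True once no scanner may emit further rows. */
    boolean exhausted() {
        return remaining.get() <= 0;
    }
}
```

Under this sketch, a scanner that has read a block of 15 matching rows would call `acquire(15)`, truncate the block to the granted count, and stop scanning once `exhausted()` is true. The compare-and-set loop keeps the sum of all grants at or below the LIMIT regardless of how many scanners run concurrently, which is the property the `LIMIT 20`, `LIMIT 50`, and `LIMIT 3` multi-bucket assertions check from the SQL side.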