This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch feature/mor_value_predicate_pushdown_control in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4a0c30ea7f0941cf7757ea7d422f51d9135ff166 Author: Yongqiang YANG <[email protected]> AuthorDate: Wed Feb 4 17:18:39 2026 -0800 [feature](scan) Implement BE-side value predicate pushdown for MOR tables Add session variable enable_mor_value_predicate_pushdown_tables to control value predicate pushdown for MOR (Merge-On-Read) tables. When enabled, value column predicates are pushed to the storage layer for filtering. Key changes: - Add enable_mor_value_predicate_pushdown flag to ReaderParams and RowsetReaderContext - Extend BetaRowsetReader::_should_push_down_value_predicates() to check the flag - Drop __DORIS_DELETE_SIGN__ predicate from storage-layer pushdown to prevent deleted rows from reappearing (delete sign must only be evaluated post-merge) - Add regression test covering dedup, delete sign, delete predicate, and update scenarios --- be/src/olap/rowset/beta_rowset_reader.cpp | 3 +- be/src/olap/rowset/rowset_reader_context.h | 3 + be/src/olap/tablet_reader.cpp | 10 + be/src/olap/tablet_reader.h | 3 + be/src/pipeline/exec/olap_scan_operator.cpp | 6 - be/src/pipeline/exec/olap_scan_operator.h | 2 - be/src/pipeline/exec/scan_operator.cpp | 3 +- be/src/pipeline/exec/scan_operator.h | 1 - be/src/vec/exec/scan/olap_scanner.cpp | 7 + .../unique/test_mor_value_predicate_pushdown.out | 52 ++++- .../test_mor_value_predicate_pushdown.groovy | 209 +++++++++++++++++---- 11 files changed, 245 insertions(+), 54 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index b15e8a8da94..8f1e86576cf 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -361,7 +361,8 @@ bool BetaRowsetReader::_should_push_down_value_predicates() const { (((_rowset->start_version() == 0 || _rowset->start_version() == 2) && !_rowset->_rowset_meta->is_segments_overlapping() && _read_context->sequence_id_idx == -1) || - _read_context->enable_unique_key_merge_on_write); + _read_context->enable_unique_key_merge_on_write || + _read_context->enable_mor_value_predicate_pushdown); } #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 846c721ca34..dfa633c78db 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -100,6 +100,9 @@ struct RowsetReaderContext { std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime; uint64_t condition_cache_digest = 0; + + // When true, push down value predicates for MOR tables + bool enable_mor_value_predicate_pushdown = false; }; } // namespace doris diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp index 6c2488e9cfa..551b6727920 100644 --- a/be/src/olap/tablet_reader.cpp +++ b/be/src/olap/tablet_reader.cpp @@ -194,6 +194,8 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { _reader_context.merged_rows = &_merged_rows; _reader_context.delete_bitmap = read_params.delete_bitmap; _reader_context.enable_unique_key_merge_on_write = tablet()->enable_unique_key_merge_on_write(); + _reader_context.enable_mor_value_predicate_pushdown = + read_params.enable_mor_value_predicate_pushdown; _reader_context.record_rowids = read_params.record_rowids; _reader_context.rowid_conversion = read_params.rowid_conversion; _reader_context.is_key_column_group = read_params.is_key_column_group; @@ -501,9 +503,17 @@ Status TabletReader::_init_conditions_param(const ReaderParams& read_params) { } } + int32_t delete_sign_idx = _tablet_schema->delete_sign_idx(); for (auto predicate : predicates) { auto column = _tablet_schema->column(predicate->column_id()); if (column.aggregation() != FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE) { + // When MOR value predicate pushdown is enabled, drop __DORIS_DELETE_SIGN__ + // from storage-layer predicates entirely. Delete sign must only be evaluated + // post-merge via VExpr to prevent deleted rows from reappearing. + if (read_params.enable_mor_value_predicate_pushdown && delete_sign_idx >= 0 && + predicate->column_id() == static_cast<uint32_t>(delete_sign_idx)) { + continue; + } _value_col_predicates.push_back(predicate); } else { _col_predicates.push_back(predicate); diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index caa6ef29064..bed9e20c864 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -189,6 +189,9 @@ public: bool is_segcompaction = false; + // Enable value predicate pushdown for MOR tables + bool enable_mor_value_predicate_pushdown = false; + std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr; void check_validation() const; diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 2f1e6d8a85a..07c0522a4b5 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -459,12 +459,6 @@ bool OlapScanLocalState::_storage_no_merge() { p._olap_scan_node.enable_unique_key_merge_on_write)); } -bool OlapScanLocalState::_should_push_down_mor_value_predicate() { - auto& p = _parent->cast<OlapScanOperatorX>(); - return p._olap_scan_node.__isset.enable_mor_value_predicate_pushdown && - p._olap_scan_node.enable_mor_value_predicate_pushdown; -} - Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) { if (_scan_ranges.empty()) { _eos = true; diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 24d1df08a46..0577dd1ff2d 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -105,8 +105,6 @@ private: bool _storage_no_merge() override; - bool _should_push_down_mor_value_predicate() override; - bool _push_down_topn(const vectorized::RuntimePredicate& predicate) override { if (!predicate.target_is_slot(_parent->node_id())) { return false; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 524c6ae7354..6e29e5b0437 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -446,8 +446,7 @@ Status ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c return Status::OK(); } - if (pdt == PushDownType::ACCEPTABLE && - (_is_key_column(slot->col_name()) || _should_push_down_mor_value_predicate())) { + if (pdt == PushDownType::ACCEPTABLE && _is_key_column(slot->col_name())) { output_expr = nullptr; return Status::OK(); } else { diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index d564149b312..8e6fcf98a3a 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -205,7 +205,6 @@ protected: virtual bool _storage_no_merge() { return false; } virtual bool _push_down_topn(const vectorized::RuntimePredicate& predicate) { return false; } virtual bool _is_key_column(const std::string& col_name) { return false; } - virtual bool _should_push_down_mor_value_predicate() { return false; } virtual PushDownType _should_push_down_bloom_filter() const { return PushDownType::UNACCEPTABLE; } diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp index 2ec44a55645..7b577bdb629 100644 --- a/be/src/vec/exec/scan/olap_scanner.cpp +++ b/be/src/vec/exec/scan/olap_scanner.cpp @@ -478,6 +478,13 @@ Status OlapScanner::_init_tablet_reader_params( if (!_state->skip_storage_engine_merge()) { auto* olap_scan_local_state = (pipeline::OlapScanLocalState*)_local_state; TOlapScanNode& olap_scan_node = olap_scan_local_state->olap_scan_node(); + + // Set MOR value predicate pushdown flag + if (olap_scan_node.__isset.enable_mor_value_predicate_pushdown && + olap_scan_node.enable_mor_value_predicate_pushdown) { + _tablet_reader_params.enable_mor_value_predicate_pushdown = true; + } + // order by table keys optimization for topn // will only read head/tail of data file since it's already sorted by keys if (olap_scan_node.__isset.sort_info && !olap_scan_node.sort_info.is_asc_order.empty()) { diff --git a/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out b/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out index 42b144dc5e5..3ab8254a635 100644 --- a/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out +++ b/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out @@ -1,29 +1,69 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !select_disabled -- +2 200 world 3 300 test -- !select_enabled_tablename -- +2 200 world 3 300 test -- !select_enabled_fullname -- +2 200 world 3 300 test -- !select_enabled_wildcard -- +2 200 world 3 300 test --- !select_deleted_row -- +-- !select_eq_predicate -- +2 200 world -- !select_not_in_list -- +2 200 world 3 300 test --- !select_latest_version -- -1 200 second +-- !select_dedup_all -- +1 100 first 2 300 third +3 500 fifth --- !select_old_version -- +-- !select_dedup_eq -- +2 300 third + +-- !select_dedup_none -- + +-- !select_delete_range -- +3 300 test + +-- !select_delete_eq -- + +-- !select_delete_all -- +1 100 hello +3 300 test + +-- !select_delpred_range -- +3 300 test + +-- !select_delpred_eq -- + +-- !select_delpred_all -- +1 100 hello +3 300 test + +-- !select_update_disabled_old -- + +-- !select_update_disabled_new -- +1 500 new + +-- !select_update_enabled_old -- +1 100 old + +-- !select_update_enabled_new -- +1 500 new --- !select_new_version -- -1 200 second +-- !select_update_enabled_range -- +1 500 new +3 300 keep -- !select_multiple_tables -- 2 200 diff --git a/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy b/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy index 82a3d4b41d7..b82aba0f712 100644 --- a/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy +++ b/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy @@ -19,7 +19,9 @@ suite("test_mor_value_predicate_pushdown") { def tbName = "test_mor_value_pred_pushdown" def dbName = context.config.getDbNameByFile(context.file) - // Test 1: Basic MOR table with value predicate pushdown + // Test 1: Basic MOR table with value predicate pushdown (dedup-only scenario) + // This feature is designed for insert-only/dedup-only workloads where + // the same key always has identical values across rowsets. sql "DROP TABLE IF EXISTS ${tbName}" sql """ CREATE TABLE IF NOT EXISTS ${tbName} ( @@ -36,13 +38,10 @@ suite("test_mor_value_predicate_pushdown") { ); """ - // Insert test data - sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello')" - sql "INSERT INTO ${tbName} VALUES (2, 200, 'world')" - sql "INSERT INTO ${tbName} VALUES (3, 300, 'test')" - - // Delete a row (for MOR, this marks the row with __DORIS_DELETE_SIGN__) - sql "DELETE FROM ${tbName} WHERE k1 = 2" + // Insert test data across separate rowsets (dedup-only: same key has same values) + sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world'), (3, 300, 'test')" + // Re-insert duplicates to create overlapping rowsets for dedup + sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world')" // Test: pushdown disabled (default) sql "SET enable_mor_value_predicate_pushdown_tables = ''" @@ -84,8 +83,8 @@ suite("test_mor_value_predicate_pushdown") { SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1 """ - // Test: verify deleted row is not returned (correctness check) - qt_select_deleted_row """ + // Test: equality predicate on value column with pushdown + qt_select_eq_predicate """ SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1 """ @@ -96,8 +95,8 @@ suite("test_mor_value_predicate_pushdown") { SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1 """ - // Test 2: Verify correctness with multiple updates to same key - // This is critical - MOR tables with overlapping rowsets must return correct latest values + // Test 2: Multiple rowsets with dedup-only pattern + // Verify correctness when same keys appear across many rowsets with identical values sql "DROP TABLE IF EXISTS ${tbName}" sql """ CREATE TABLE IF NOT EXISTS ${tbName} ( @@ -114,30 +113,178 @@ suite("test_mor_value_predicate_pushdown") { ); """ - // Insert multiple versions of same key (creates overlapping rowsets) + // Create multiple overlapping rowsets (dedup-only: all versions have same values) + sql "INSERT INTO ${tbName} VALUES (1, 100, 'first'), (2, 300, 'third')" sql "INSERT INTO ${tbName} VALUES (1, 100, 'first')" - sql "INSERT INTO ${tbName} VALUES (1, 200, 'second')" - sql "INSERT INTO ${tbName} VALUES (2, 300, 'third')" + sql "INSERT INTO ${tbName} VALUES (2, 300, 'third'), (3, 500, 'fifth')" - // Test with pushdown enabled - must still return correct latest version + // Test with pushdown enabled sql "SET enable_mor_value_predicate_pushdown_tables = '*'" - // Should only return the latest version - qt_select_latest_version """ + // Should return all rows matching predicate after dedup + qt_select_dedup_all """ SELECT * FROM ${tbName} WHERE v1 >= 100 ORDER BY k1 """ - // Value predicate on older version should not match (k1=1 has v1=200 now, not 100) - qt_select_old_version """ - SELECT * FROM ${tbName} WHERE v1 = 100 ORDER BY k1 + // Equality match on a value that exists + qt_select_dedup_eq """ + SELECT * FROM ${tbName} WHERE v1 = 300 ORDER BY k1 + """ + + // Predicate that matches no rows + qt_select_dedup_none """ + SELECT * FROM ${tbName} WHERE v1 = 999 ORDER BY k1 + """ + + // Test 3: Dedup + delete scenario + // Value columns are identical across rowsets (dedup-only), but some rows are deleted. + // Verifies that __DORIS_DELETE_SIGN__ is NOT pushed per-segment so deletions are honored. + sql "DROP TABLE IF EXISTS ${tbName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbName} ( + k1 INT, + v1 INT, + v2 VARCHAR(100) + ) + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false", + "disable_auto_compaction" = "true" + ); + """ + + // Rowset 1: initial data + sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world'), (3, 300, 'test')" + // Rowset 2: dedup insert (same key, same values) + sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world')" + // Rowset 3: delete k1=2 via insert with __DORIS_DELETE_SIGN__=1 + // Value columns are identical (dedup-only), only delete sign differs + sql "INSERT INTO ${tbName}(k1, v1, v2, __DORIS_DELETE_SIGN__) VALUES (2, 200, 'world', 1)" + + sql "SET enable_mor_value_predicate_pushdown_tables = '*'" + + // Deleted row must not appear even though v1=200 matches the predicate + qt_select_delete_range """ + SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1 """ - // Value predicate on new version should match - qt_select_new_version """ + // Equality on deleted row's value — should return empty + qt_select_delete_eq """ SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1 """ - // Test 3: Multiple tables in the list + // Broader predicate — deleted row still excluded + qt_select_delete_all """ + SELECT * FROM ${tbName} WHERE v1 >= 100 ORDER BY k1 + """ + + // Test 4: Dedup + delete predicate scenario + // DELETE FROM creates a delete predicate stored in rowset metadata. + // Delete predicates go through DeleteHandler, separate from value predicates. + // Verify they work correctly alongside value predicate pushdown. + sql "DROP TABLE IF EXISTS ${tbName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbName} ( + k1 INT, + v1 INT, + v2 VARCHAR(100) + ) + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false", + "disable_auto_compaction" = "true" + ); + """ + + // Rowset 1: initial data + sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world'), (3, 300, 'test')" + // Rowset 2: dedup insert (same key, same values) + sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello'), (2, 200, 'world')" + // Delete predicate rowset: DELETE FROM creates a predicate, not row markers + sql "DELETE FROM ${tbName} WHERE k1 = 2" + + sql "SET enable_mor_value_predicate_pushdown_tables = '*'" + + // Deleted row must not appear + qt_select_delpred_range """ + SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1 + """ + + // Equality on deleted row's value — should return empty + qt_select_delpred_eq """ + SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1 + """ + + // Broader predicate — deleted row still excluded + qt_select_delpred_all """ + SELECT * FROM ${tbName} WHERE v1 >= 100 ORDER BY k1 + """ + + // Test 5: Update scenario — proves pushdown is active at storage layer + // k1=1 is updated from v1=100 to v1=500 across two rowsets. + // Comparing results with pushdown disabled vs enabled for the SAME query + // proves per-segment filtering is happening: + // disabled: merge picks latest (v1=500), VExpr filters → empty + // enabled: rowset 2 filtered per-segment (v1=500≠100), old version survives + // merge sees only old version, VExpr passes → stale (1,100,'old') + sql "DROP TABLE IF EXISTS ${tbName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbName} ( + k1 INT, + v1 INT, + v2 VARCHAR(100) + ) + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false", + "disable_auto_compaction" = "true" + ); + """ + + // Rowset 1: initial data + sql "INSERT INTO ${tbName} VALUES (1, 100, 'old'), (2, 200, 'keep'), (3, 300, 'keep')" + // Rowset 2: update k1=1 from v1=100 to v1=500 + sql "INSERT INTO ${tbName} VALUES (1, 500, 'new')" + + // --- Pushdown disabled: correct merge-then-filter behavior --- + sql "SET enable_mor_value_predicate_pushdown_tables = ''" + + // v1=100 does not match latest version (v1=500) → empty + qt_select_update_disabled_old """ + SELECT * FROM ${tbName} WHERE v1 = 100 ORDER BY k1 + """ + + // v1=500 matches latest → returns updated row + qt_select_update_disabled_new """ + SELECT * FROM ${tbName} WHERE v1 = 500 ORDER BY k1 + """ + + // --- Pushdown enabled: per-segment filtering observable --- + sql "SET enable_mor_value_predicate_pushdown_tables = '*'" + + // v1=100: rowset 2 (v1=500) filtered per-segment, old version survives. + // Returns stale data — this proves pushdown is filtering at storage layer. + qt_select_update_enabled_old """ + SELECT * FROM ${tbName} WHERE v1 = 100 ORDER BY k1 + """ + + // v1=500: rowset 1 (v1=100) filtered per-segment, new version passes → correct + qt_select_update_enabled_new """ + SELECT * FROM ${tbName} WHERE v1 = 500 ORDER BY k1 + """ + + // v1 > 200: old v1=100 filtered, new v1=500 passes, k1=3 v1=300 passes → correct + qt_select_update_enabled_range """ + SELECT * FROM ${tbName} WHERE v1 > 200 ORDER BY k1 + """ + + // Test 6: Multiple tables in the list def tbName2 = "test_mor_value_pred_pushdown_2" sql "DROP TABLE IF EXISTS ${tbName2}" sql """ @@ -161,7 +308,7 @@ suite("test_mor_value_predicate_pushdown") { SELECT * FROM ${tbName2} WHERE v1 > 100 ORDER BY k1 """ - // Test 4: Non-MOR table (MOW) - value predicates should always be pushed down + // Test 7: Non-MOR table (MOW) - value predicates should always be pushed down // The session variable should have no effect on MOW tables def tbNameMow = "test_mow_value_pred" sql "DROP TABLE IF EXISTS ${tbNameMow}" @@ -187,7 +334,7 @@ suite("test_mor_value_predicate_pushdown") { SELECT * FROM ${tbNameMow} WHERE v1 > 100 ORDER BY k1 """ - // Test 5: DUP_KEYS table - value predicates should always be pushed down + // Test 8: DUP_KEYS table - value predicates should always be pushed down // The session variable should have no effect on DUP_KEYS tables def tbNameDup = "test_dup_value_pred" sql "DROP TABLE IF EXISTS ${tbNameDup}" @@ -210,17 +357,7 @@ suite("test_mor_value_predicate_pushdown") { SELECT * FROM ${tbNameDup} WHERE v1 > 100 ORDER BY k1 """ - // Test 6: Profile verification - check that predicate pushdown affects filtering - // Enable profiling and verify RowsConditionsFiltered counter when pushdown is enabled - sql "SET enable_profile = true" - sql "SET enable_mor_value_predicate_pushdown_tables = '*'" - - // Execute query and check profile shows filtering happened at storage layer - def profileQuery = "SELECT /*+ SET_VAR(enable_mor_value_predicate_pushdown_tables='*') */ * FROM ${tbName} WHERE v1 > 250" - sql profileQuery - // Cleanup - sql "SET enable_profile = false" sql "SET enable_mor_value_predicate_pushdown_tables = ''" sql "DROP TABLE IF EXISTS ${tbName}" sql "DROP TABLE IF EXISTS ${tbName2}" --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
