This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch feature/mor_value_predicate_pushdown_control in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5e5179e584ec4140fc59ea4f0ea04da0a6a1229c Author: Yongqiang YANG <[email protected]> AuthorDate: Wed Feb 4 09:23:40 2026 -0800 [feature](scan) Add session variable to control value predicate pushdown for MOR tables Add a new session variable `enable_mor_value_predicate_pushdown_tables` to allow users to selectively enable value column predicate pushdown for MOR (Merge-On-Read) tables. This can improve query performance by utilizing inverted indexes on value columns for filtering. The session variable accepts: - Comma-separated table names: `db1.tbl1,db2.tbl2` (names are matched case-insensitively; a bare `tbl1` without a database prefix matches a table of that name in any database) - Wildcard for all MOR tables: `*` - Empty string to disable (default) Changes: - Add session variable in SessionVariable.java with helper method - Add isMorTable() helper in OlapTable.java - Add Thrift field enable_mor_value_predicate_pushdown in TOlapScanNode - Set flag in OlapScanNode.toThrift() based on session variable - Add virtual method _should_push_down_mor_value_predicate() in scan_operator - Implement override in olap_scan_operator to read the flag - Modify the predicate pushdown condition in scan_operator.cpp so that it also accepts value columns (not only key columns) when the flag is set --- be/src/pipeline/exec/olap_scan_operator.cpp | 6 + be/src/pipeline/exec/olap_scan_operator.h | 2 + be/src/pipeline/exec/scan_operator.cpp | 3 +- be/src/pipeline/exec/scan_operator.h | 1 + .../java/org/apache/doris/catalog/OlapTable.java | 8 + .../org/apache/doris/planner/OlapScanNode.java | 9 + .../java/org/apache/doris/qe/SessionVariable.java | 39 ++++ gensrc/thrift/PlanNodes.thrift | 2 + .../unique/test_mor_value_predicate_pushdown.out | 36 ++++ .../test_mor_value_predicate_pushdown.groovy | 229 +++++++++++++++++++++ 10 files changed, 334 insertions(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 07c0522a4b5..2f1e6d8a85a 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -459,6 +459,12 @@ bool OlapScanLocalState::_storage_no_merge() {
p._olap_scan_node.enable_unique_key_merge_on_write)); } +bool OlapScanLocalState::_should_push_down_mor_value_predicate() { + auto& p = _parent->cast<OlapScanOperatorX>(); + return p._olap_scan_node.__isset.enable_mor_value_predicate_pushdown && + p._olap_scan_node.enable_mor_value_predicate_pushdown; +} + Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) { if (_scan_ranges.empty()) { _eos = true; diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 0577dd1ff2d..24d1df08a46 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -105,6 +105,8 @@ private: bool _storage_no_merge() override; + bool _should_push_down_mor_value_predicate() override; + bool _push_down_topn(const vectorized::RuntimePredicate& predicate) override { if (!predicate.target_is_slot(_parent->node_id())) { return false; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 8bb83e3de6a..524c6ae7354 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -446,7 +446,8 @@ Status ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c return Status::OK(); } - if (pdt == PushDownType::ACCEPTABLE && (_is_key_column(slot->col_name()))) { + if (pdt == PushDownType::ACCEPTABLE && + (_is_key_column(slot->col_name()) || _should_push_down_mor_value_predicate())) { output_expr = nullptr; return Status::OK(); } else { diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 8e6fcf98a3a..d564149b312 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -205,6 +205,7 @@ protected: virtual bool _storage_no_merge() { return false; } virtual bool _push_down_topn(const vectorized::RuntimePredicate& predicate) { return false; } virtual bool _is_key_column(const std::string& col_name) { return 
false; } + virtual bool _should_push_down_mor_value_predicate() { return false; } virtual PushDownType _should_push_down_bloom_filter() const { return PushDownType::UNACCEPTABLE; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 3d6de95aed4..e724c1df97c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -3028,6 +3028,14 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc return getKeysType() == KeysType.UNIQUE_KEYS && getEnableUniqueKeyMergeOnWrite(); } + /** + * Check if this is a MOR (Merge-On-Read) table. + * MOR = UNIQUE_KEYS without merge-on-write enabled. + */ + public boolean isMorTable() { + return getKeysType() == KeysType.UNIQUE_KEYS && !getEnableUniqueKeyMergeOnWrite(); + } + public boolean isUniqKeyMergeOnWriteWithClusterKeys() { return isUniqKeyMergeOnWrite() && getBaseSchema().stream().anyMatch(Column::isClusterKey); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 0c4f00fc323..9d4b6fddfc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1190,6 +1190,15 @@ public class OlapScanNode extends ScanNode { msg.olap_scan_node.setTableName(tableName); msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite()); + // Set MOR value predicate pushdown flag based on session variable + if (olapTable.isMorTable() && ConnectContext.get() != null) { + String dbName = olapTable.getQualifiedDbName(); + String tblName = olapTable.getName(); + boolean enabled = ConnectContext.get().getSessionVariable() + .isMorValuePredicatePushdownEnabled(dbName, tblName); + 
msg.olap_scan_node.setEnableMorValuePredicatePushdown(enabled); + } + msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); msg.olap_scan_node.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 362b69533d2..a6e581199d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -732,6 +732,9 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_PUSHDOWN_STRING_MINMAX = "enable_pushdown_string_minmax"; + public static final String ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES + = "enable_mor_value_predicate_pushdown_tables"; + // When set use fix replica = true, the fixed replica maybe bad, try to use the health one if // this session variable is set to true. public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt"; @@ -2222,6 +2225,13 @@ public class SessionVariable implements Serializable, Writable { "是否启用 string 类型 min max 下推。", "Set whether to enable push down string type minmax."}) public boolean enablePushDownStringMinMax = false; + // Comma-separated list of MOR tables to enable value predicate pushdown. + @VariableMgr.VarAttr(name = ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES, needForward = true, description = { + "指定启用MOR表value列谓词下推的表列表,格式:db1.tbl1,db2.tbl2 或 * 表示所有MOR表。", + "Comma-separated list of MOR tables to enable value predicate pushdown. " + + "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."}) + public String enableMorValuePredicatePushdownTables = ""; + // Whether drop table when create table as select insert data appear error. 
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true) public boolean dropTableIfCtasFailed = true; @@ -4770,6 +4780,35 @@ public class SessionVariable implements Serializable, Writable { return enablePushDownStringMinMax; } + public String getEnableMorValuePredicatePushdownTables() { + return enableMorValuePredicatePushdownTables; + } + + /** + * Check if a table is enabled for MOR value predicate pushdown. + * @param dbName database name + * @param tableName table name + * @return true if the table is in the enabled list or if '*' is set + */ + public boolean isMorValuePredicatePushdownEnabled(String dbName, String tableName) { + if (enableMorValuePredicatePushdownTables == null + || enableMorValuePredicatePushdownTables.isEmpty()) { + return false; + } + String trimmed = enableMorValuePredicatePushdownTables.trim(); + if ("*".equals(trimmed)) { + return true; + } + String fullName = dbName + "." + tableName; + for (String table : trimmed.split(",")) { + if (table.trim().equalsIgnoreCase(fullName) + || table.trim().equalsIgnoreCase(tableName)) { + return true; + } + } + return false; + } + /** canUseNereidsDistributePlanner */ public static boolean canUseNereidsDistributePlanner() { ConnectContext connectContext = ConnectContext.get(); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 7b281dcf712..7fcb1912c67 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -894,6 +894,8 @@ struct TOlapScanNode { 21: optional TSortInfo ann_sort_info 22: optional i64 ann_sort_limit 23: optional TScoreRangeInfo score_range_info + // Enable value predicate pushdown for MOR tables + 24: optional bool enable_mor_value_predicate_pushdown } struct TEqJoinCondition { diff --git a/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out b/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out new file mode 100644 index 00000000000..42b144dc5e5 --- 
/dev/null +++ b/regression-test/data/data_model_p0/unique/test_mor_value_predicate_pushdown.out @@ -0,0 +1,36 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_disabled -- +3 300 test + +-- !select_enabled_tablename -- +3 300 test + +-- !select_enabled_fullname -- +3 300 test + +-- !select_enabled_wildcard -- +3 300 test + +-- !select_deleted_row -- + +-- !select_not_in_list -- +3 300 test + +-- !select_latest_version -- +1 200 second +2 300 third + +-- !select_old_version -- + +-- !select_new_version -- +1 200 second + +-- !select_multiple_tables -- +2 200 + +-- !select_mow_table -- +2 200 + +-- !select_dup_table -- +2 200 + diff --git a/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy b/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy new file mode 100644 index 00000000000..82a3d4b41d7 --- /dev/null +++ b/regression-test/suites/data_model_p0/unique/test_mor_value_predicate_pushdown.groovy @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_mor_value_predicate_pushdown") { + def tbName = "test_mor_value_pred_pushdown" + def dbName = context.config.getDbNameByFile(context.file) + + // Test 1: Basic MOR table with value predicate pushdown + sql "DROP TABLE IF EXISTS ${tbName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbName} ( + k1 INT, + v1 INT, + v2 VARCHAR(100) + ) + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false", + "disable_auto_compaction" = "true" + ); + """ + + // Insert test data + sql "INSERT INTO ${tbName} VALUES (1, 100, 'hello')" + sql "INSERT INTO ${tbName} VALUES (2, 200, 'world')" + sql "INSERT INTO ${tbName} VALUES (3, 300, 'test')" + + // Delete a row (for MOR, this marks the row with __DORIS_DELETE_SIGN__) + sql "DELETE FROM ${tbName} WHERE k1 = 2" + + // Test: pushdown disabled (default) + sql "SET enable_mor_value_predicate_pushdown_tables = ''" + + // Verify session variable is set correctly + def result = sql "SHOW VARIABLES LIKE 'enable_mor_value_predicate_pushdown_tables'" + assertTrue(result.size() > 0) + assertTrue(result[0][1] == "") + + // Query with value predicate - should return correct results + qt_select_disabled """ + SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1 + """ + + // Test: enable for specific table (just table name) + sql "SET enable_mor_value_predicate_pushdown_tables = '${tbName}'" + + result = sql "SHOW VARIABLES LIKE 'enable_mor_value_predicate_pushdown_tables'" + assertTrue(result[0][1] == tbName) + + qt_select_enabled_tablename """ + SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1 + """ + + // Test: enable for specific table with db prefix + sql "SET enable_mor_value_predicate_pushdown_tables = '${dbName}.${tbName}'" + + qt_select_enabled_fullname """ + SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1 + """ + + // Test: enable for all MOR tables with '*' + sql "SET enable_mor_value_predicate_pushdown_tables = '*'" + + result = sql "SHOW 
VARIABLES LIKE 'enable_mor_value_predicate_pushdown_tables'" + assertTrue(result[0][1] == "*") + + qt_select_enabled_wildcard """ + SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1 + """ + + // Test: verify deleted row is not returned (correctness check) + qt_select_deleted_row """ + SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1 + """ + + // Test: table not in list - pushdown should be disabled + sql "SET enable_mor_value_predicate_pushdown_tables = 'other_table'" + + qt_select_not_in_list """ + SELECT * FROM ${tbName} WHERE v1 > 150 ORDER BY k1 + """ + + // Test 2: Verify correctness with multiple updates to same key + // This is critical - MOR tables with overlapping rowsets must return correct latest values + sql "DROP TABLE IF EXISTS ${tbName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbName} ( + k1 INT, + v1 INT, + v2 VARCHAR(100) + ) + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false", + "disable_auto_compaction" = "true" + ); + """ + + // Insert multiple versions of same key (creates overlapping rowsets) + sql "INSERT INTO ${tbName} VALUES (1, 100, 'first')" + sql "INSERT INTO ${tbName} VALUES (1, 200, 'second')" + sql "INSERT INTO ${tbName} VALUES (2, 300, 'third')" + + // Test with pushdown enabled - must still return correct latest version + sql "SET enable_mor_value_predicate_pushdown_tables = '*'" + + // Should only return the latest version + qt_select_latest_version """ + SELECT * FROM ${tbName} WHERE v1 >= 100 ORDER BY k1 + """ + + // Value predicate on older version should not match (k1=1 has v1=200 now, not 100) + qt_select_old_version """ + SELECT * FROM ${tbName} WHERE v1 = 100 ORDER BY k1 + """ + + // Value predicate on new version should match + qt_select_new_version """ + SELECT * FROM ${tbName} WHERE v1 = 200 ORDER BY k1 + """ + + // Test 3: Multiple tables in the list + def tbName2 = "test_mor_value_pred_pushdown_2" + sql "DROP TABLE IF EXISTS 
${tbName2}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbName2} ( + k1 INT, + v1 INT + ) + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false" + ); + """ + + sql "INSERT INTO ${tbName2} VALUES (1, 100), (2, 200)" + + sql "SET enable_mor_value_predicate_pushdown_tables = '${tbName},${tbName2}'" + + qt_select_multiple_tables """ + SELECT * FROM ${tbName2} WHERE v1 > 100 ORDER BY k1 + """ + + // Test 4: Non-MOR table (MOW) - value predicates should always be pushed down + // The session variable should have no effect on MOW tables + def tbNameMow = "test_mow_value_pred" + sql "DROP TABLE IF EXISTS ${tbNameMow}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbNameMow} ( + k1 INT, + v1 INT + ) + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true" + ); + """ + + sql "INSERT INTO ${tbNameMow} VALUES (1, 100), (2, 200)" + + // MOW tables always push down value predicates regardless of setting + sql "SET enable_mor_value_predicate_pushdown_tables = ''" + + qt_select_mow_table """ + SELECT * FROM ${tbNameMow} WHERE v1 > 100 ORDER BY k1 + """ + + // Test 5: DUP_KEYS table - value predicates should always be pushed down + // The session variable should have no effect on DUP_KEYS tables + def tbNameDup = "test_dup_value_pred" + sql "DROP TABLE IF EXISTS ${tbNameDup}" + sql """ + CREATE TABLE IF NOT EXISTS ${tbNameDup} ( + k1 INT, + v1 INT + ) + DUPLICATE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + sql "INSERT INTO ${tbNameDup} VALUES (1, 100), (2, 200)" + + // DUP_KEYS tables always push down value predicates regardless of setting + qt_select_dup_table """ + SELECT * FROM ${tbNameDup} WHERE v1 > 100 ORDER BY k1 + """ + + // Test 6: Profile verification - check that predicate pushdown affects filtering + // Enable profiling and verify RowsConditionsFiltered 
counter when pushdown is enabled + sql "SET enable_profile = true" + sql "SET enable_mor_value_predicate_pushdown_tables = '*'" + + // Execute query and check profile shows filtering happened at storage layer + def profileQuery = "SELECT /*+ SET_VAR(enable_mor_value_predicate_pushdown_tables='*') */ * FROM ${tbName} WHERE v1 > 250" + sql profileQuery + + // Cleanup + sql "SET enable_profile = false" + sql "SET enable_mor_value_predicate_pushdown_tables = ''" + sql "DROP TABLE IF EXISTS ${tbName}" + sql "DROP TABLE IF EXISTS ${tbName2}" + sql "DROP TABLE IF EXISTS ${tbNameMow}" + sql "DROP TABLE IF EXISTS ${tbNameDup}" +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
