This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch feature/mor_value_predicate_pushdown_control in repository https://gitbox.apache.org/repos/asf/doris.git
commit 89d3715b3aa8180f724a0eae14049942a6af29d6 Author: Yongqiang YANG <[email protected]> AuthorDate: Sun Feb 8 19:57:23 2026 -0800 [feature](scan) Add read_mor_as_dup_tables session variable to read MOR tables without merge Add a per-table session variable that allows reading MOR (Merge-On-Read) UNIQUE tables as DUP tables, skipping storage engine merge and delete sign filter while still honoring delete predicates. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- be/src/pipeline/exec/olap_scan_operator.cpp | 8 +- be/src/pipeline/exec/olap_scan_operator.h | 1 + be/src/vec/exec/scan/olap_scanner.cpp | 5 +- .../doris/nereids/rules/analysis/BindRelation.java | 5 +- .../nereids/rules/rewrite/SetPreAggStatus.java | 9 +- .../trees/plans/logical/LogicalOlapScan.java | 7 + .../org/apache/doris/planner/OlapScanNode.java | 4 + .../java/org/apache/doris/qe/SessionVariable.java | 28 +++ .../nereids/rules/analysis/ReadMorAsDupTest.java | 280 +++++++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 2 + .../unique_with_mor_p0/test_read_mor_as_dup.groovy | 119 +++++++++ 11 files changed, 464 insertions(+), 4 deletions(-) diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 07c0522a4b5..b19f0d23cfe 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -456,7 +456,13 @@ bool OlapScanLocalState::_storage_no_merge() { return (p._olap_scan_node.keyType == TKeysType::DUP_KEYS || (p._olap_scan_node.keyType == TKeysType::UNIQUE_KEYS && p._olap_scan_node.__isset.enable_unique_key_merge_on_write && - p._olap_scan_node.enable_unique_key_merge_on_write)); + p._olap_scan_node.enable_unique_key_merge_on_write)) || + _read_mor_as_dup(); +} + +bool OlapScanLocalState::_read_mor_as_dup() { + auto& p = _parent->cast<OlapScanOperatorX>(); + return p._olap_scan_node.__isset.read_mor_as_dup && p._olap_scan_node.read_mor_as_dup; } Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) { diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 0577dd1ff2d..6517b13f271 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -105,6 +105,7 @@ private: bool _storage_no_merge() override; + bool _read_mor_as_dup(); bool _push_down_topn(const vectorized::RuntimePredicate& predicate) override { if (!predicate.target_is_slot(_parent->node_id())) { return false; diff --git a/be/src/vec/exec/scan/olap_scanner.cpp b/be/src/vec/exec/scan/olap_scanner.cpp index 7b577bdb629..2bd7b0f3269 100644 --- a/be/src/vec/exec/scan/olap_scanner.cpp +++ b/be/src/vec/exec/scan/olap_scanner.cpp @@ -334,7 +334,10 @@ Status OlapScanner::_init_tablet_reader_params( // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty const bool single_version = _tablet_reader_params.has_single_version(); - if (_state->skip_storage_engine_merge()) { + auto* olap_local_state = static_cast<pipeline::OlapScanLocalState*>(_local_state); + bool read_mor_as_dup = olap_local_state->olap_scan_node().__isset.read_mor_as_dup && + olap_local_state->olap_scan_node().read_mor_as_dup; + if (_state->skip_storage_engine_merge() || read_mor_as_dup) { _tablet_reader_params.direct_mode = true; _tablet_reader_params.aggregation = true; } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index 2d11e1c75db..81a54f8c67e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -371,7 +371,10 @@ public class BindRelation extends OneAnalysisRuleFactory { public static LogicalPlan checkAndAddDeleteSignFilter(LogicalOlapScan scan, ConnectContext connectContext, 
OlapTable olapTable) { if (!Util.showHiddenColumns() && scan.getTable().hasDeleteSign() - && !connectContext.getSessionVariable().skipDeleteSign()) { + && !connectContext.getSessionVariable().skipDeleteSign() + && !(olapTable.isMorTable() + && connectContext.getSessionVariable().isReadMorAsDupEnabled( + olapTable.getQualifiedDbName(), olapTable.getName()))) { // table qualifier is catalog.db.table, we make db.table.column Slot deleteSlot = null; for (Slot slot : scan.getOutput()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java index 2f7743cb86f..7ae49fc6ec8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SetPreAggStatus.java @@ -53,6 +53,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat; import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.util.ExpressionUtils; +import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -151,7 +152,13 @@ public class SetPreAggStatus extends DefaultPlanRewriter<Stack<SetPreAggStatus.P long selectIndexId = logicalOlapScan.getSelectedIndexId(); MaterializedIndexMeta meta = logicalOlapScan.getTable().getIndexMetaByIndexId(selectIndexId); if (meta.getKeysType() == KeysType.DUP_KEYS || (meta.getKeysType() == KeysType.UNIQUE_KEYS - && logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite())) { + && logicalOlapScan.getTable().getEnableUniqueKeyMergeOnWrite()) + || (meta.getKeysType() == KeysType.UNIQUE_KEYS + && logicalOlapScan.getTable().isMorTable() + && ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled( + 
logicalOlapScan.getTable().getQualifiedDbName(), + logicalOlapScan.getTable().getName()))) { return logicalOlapScan.withPreAggStatus(PreAggStatus.on()); } else { if (context.empty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java index bbcc44a38fc..d03a7d1b35b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java @@ -769,6 +769,13 @@ public class LogicalOlapScan extends LogicalCatalogRelation implements OlapScan, && getTable().getKeysType() == KeysType.UNIQUE_KEYS) { return; } + // When readMorAsDup is enabled, MOR tables are read as DUP, so uniqueness cannot be guaranteed. + if (getTable().getKeysType() == KeysType.UNIQUE_KEYS + && getTable().isMorTable() + && ConnectContext.get().getSessionVariable().isReadMorAsDupEnabled( + getTable().getQualifiedDbName(), getTable().getName())) { + return; + } ImmutableSet.Builder<Slot> uniqSlots = ImmutableSet.builderWithExpectedSize(outputSet.size()); for (Slot slot : outputSet) { if (!(slot instanceof SlotReference)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 9d4b6fddfc5..48614966468 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1197,6 +1197,10 @@ public class OlapScanNode extends ScanNode { boolean enabled = ConnectContext.get().getSessionVariable() .isMorValuePredicatePushdownEnabled(dbName, tblName); msg.olap_scan_node.setEnableMorValuePredicatePushdown(enabled); + if (ConnectContext.get().getSessionVariable() + .isReadMorAsDupEnabled(dbName, tblName)) { + msg.olap_scan_node.setReadMorAsDup(true); + } } 
msg.setPushDownAggTypeOpt(pushDownAggNoGroupingOp); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index a6e581199d5..42aef65243d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -735,6 +735,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_MOR_VALUE_PREDICATE_PUSHDOWN_TABLES = "enable_mor_value_predicate_pushdown_tables"; + public static final String READ_MOR_AS_DUP_TABLES = "read_mor_as_dup_tables"; + // When set use fix replica = true, the fixed replica maybe bad, try to use the health one if // this session variable is set to true. public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT = "fallback_other_replica_when_fixed_corrupt"; @@ -2232,6 +2234,14 @@ public class SessionVariable implements Serializable, Writable { + "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."}) public String enableMorValuePredicatePushdownTables = ""; + // Comma-separated list of MOR tables to read as DUP (skip merge, skip delete sign filter). + @VariableMgr.VarAttr(name = READ_MOR_AS_DUP_TABLES, needForward = true, + affectQueryResultInPlan = true, description = { + "指定以DUP模式读取MOR表的表列表(跳过合并和删除标记过滤),格式:db1.tbl1,db2.tbl2 或 * 表示所有MOR表。", + "Comma-separated list of MOR tables to read as DUP (skip merge, skip delete sign filter). " + + "Format: db1.tbl1,db2.tbl2 or * for all MOR tables."}) + public String readMorAsDupTables = ""; + // Whether drop table when create table as select insert data appear error. 
@VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true) public boolean dropTableIfCtasFailed = true; @@ -4809,6 +4819,24 @@ public class SessionVariable implements Serializable, Writable { return false; } + public boolean isReadMorAsDupEnabled(String dbName, String tableName) { + if (readMorAsDupTables == null || readMorAsDupTables.isEmpty()) { + return false; + } + String trimmed = readMorAsDupTables.trim(); + if ("*".equals(trimmed)) { + return true; + } + String fullName = dbName + "." + tableName; + for (String table : trimmed.split(",")) { + if (table.trim().equalsIgnoreCase(fullName) + || table.trim().equalsIgnoreCase(tableName)) { + return true; + } + } + return false; + } + /** canUseNereidsDistributePlanner */ public static boolean canUseNereidsDistributePlanner() { ConnectContext connectContext = ConnectContext.get(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java new file mode 100644 index 00000000000..579ae3c527f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/ReadMorAsDupTest.java @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.nereids.pattern.GeneratedPlanPatterns; +import org.apache.doris.nereids.rules.RulePromise; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Set; + +class ReadMorAsDupTest extends TestWithFeService implements GeneratedPlanPatterns { + private static final String DB = "test_read_mor_as_dup_db"; + + @Override + protected void runBeforeAll() throws Exception { + createDatabase(DB); + connectContext.setDatabase(DEFAULT_CLUSTER_PREFIX + DB); + + // MOR UNIQUE table (enable_unique_key_merge_on_write = false) + createTable("CREATE TABLE " + DB + ".mor_tbl (\n" + + " k INT NOT NULL,\n" + + " v1 INT,\n" + + " v2 VARCHAR(100)\n" + + ") ENGINE=OLAP\n" + + "UNIQUE KEY(k)\n" + + "DISTRIBUTED BY HASH(k) BUCKETS 1\n" + + "PROPERTIES ('replication_num' = '1', 'enable_unique_key_merge_on_write' = 'false');"); + + // MOW UNIQUE table (should not be affected) + createTable("CREATE TABLE " + DB + ".mow_tbl (\n" + + " k INT NOT NULL,\n" + + " v1 INT\n" + + ") ENGINE=OLAP\n" + + "UNIQUE KEY(k)\n" + + "DISTRIBUTED BY HASH(k) BUCKETS 1\n" + + "PROPERTIES ('replication_num' = '1', 'enable_unique_key_merge_on_write' = 'true');"); + + // DUP table (should not be affected) + createTable("CREATE TABLE " + DB + ".dup_tbl (\n" + + " k INT NOT NULL,\n" + + " v1 INT\n" + + ") ENGINE=OLAP\n" + + "DUPLICATE KEY(k)\n" + + "DISTRIBUTED BY HASH(k) BUCKETS 1\n" + + "PROPERTIES ('replication_num' = '1');"); + + 
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + } + + @Test + void testDeleteSignFilterPresentByDefault() { + connectContext.getSessionVariable().readMorAsDupTables = ""; + + PlanChecker.from(connectContext) + .analyze("select * from mor_tbl") + .rewrite() + .matches( + logicalProject( + logicalFilter( + logicalOlapScan() + ).when(filter -> hasDeleteSignPredicate(filter.getConjuncts())) + ) + ); + } + + @Test + void testDeleteSignFilterSkippedWithWildcard() { + connectContext.getSessionVariable().readMorAsDupTables = "*"; + + try { + PlanChecker.from(connectContext) + .analyze("select * from mor_tbl") + .rewrite() + .nonMatch( + logicalFilter( + logicalOlapScan() + ).when(filter -> hasDeleteSignPredicate(filter.getConjuncts())) + ); + } finally { + connectContext.getSessionVariable().readMorAsDupTables = ""; + } + } + + @Test + void testDeleteSignFilterSkippedWithTableName() { + connectContext.getSessionVariable().readMorAsDupTables = "mor_tbl"; + + try { + PlanChecker.from(connectContext) + .analyze("select * from mor_tbl") + .rewrite() + .nonMatch( + logicalFilter( + logicalOlapScan() + ).when(filter -> hasDeleteSignPredicate(filter.getConjuncts())) + ); + } finally { + connectContext.getSessionVariable().readMorAsDupTables = ""; + } + } + + @Test + void testDeleteSignFilterSkippedWithDbTableName() { + connectContext.getSessionVariable().readMorAsDupTables = + DEFAULT_CLUSTER_PREFIX + DB + ".mor_tbl"; + + try { + PlanChecker.from(connectContext) + .analyze("select * from mor_tbl") + .rewrite() + .nonMatch( + logicalFilter( + logicalOlapScan() + ).when(filter -> hasDeleteSignPredicate(filter.getConjuncts())) + ); + } finally { + connectContext.getSessionVariable().readMorAsDupTables = ""; + } + } + + @Test + void testMowTableNotAffected() { + // MOW table should still have delete sign filter even with read_mor_as_dup = * + connectContext.getSessionVariable().readMorAsDupTables = "*"; + + try { + // MOW tables also have delete 
sign filter; read_mor_as_dup should NOT remove it + PlanChecker.from(connectContext) + .analyze("select * from mow_tbl") + .rewrite() + .matches( + logicalProject( + logicalFilter( + logicalOlapScan() + ).when(filter -> hasDeleteSignPredicate(filter.getConjuncts())) + ) + ); + } finally { + connectContext.getSessionVariable().readMorAsDupTables = ""; + } + } + + @Test + void testPreAggOnForMorWithReadAsDup() { + connectContext.getSessionVariable().readMorAsDupTables = "*"; + + try { + Plan plan = PlanChecker.from(connectContext) + .analyze("select * from mor_tbl") + .rewrite() + .getPlan(); + + // Find the LogicalOlapScan and verify preAggStatus is ON + LogicalOlapScan scan = findOlapScan(plan); + Assertions.assertNotNull(scan, "Should find LogicalOlapScan in plan"); + Assertions.assertTrue(scan.getPreAggStatus().isOn(), + "PreAggStatus should be ON when read_mor_as_dup is enabled"); + } finally { + connectContext.getSessionVariable().readMorAsDupTables = ""; + } + } + + @Test + void testPerTableControlOnlyAffectsSpecifiedTable() { + // Only enable for mor_tbl, not for other tables + connectContext.getSessionVariable().readMorAsDupTables = "mor_tbl"; + + try { + // mor_tbl should NOT have delete sign filter + PlanChecker.from(connectContext) + .analyze("select * from mor_tbl") + .rewrite() + .nonMatch( + logicalFilter( + logicalOlapScan() + ).when(filter -> hasDeleteSignPredicate(filter.getConjuncts())) + ); + + // dup_tbl should be unaffected (no delete sign filter for DUP) + PlanChecker.from(connectContext) + .analyze("select * from dup_tbl") + .rewrite() + .matches(logicalOlapScan()); + } finally { + connectContext.getSessionVariable().readMorAsDupTables = ""; + } + } + + @Test + void testSessionVariableHelperMethod() { + // Test the isReadMorAsDupEnabled helper directly + connectContext.getSessionVariable().readMorAsDupTables = ""; + Assertions.assertFalse( + connectContext.getSessionVariable().isReadMorAsDupEnabled("db", "tbl")); + + 
connectContext.getSessionVariable().readMorAsDupTables = "*"; + Assertions.assertTrue( + connectContext.getSessionVariable().isReadMorAsDupEnabled("db", "tbl")); + Assertions.assertTrue( + connectContext.getSessionVariable().isReadMorAsDupEnabled("any_db", "any_tbl")); + + connectContext.getSessionVariable().readMorAsDupTables = "mydb.mytbl"; + Assertions.assertTrue( + connectContext.getSessionVariable().isReadMorAsDupEnabled("mydb", "mytbl")); + Assertions.assertFalse( + connectContext.getSessionVariable().isReadMorAsDupEnabled("otherdb", "othertbl")); + // "mydb.mytbl" entry only matches full name or exact table name "mydb.mytbl" + Assertions.assertFalse( + connectContext.getSessionVariable().isReadMorAsDupEnabled("anything", "mytbl")); + + // Table name only (no db prefix) matches any db + connectContext.getSessionVariable().readMorAsDupTables = "mytbl"; + Assertions.assertTrue( + connectContext.getSessionVariable().isReadMorAsDupEnabled("anydb", "mytbl")); + Assertions.assertFalse( + connectContext.getSessionVariable().isReadMorAsDupEnabled("anydb", "othertbl")); + + connectContext.getSessionVariable().readMorAsDupTables = "db1.tbl1,db2.tbl2"; + Assertions.assertTrue( + connectContext.getSessionVariable().isReadMorAsDupEnabled("db1", "tbl1")); + Assertions.assertTrue( + connectContext.getSessionVariable().isReadMorAsDupEnabled("db2", "tbl2")); + Assertions.assertFalse( + connectContext.getSessionVariable().isReadMorAsDupEnabled("db1", "tbl2")); + + // Case insensitive + connectContext.getSessionVariable().readMorAsDupTables = "MyDB.MyTbl"; + Assertions.assertTrue( + connectContext.getSessionVariable().isReadMorAsDupEnabled("mydb", "mytbl")); + + // Cleanup + connectContext.getSessionVariable().readMorAsDupTables = ""; + } + + private boolean hasDeleteSignPredicate(Set<Expression> conjuncts) { + return conjuncts.stream() + .anyMatch(expr -> expr.toSql().contains(Column.DELETE_SIGN)); + } + + private LogicalOlapScan findOlapScan(Plan plan) { + if (plan instanceof 
LogicalOlapScan) { + return (LogicalOlapScan) plan; + } + for (Plan child : plan.children()) { + LogicalOlapScan result = findOlapScan(child); + if (result != null) { + return result; + } + } + return null; + } + + @Override + public RulePromise defaultPromise() { + return RulePromise.REWRITE; + } +} diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 7fcb1912c67..9c6320c86e8 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -896,6 +896,8 @@ struct TOlapScanNode { 23: optional TScoreRangeInfo score_range_info // Enable value predicate pushdown for MOR tables 24: optional bool enable_mor_value_predicate_pushdown + // Read MOR table as DUP table: skip merge, skip delete sign + 25: optional bool read_mor_as_dup } struct TEqJoinCondition { diff --git a/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy b/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy new file mode 100644 index 00000000000..aada4292ff9 --- /dev/null +++ b/regression-test/suites/unique_with_mor_p0/test_read_mor_as_dup.groovy @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_read_mor_as_dup") { + def tableName = "test_read_mor_as_dup_tbl" + def tableName2 = "test_read_mor_as_dup_tbl2" + + sql "DROP TABLE IF EXISTS ${tableName}" + sql "DROP TABLE IF EXISTS ${tableName2}" + + // Create a MOR (Merge-On-Read) UNIQUE table + sql """ + CREATE TABLE ${tableName} ( + `k` int NOT NULL, + `v1` int NULL, + `v2` varchar(100) NULL + ) ENGINE=OLAP + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "false", + "disable_auto_compaction" = "true", + "replication_num" = "1" + ); + """ + + // Create a second MOR table for per-table control test + sql """ + CREATE TABLE ${tableName2} ( + `k` int NOT NULL, + `v1` int NULL + ) ENGINE=OLAP + UNIQUE KEY(`k`) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "false", + "disable_auto_compaction" = "true", + "replication_num" = "1" + ); + """ + + // Insert multiple versions of the same key + sql "INSERT INTO ${tableName} VALUES (1, 10, 'first');" + sql "INSERT INTO ${tableName} VALUES (1, 20, 'second');" + sql "INSERT INTO ${tableName} VALUES (1, 30, 'third');" + sql "INSERT INTO ${tableName} VALUES (2, 100, 'only_version');" + + // Insert a row and then delete it + sql "INSERT INTO ${tableName} VALUES (3, 50, 'to_be_deleted');" + sql "DELETE FROM ${tableName} WHERE k = 3;" + + // Insert into second table + sql "INSERT INTO ${tableName2} VALUES (1, 10);" + sql "INSERT INTO ${tableName2} VALUES (1, 20);" + + // === Test 1: Normal query returns merged result === + sql "SET read_mor_as_dup_tables = '';" + def normalResult = sql "SELECT * FROM ${tableName} ORDER BY k;" + // Should see only latest version per key: (1,30,'third'), (2,100,'only_version') + // Key 3 was deleted, should not appear + assertTrue(normalResult.size() == 2, "Normal query should return 2 rows (merged), got ${normalResult.size()}") + + // === Test 2: Wildcard — read all MOR tables as DUP === + sql "SET read_mor_as_dup_tables = '*';" 
+ def dupResult = sql "SELECT * FROM ${tableName} ORDER BY k, v1;" + // Should see all row versions for key 1 (3 versions) + key 2 (1 version) + // Key 3: delete predicates are still applied, so deleted row should still be filtered + // But the delete sign filter is skipped, so we may see it depending on how delete was done. + // For MOR tables, DELETE adds a delete predicate in rowsets, which IS still honored. + assertTrue(dupResult.size() >= 4, "DUP mode should return at least 4 rows (all versions), got ${dupResult.size()}") + + // Verify key 1 has multiple versions + def key1Rows = dupResult.findAll { it[0] == 1 } + assertTrue(key1Rows.size() == 3, "Key 1 should have 3 versions in DUP mode, got ${key1Rows.size()}") + + // === Test 3: Per-table control with db.table format === + def dbName = sql "SELECT DATABASE();" + def currentDb = dbName[0][0] + + sql "SET read_mor_as_dup_tables = '${currentDb}.${tableName}';" + + // tableName should be in DUP mode + def perTableResult = sql "SELECT * FROM ${tableName} ORDER BY k, v1;" + assertTrue(perTableResult.size() >= 4, "Per-table DUP mode should return all versions, got ${perTableResult.size()}") + + // tableName2 should still be in normal (merged) mode + def table2Result = sql "SELECT * FROM ${tableName2} ORDER BY k;" + assertTrue(table2Result.size() == 1, "Table2 should still be merged (1 row), got ${table2Result.size()}") + + // === Test 4: Per-table control with just table name === + sql "SET read_mor_as_dup_tables = '${tableName2}';" + + // tableName should now be in normal mode + def revertedResult = sql "SELECT * FROM ${tableName} ORDER BY k;" + assertTrue(revertedResult.size() == 2, "Table1 should be merged again (2 rows), got ${revertedResult.size()}") + + // tableName2 should be in DUP mode + def table2DupResult = sql "SELECT * FROM ${tableName2} ORDER BY k, v1;" + assertTrue(table2DupResult.size() == 2, "Table2 in DUP mode should return 2 rows, got ${table2DupResult.size()}") + + // === Cleanup === + sql "SET 
read_mor_as_dup_tables = '';" + sql "DROP TABLE IF EXISTS ${tableName}" + sql "DROP TABLE IF EXISTS ${tableName2}" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org
