This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7aec6ffb6ae [fix](mtmv) Fix get mv read lock too late when rewritten
by materialized view (#44164)
7aec6ffb6ae is described below
commit 7aec6ffb6aef6b41f345cfeaf5fbdd78d19bc5b9
Author: seawinde <[email protected]>
AuthorDate: Tue Nov 19 16:13:40 2024 +0800
[fix](mtmv) Fix get mv read lock too late when rewritten by materialized
view (#44164)
Problem Summary:
When a query is rewritten by a materialized view, the rewrite uses the MV's
metadata. A read lock on the MV should be acquired before this metadata is
used; otherwise errors may occur.
For example, consider the following MV definition:
CREATE MATERIALIZED VIEW mv1
BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS
select
o_orderdate,
o_shippriority,
o_comment,
o.o_code as o_o_code,
l_orderkey,
l_partkey,
l.o_code as l_o_code
from
orders_same_col o left
join lineitem_same_col l on l_orderkey = o_orderkey
left join partsupp on ps_partkey = l_partkey and l_suppkey =
ps_suppkey;
When handling transparent rewriting, a MV scan plan is used for the
transparent rewrite. During the initialization of the scan plan, the
partitions of the table are retrieved, so it is necessary to attempt to
acquire a read lock on the table during initialization. If the read lock
is not acquired, subsequent operations may add or delete partitions, and
in the later processing of table partitions, calling getPartition may
not retrieve the corresponding partition, leading to data errors.
---
.../mv/AbstractMaterializedViewRule.java | 12 ++----
.../mv/AsyncMaterializationContext.java | 14 ++++---
.../exploration/mv/MaterializationContext.java | 48 +++++++++++----------
.../exploration/mv/SyncMaterializationContext.java | 25 +++++++----
.../doris/nereids/mv/IdStatisticsMapTest.java | 2 +-
.../mv/join/left_outer/outer_join.out | 46 ++++++++++++++++++++
.../mv/join/left_outer/outer_join.groovy | 49 ++++++++++++++++++++++
7 files changed, 150 insertions(+), 46 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
index 60b5c58d4c5..8e9ef1eaa97 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java
@@ -234,7 +234,7 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
continue;
}
Plan rewrittenPlan;
- Plan mvScan = materializationContext.getScanPlan(queryStructInfo);
+ Plan mvScan = materializationContext.getScanPlan(queryStructInfo,
cascadesContext);
Plan queryPlan = queryStructInfo.getTopPlan();
if (compensatePredicates.isAlwaysTrue()) {
rewrittenPlan = mvScan;
@@ -262,12 +262,6 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo,
viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext, cascadesContext);
- // If rewrite successfully, try to get mv read lock to avoid data
inconsistent,
- // try to get lock which should added before RBO
- if (materializationContext instanceof AsyncMaterializationContext
&& !materializationContext.isSuccess()) {
- cascadesContext.getStatementContext()
- .addTableReadLock(((AsyncMaterializationContext)
materializationContext).getMtmv());
- }
rewrittenPlan =
MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getWholeTreeRewriter(childContext).execute();
@@ -379,9 +373,9 @@ public abstract class AbstractMaterializedViewRule
implements ExplorationRuleFac
}
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
- // if rewrite successfully, try to regenerate mv scan because it
maybe used again
- materializationContext.tryReGenerateScanPlan(cascadesContext);
recordIfRewritten(queryStructInfo.getOriginalPlan(),
materializationContext, cascadesContext);
+ // If rewrite successfully, try to clear mv scan currently because
it maybe used again
+ materializationContext.clearScanPlan(cascadesContext);
}
return rewriteResults;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java
index 0d88672fed6..96d37ad546a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AsyncMaterializationContext.java
@@ -57,9 +57,7 @@ public class AsyncMaterializationContext extends
MaterializationContext {
*/
public AsyncMaterializationContext(MTMV mtmv, Plan mvPlan, Plan
mvOriginalPlan, List<Table> baseTables,
List<Table> baseViews, CascadesContext cascadesContext, StructInfo
structInfo) {
- super(mvPlan, mvOriginalPlan,
MaterializedViewUtils.generateMvScanPlan(mtmv, mtmv.getBaseIndexId(),
- mtmv.getPartitionIds(), PreAggStatus.on(),
cascadesContext),
- cascadesContext, structInfo);
+ super(mvPlan, mvOriginalPlan, cascadesContext, structInfo);
this.mtmv = mtmv;
}
@@ -110,7 +108,7 @@ public class AsyncMaterializationContext extends
MaterializationContext {
return Optional.empty();
}
RelationId relationId = null;
- Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null)
+ Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null,
cascadesContext)
.collectFirst(LogicalOlapScan.class::isInstance);
if (logicalOlapScan.isPresent()) {
relationId = logicalOlapScan.get().getRelationId();
@@ -132,7 +130,13 @@ public class AsyncMaterializationContext extends
MaterializationContext {
}
@Override
- public Plan getScanPlan(StructInfo queryInfo) {
+ public Plan getScanPlan(StructInfo queryInfo, CascadesContext
cascadesContext) {
+ // If try to get scan plan or rewrite successfully, try to get mv read
lock to avoid meta data inconsistent,
+ // try to get lock which should added before RBO
+ if (!this.isSuccess()) {
+
cascadesContext.getStatementContext().addTableReadLock(this.getMtmv());
+ }
+ super.getScanPlan(queryInfo, cascadesContext);
return scanPlan;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
index df535d59d87..38eba2ac340 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializationContext.java
@@ -105,22 +105,13 @@ public abstract class MaterializationContext {
/**
* MaterializationContext, this contains necessary info for query
rewriting by materialization
*/
- public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan,
+ public MaterializationContext(Plan plan, Plan originalPlan,
CascadesContext cascadesContext, StructInfo structInfo) {
this.plan = plan;
this.originalPlan = originalPlan;
- this.scanPlan = scanPlan;
-
StatementBase parsedStatement =
cascadesContext.getStatementContext().getParsedStatement();
this.enableRecordFailureDetail = parsedStatement != null &&
parsedStatement.isExplain()
&& ExplainLevel.MEMO_PLAN ==
parsedStatement.getExplainOptions().getExplainLevel();
- List<Slot> originalPlanOutput = originalPlan.getOutput();
- List<Slot> scanPlanOutput = this.scanPlan.getOutput();
- if (originalPlanOutput.size() == scanPlanOutput.size()) {
- for (int slotIndex = 0; slotIndex < originalPlanOutput.size();
slotIndex++) {
-
this.exprToScanExprMapping.put(originalPlanOutput.get(slotIndex),
scanPlanOutput.get(slotIndex));
- }
- }
// Construct materialization struct info, catch exception which may
cause planner roll back
this.structInfo = structInfo == null
? constructStructInfo(plan, originalPlan, cascadesContext, new
BitSet()).orElseGet(() -> null)
@@ -128,10 +119,6 @@ public abstract class MaterializationContext {
this.available = this.structInfo != null;
if (available) {
this.planOutputShuttledExpressions =
this.structInfo.getPlanOutputShuttledExpressions();
- // materialization output expression shuttle, this will be used to
expression rewrite
- this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
- this.planOutputShuttledExpressions,
- scanPlanOutput);
}
}
@@ -176,17 +163,19 @@ public abstract class MaterializationContext {
* if MaterializationContext is already rewritten successfully, then
should generate new scan plan in later
* query rewrite, because one plan may hit the materialized view
repeatedly and the materialization scan output
* should be different.
- * This method should be called when query rewrite successfully
*/
- public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
+ public void tryGenerateScanPlan(CascadesContext cascadesContext) {
+ if (!this.isAvailable()) {
+ return;
+ }
this.scanPlan = doGenerateScanPlan(cascadesContext);
- // materialization output expression shuttle, this will be used to
expression rewrite
- this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
- this.planOutputShuttledExpressions,
- this.scanPlan.getOutput());
+ // Materialization output expression shuttle, this will be used to
expression rewrite
+ List<Slot> scanPlanOutput = this.scanPlan.getOutput();
+ this.shuttledExprToScanExprMapping =
ExpressionMapping.generate(this.planOutputShuttledExpressions,
+ scanPlanOutput);
+ // This is used by normalize statistics column expression
Map<Expression, Expression> regeneratedMapping = new HashMap<>();
List<Slot> originalPlanOutput = originalPlan.getOutput();
- List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size();
slotIndex++) {
regeneratedMapping.put(originalPlanOutput.get(slotIndex),
scanPlanOutput.get(slotIndex));
@@ -195,6 +184,17 @@ public abstract class MaterializationContext {
this.exprToScanExprMapping = regeneratedMapping;
}
+ /**
+ * Should clear scan plan after materializationContext is already
rewritten successfully,
+ * Because one plan may hit the materialized view repeatedly and the
materialization scan output
+ * should be different.
+ */
+ public void clearScanPlan(CascadesContext cascadesContext) {
+ this.scanPlan = null;
+ this.shuttledExprToScanExprMapping = null;
+ this.exprToScanExprMapping = null;
+ }
+
public void addSlotMappingToCache(RelationMapping relationMapping,
SlotMapping slotMapping) {
queryToMaterializationSlotMappingCache.put(relationMapping,
slotMapping);
}
@@ -275,7 +275,11 @@ public abstract class MaterializationContext {
return originalPlan;
}
- public Plan getScanPlan(StructInfo queryStructInfo) {
+ public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext
cascadesContext) {
+ if (this.scanPlan == null || this.shuttledExprToScanExprMapping == null
+ || this.exprToScanExprMapping == null) {
+ tryGenerateScanPlan(cascadesContext);
+ }
return scanPlan;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java
index 47b01385ac1..e27b3d51743 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/SyncMaterializationContext.java
@@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.RelationId;
+import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
@@ -55,9 +56,7 @@ public class SyncMaterializationContext extends
MaterializationContext {
*/
public SyncMaterializationContext(Plan mvPlan, Plan mvOriginalPlan,
OlapTable olapTable,
long indexId, String indexName, CascadesContext cascadesContext,
Statistics statistics) {
- super(mvPlan, mvOriginalPlan,
- MaterializedViewUtils.generateMvScanPlan(olapTable, indexId,
olapTable.getPartitionIds(),
- PreAggStatus.unset(), cascadesContext),
cascadesContext, null);
+ super(mvPlan, mvOriginalPlan, cascadesContext, null);
this.olapTable = olapTable;
this.indexId = indexId;
this.indexName = indexName;
@@ -100,7 +99,7 @@ public class SyncMaterializationContext extends
MaterializationContext {
@Override
Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext
cascadesContext) {
RelationId relationId = null;
- Optional<LogicalOlapScan> scanObj = this.getScanPlan(null)
+ Optional<LogicalOlapScan> scanObj = this.getScanPlan(null,
cascadesContext)
.collectFirst(LogicalOlapScan.class::isInstance);
if (scanObj.isPresent()) {
relationId = scanObj.get().getRelationId();
@@ -109,19 +108,27 @@ public class SyncMaterializationContext extends
MaterializationContext {
}
@Override
- public Plan getScanPlan(StructInfo queryStructInfo) {
+ public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext
cascadesContext) {
+ // Already get lock if sync mv, doesn't need to get lock
+ super.getScanPlan(queryStructInfo, cascadesContext);
if (queryStructInfo == null) {
return scanPlan;
}
- if (queryStructInfo.getRelations().size() == 1
- && queryStructInfo.getRelations().get(0) instanceof
LogicalOlapScan
- && !((LogicalOlapScan)
queryStructInfo.getRelations().get(0)).getSelectedPartitionIds().isEmpty()) {
+ List<CatalogRelation> queryStructInfoRelations =
queryStructInfo.getRelations();
+ if (queryStructInfoRelations.size() == 1
+ && queryStructInfoRelations.get(0) instanceof LogicalOlapScan
+ && !((LogicalOlapScan)
queryStructInfoRelations.get(0)).getSelectedPartitionIds().isEmpty()) {
// Partition prune if sync materialized view
return scanPlan.accept(new DefaultPlanRewriter<Void>() {
@Override
public Plan visitLogicalOlapScan(LogicalOlapScan olapScan,
Void context) {
+ if
(!queryStructInfoRelations.get(0).getTable().getFullQualifiers().equals(
+ olapScan.getTable().getFullQualifiers())) {
+ // Only the same table, we can do partition prue
+ return olapScan;
+ }
return olapScan.withSelectedPartitionIds(
- ((LogicalOlapScan)
queryStructInfo.getRelations().get(0)).getSelectedPartitionIds());
+ ((LogicalOlapScan)
queryStructInfoRelations.get(0)).getSelectedPartitionIds());
}
}, null);
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java
index a4c05fa81e6..0090982db00 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/mv/IdStatisticsMapTest.java
@@ -76,7 +76,7 @@ public class IdStatisticsMapTest extends SqlTestBase {
.rewrite();
// scan plan output will be refreshed after mv rewrite successfully,
so need tmp store
Set<Slot> materializationScanOutput =
c1.getMaterializationContexts().get(0)
- .getScanPlan(null).getOutputSet();
+ .getScanPlan(null, c1).getOutputSet();
tmpPlanChecker
.optimize()
.printlnBestPlanTree();
diff --git
a/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out
b/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out
index 1a1b846054b..b8e78048d8e 100644
--- a/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out
+++ b/regression-test/data/nereids_rules_p0/mv/join/left_outer/outer_join.out
@@ -373,3 +373,49 @@
2023-12-12 2 mi 108 2
2023-12-12 2 mi 108 2
+-- !query12_0_before --
+2023-12-09 1 yy 95 4
+2023-12-09 1 yy 95 4
+2023-12-09 1 yy 96 4
+2023-12-09 1 yy 96 4
+2023-12-09 1 yy 97 4
+2023-12-09 1 yy 97 4
+2023-12-10 1 yy 100 2
+2023-12-10 1 yy 101 2
+2023-12-10 1 yy 98 2
+2023-12-10 1 yy 99 2
+2023-12-11 2 mm 102 3
+2023-12-11 2 mm 103 3
+2023-12-11 2 mm 104 3
+2023-12-12 2 mi 105 2
+2023-12-12 2 mi 105 2
+2023-12-12 2 mi 106 2
+2023-12-12 2 mi 106 2
+2023-12-12 2 mi 107 2
+2023-12-12 2 mi 107 2
+2023-12-12 2 mi 108 2
+2023-12-12 2 mi 108 2
+
+-- !query12_0_after --
+2023-12-09 1 yy 95 4
+2023-12-09 1 yy 95 4
+2023-12-09 1 yy 96 4
+2023-12-09 1 yy 96 4
+2023-12-09 1 yy 97 4
+2023-12-09 1 yy 97 4
+2023-12-10 1 yy 100 2
+2023-12-10 1 yy 101 2
+2023-12-10 1 yy 98 2
+2023-12-10 1 yy 99 2
+2023-12-11 2 mm 102 3
+2023-12-11 2 mm 103 3
+2023-12-11 2 mm 104 3
+2023-12-12 2 mi 105 2
+2023-12-12 2 mi 105 2
+2023-12-12 2 mi 106 2
+2023-12-12 2 mi 106 2
+2023-12-12 2 mi 107 2
+2023-12-12 2 mi 107 2
+2023-12-12 2 mi 108 2
+2023-12-12 2 mi 108 2
+
diff --git
a/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy
b/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy
index f31a1a77978..faa2c747a83 100644
---
a/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy
+++
b/regression-test/suites/nereids_rules_p0/mv/join/left_outer/outer_join.groovy
@@ -759,4 +759,53 @@ suite("outer_join") {
async_mv_rewrite_success(db, mv11_0, query11_0, "mv11_0")
order_qt_query11_0_after "${query11_0}"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv11_0"""
+
+
+ def mv12_0 = """
+ select
+ o_orderdate,
+ o_shippriority,
+ o_comment,
+ o.o_code as o_o_code,
+ l_orderkey,
+ l_partkey,
+ l.o_code as l_o_code
+ from
+ orders_same_col o left
+ join lineitem_same_col l on l_orderkey = o_orderkey
+ left join partsupp on ps_partkey = l_partkey and l_suppkey =
ps_suppkey;
+ """
+
+ def query12_0 = """
+ select
+ o_orderdate,
+ o_shippriority,
+ o_comment,
+ o.o_code
+ l_orderkey,
+ l_partkey
+ from
+ orders_same_col o left
+ join lineitem_same_col l on l_orderkey = o_orderkey
+ left join partsupp on ps_partkey = l_partkey and l_suppkey =
ps_suppkey
+ where l.o_code <> '91'
+ union all
+ select
+ o_orderdate,
+ o_shippriority,
+ o_comment,
+ o.o_code
+ l_orderkey,
+ l_partkey
+ from
+ orders_same_col o left
+ join lineitem_same_col l on l_orderkey = o_orderkey
+ left join partsupp on ps_partkey = l_partkey and l_suppkey =
ps_suppkey
+ where l.o_code = '92';
+ """
+
+ order_qt_query12_0_before "${query12_0}"
+ async_mv_rewrite_success(db, mv12_0, query12_0, "mv12_0")
+ order_qt_query12_0_after "${query12_0}"
+ sql """ DROP MATERIALIZED VIEW IF EXISTS mv12_0"""
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]