This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6bed454704e [fix](mtmv) release read lock when align mvmv's partition
(#53069)
6bed454704e is described below
commit 6bed454704e9dae5d69c4d91b723b1dd3940cd64
Author: Lijia Liu <[email protected]>
AuthorDate: Fri Jul 18 11:32:47 2025 +0800
[fix](mtmv) release read lock when align mvmv's partition (#53069)
I found a deadlock in the MV task dispatch logic.
In the following picture, both the table A and table B are MVs.
Thread 7 holds table-A's read lock to align partitions. It will add or
drop the table-B's partitions.
Thread 1 wants to get table-A's write lock to drop some partitions, and
it is blocked.
Thread 6 holds table-B's read lock and wants to get table-A's read
lock to generate a plan. Because the table lock is fair, thread 6 is
blocked and queued behind thread 1.
This state will last for one minute, until thread 6 times out waiting for
table-A's read lock. But if thread 7 needs to manipulate a large number
of partitions, the probability of this situation occurring will be very
high.
Resolve lock conflict.
After this PR, if this MV uses an OLAP table as its base table, the OLAP
table's partitions may change, because the read locks are released while
the MV's partitions are added and dropped.
For example:
create table lineitem (
l_shipdate datetime
)
PARTITION BY RANGE(l_shipdate) (
FROM ('2020-01-01') TO ('2020-02-10') INTERVAL 1 DAY
);
CREATE MATERIALIZED VIEW mv_t
REFRESH AUTO ON SCHEDULE EVERY 1 hour
PARTITION by(order_date_month)
AS
SELECT
date_trunc(l_shipdate, 'month') as order_date_month
FROM lineitem;
1. align the partitions:
'2020-01-01' -> '2020-01-01', '2020-01-02', ..., '2020-01-31'
'2020-02-01' -> '2020-02-01', '2020-02-02', ..., '2020-02-09'
2. add and drop partitions in mv:
create two partitions in mv_t: '2020-01-01' and '2020-02-01'.
3. The table `lineitem`'s partitions may change:
alter table lineitem drop partition p20200101;
alter table lineitem add partition p20200220 ...;
alter table lineitem add partition p20200320 ...;
4. Build MTMVRefreshContext and calculateNeedRefreshPartitions:
4.1 partitionMappings is:
'2020-01-01' -> '2020-01-02',..., '2020-01-31'
'2020-02-01' -> '2020-02-01', '2020-02-02', ..., '2020-02-09', '2020-02-20'
4.2 generate table versions and partition versions.
4.3 The partitions that need refresh are:
'2020-01-01' because partition '2020-01-01' is dropped.
'2020-02-01' because new partition '2020-02-20' is added.
---------
Co-authored-by: liutang123 <[email protected]>
---
.../org/apache/doris/job/extensions/mtmv/MTMVTask.java | 18 +++++++++++++++++-
.../java/org/apache/doris/mtmv/MTMVPartitionUtil.java | 16 ++++++++++------
2 files changed, 27 insertions(+), 7 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 3c8ebc4e5e9..4e934509bc8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -17,6 +17,7 @@
package org.apache.doris.job.extensions.mtmv;
+import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
@@ -27,6 +28,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
@@ -195,6 +197,7 @@ public class MTMVTask extends AbstractTask {
tableIfs.sort(Comparator.comparing(TableIf::getId));
MTMVRefreshContext context;
+ Pair<List<String>, List<PartitionKeyDesc>> syncPartitions = null;
// lock table order by id to avoid deadlock
MetaLockUtils.readLockTables(tableIfs);
try {
@@ -211,8 +214,21 @@ public class MTMVTask extends AbstractTask {
+ " e.g. Table has multiple partition columns"
+ " or including not supported transform
functions.");
}
- MTMVPartitionUtil.alignMvPartition(mtmv);
+ syncPartitions = MTMVPartitionUtil.alignMvPartition(mtmv);
}
+ } finally {
+ MetaLockUtils.readUnlockTables(tableIfs);
+ }
+ if (syncPartitions != null) {
+ for (String pName : syncPartitions.first) {
+ MTMVPartitionUtil.dropPartition(mtmv, pName);
+ }
+ for (PartitionKeyDesc partitionKeyDesc :
syncPartitions.second) {
+ MTMVPartitionUtil.addPartition(mtmv, partitionKeyDesc);
+ }
+ }
+ MetaLockUtils.readLockTables(tableIfs);
+ try {
context = MTMVRefreshContext.buildContext(mtmv);
this.needRefreshPartitions =
calculateNeedRefreshPartitions(context);
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index 3655fe34055..74cf6396eab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -33,6 +33,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.Pair;
import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.rpc.RpcException;
@@ -47,6 +48,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -113,24 +115,26 @@ public class MTMVPartitionUtil {
* @throws DdlException
* @throws AnalysisException
*/
- public static void alignMvPartition(MTMV mtmv)
- throws DdlException, AnalysisException {
+ public static Pair<List<String>, List<PartitionKeyDesc>>
alignMvPartition(MTMV mtmv) throws AnalysisException {
Map<String, PartitionKeyDesc> mtmvPartitionDescs =
mtmv.generateMvPartitionDescs();
Set<PartitionKeyDesc> relatedPartitionDescs =
generateRelatedPartitionDescs(mtmv.getMvPartitionInfo(),
mtmv.getMvProperties()).keySet();
+ List<String> partitionsToDrop = new ArrayList<>();
+ List<PartitionKeyDesc> partitionsToAdd = new ArrayList<>();
// drop partition of mtmv
for (Entry<String, PartitionKeyDesc> entry :
mtmvPartitionDescs.entrySet()) {
if (!relatedPartitionDescs.contains(entry.getValue())) {
- dropPartition(mtmv, entry.getKey());
+ partitionsToDrop.add(entry.getKey());
}
}
// add partition for mtmv
HashSet<PartitionKeyDesc> mtmvPartitionDescsSet =
Sets.newHashSet(mtmvPartitionDescs.values());
for (PartitionKeyDesc desc : relatedPartitionDescs) {
if (!mtmvPartitionDescsSet.contains(desc)) {
- addPartition(mtmv, desc);
+ partitionsToAdd.add(desc);
}
}
+ return Pair.of(partitionsToDrop, partitionsToAdd);
}
/**
@@ -365,7 +369,7 @@ public class MTMVPartitionUtil {
* @param mtmv
* @param partitionName
*/
- private static void dropPartition(MTMV mtmv, String partitionName) throws
DdlException {
+ public static void dropPartition(MTMV mtmv, String partitionName) throws
DdlException {
if (!mtmv.writeLockIfExist()) {
return;
}
@@ -386,7 +390,7 @@ public class MTMVPartitionUtil {
* @param oldPartitionKeyDesc
* @throws DdlException
*/
- private static void addPartition(MTMV mtmv, PartitionKeyDesc
oldPartitionKeyDesc)
+ public static void addPartition(MTMV mtmv, PartitionKeyDesc
oldPartitionKeyDesc)
throws DdlException {
Map<String, String> partitionProperties = Maps.newHashMap();
SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]