This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c60fea9bdfedd2e85b9c8ba32290fa003f6cff0e Author: zhangdong <[email protected]> AuthorDate: Thu Feb 29 12:06:38 2024 +0800 [fix](mtmv)fix getIdToItem cause ConcurrentModificationException (#31511) --- .../main/java/org/apache/doris/catalog/MTMV.java | 110 +++++++++++++++------ .../java/org/apache/doris/catalog/OlapTable.java | 23 ++++- .../doris/datasource/hive/HMSExternalTable.java | 6 +- .../org/apache/doris/mtmv/MTMVPartitionUtil.java | 39 +++++--- .../org/apache/doris/mtmv/MTMVRelatedTableIf.java | 2 +- .../apache/doris/mtmv/MTMVPartitionUtilTest.java | 4 + 6 files changed, 130 insertions(+), 54 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index 10739be8538..05622bfc0e7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -112,7 +112,12 @@ public class MTMV extends OlapTable { } public MTMVRefreshInfo getRefreshInfo() { - return refreshInfo; + readMvLock(); + try { + return refreshInfo; + } finally { + readMvUnlock(); + } } public String getQuerySql() { @@ -120,8 +125,8 @@ public class MTMV extends OlapTable { } public MTMVStatus getStatus() { + readMvLock(); try { - readMvLock(); return status; } finally { readMvUnlock(); @@ -133,11 +138,21 @@ public class MTMV extends OlapTable { } public MTMVJobInfo getJobInfo() { - return jobInfo; + readMvLock(); + try { + return jobInfo; + } finally { + readMvUnlock(); + } } public MTMVRelation getRelation() { - return relation; + readMvLock(); + try { + return relation; + } finally { + readMvUnlock(); + } } public void setCache(MTMVCache cache) { @@ -145,12 +160,17 @@ public class MTMV extends OlapTable { } public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) { - return refreshInfo.updateNotNull(newRefreshInfo); + writeMvLock(); + try { + return refreshInfo.updateNotNull(newRefreshInfo); + } finally { + writeMvUnlock(); + } } public MTMVStatus alterStatus(MTMVStatus newStatus) { + writeMvLock(); try { - writeMvLock(); return this.status.updateNotNull(newStatus); } finally { writeMvUnlock(); @@ -159,8 +179,8 @@ public class MTMV extends OlapTable { public void addTaskResult(MTMVTask task, MTMVRelation relation, Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) { + writeMvLock(); try { - writeMvLock(); if (task.getStatus() == TaskStatus.SUCCESS) { this.status.setState(MTMVState.NORMAL); this.status.setSchemaChangeDetail(null); @@ -185,41 +205,66 @@ public class MTMV extends OlapTable { } public Map<String, String> alterMvProperties(Map<String, String> mvProperties) { - this.mvProperties.putAll(mvProperties); - return this.mvProperties; + writeMvLock(); + try { + this.mvProperties.putAll(mvProperties); + return this.mvProperties; + } finally { + writeMvUnlock(); + } } public long getGracePeriod() { - if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) { - return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 1000; - } else { - return 0L; + readMvLock(); + try { + if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) { + return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 1000; + } else { + return 0L; + } + } finally { + readMvUnlock(); } } public Optional<String> getWorkloadGroup() { - if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP) && !StringUtils - .isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP))) { - return Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP)); + readMvLock(); + try { + if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP) && !StringUtils + .isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP))) { + return Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP)); + } + return Optional.empty(); + } finally { + readMvUnlock(); } - return Optional.empty(); } public int getRefreshPartitionNum() { - if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) { - int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); - return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value; - } else { - return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM; + readMvLock(); + try { + if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) { + int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)); + return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value; + } else { + return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM; + } + } finally { + readMvUnlock(); } } public Set<String> getExcludedTriggerTables() { - if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) { - return Sets.newHashSet(); + readMvLock(); + try { + if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) { + return Sets.newHashSet(); + } + String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(","); + return Sets.newHashSet(split); + } finally { + readMvUnlock(); } - String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(","); - return Sets.newHashSet(split); } public MTMVCache getOrGenerateCache() throws AnalysisException { @@ -237,7 +282,12 @@ public class MTMV extends OlapTable { } public Map<String, String> getMvProperties() { - return mvProperties; + readMvLock(); + try { + return mvProperties; + } finally { + readMvUnlock(); + } } public MTMVPartitionInfo getMvPartitionInfo() { @@ -254,7 +304,7 @@ public class MTMV extends OlapTable { * @return mvPartitionId ==> mvPartitionKeyDesc */ public Map<Long, PartitionKeyDesc> generateMvPartitionDescs() { - Map<Long, PartitionItem> mtmvItems = getPartitionItems(); + Map<Long, PartitionItem> mtmvItems = getAndCopyPartitionItems(); Map<Long, PartitionKeyDesc> result = Maps.newHashMap(); for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) { result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc()); @@ -276,7 +326,7 @@ public class MTMV extends OlapTable { return Maps.newHashMap(); } Map<PartitionKeyDesc, Set<Long>> res = new HashMap<>(); - Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable().getPartitionItems(); + Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(); int relatedColPos = mvPartitionInfo.getRelatedColPos(); for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) { PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos); @@ -303,7 +353,7 @@ public class MTMV extends OlapTable { } Map<Long, Set<Long>> res = Maps.newHashMap(); Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = generateRelatedPartitionDescs(); - Map<Long, PartitionItem> mvPartitionItems = getPartitionInfo().getIdToItem(false); + Map<Long, PartitionItem> mvPartitionItems = getAndCopyPartitionItems(); for (Entry<Long, PartitionItem> entry : mvPartitionItems.entrySet()) { res.put(entry.getKey(), relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index df34f4333a7..2acdf00a491 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1053,7 +1053,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } public List<Long> getPartitionIds() { - return new ArrayList<>(idToPartition.keySet()); + readLock(); + try { + return new ArrayList<>(idToPartition.keySet()); + } finally { + readUnlock(); + } } public Set<String> getCopiedBfColumns() { @@ -2572,8 +2577,13 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { } @Override - public Map<Long, PartitionItem> getPartitionItems() { - return getPartitionInfo().getIdToItem(false); + public Map<Long, PartitionItem> getAndCopyPartitionItems() { + readLock(); + try { + return Maps.newHashMap(getPartitionInfo().getIdToItem(false)); + } finally { + readUnlock(); + } } @Override @@ -2595,7 +2605,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { @Override public String getPartitionName(long partitionId) throws AnalysisException { - return getPartitionOrAnalysisException(partitionId).getName(); + readLock(); + try { + return getPartitionOrAnalysisException(partitionId).getName(); + } finally { + readUnlock(); + } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index d095a959e90..47b684c264b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -773,7 +773,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } @Override - public Map<Long, PartitionItem> getPartitionItems() { + public Map<Long, PartitionItem> getAndCopyPartitionItems() { HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr() .getMetaStoreCache((HMSExternalCatalog) getCatalog()); HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues( @@ -816,7 +816,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI long partitionId = 0L; long maxVersionTime = 0L; long visibleVersionTime; - for (Entry<Long, PartitionItem> entry : getPartitionItems().entrySet()) { + for (Entry<Long, PartitionItem> entry : getAndCopyPartitionItems().entrySet()) { visibleVersionTime = getPartitionLastModifyTime(entry.getKey()); if (visibleVersionTime > maxVersionTime) { maxVersionTime = visibleVersionTime; @@ -831,7 +831,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI } private HivePartition getPartitionById(long partitionId) throws AnalysisException { - PartitionItem item = getPartitionItems().get(partitionId); + PartitionItem item = getAndCopyPartitionItems().get(partitionId); List<List<String>> partitionValuesList = transferPartitionItemToPartitionValues(item); List<HivePartition> partitions = getPartitionsByPartitionValues(partitionValuesList); if (partitions.size() != 1) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java index 88fe02a8b4f..9c85725b5bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java @@ -141,7 +141,7 @@ public class MTMVPartitionUtil { throws AnalysisException { int pos = getPos(relatedTable, relatedCol); Set<PartitionKeyDesc> res = Sets.newHashSet(); - for (Entry<Long, PartitionItem> entry : relatedTable.getPartitionItems().entrySet()) { + for (Entry<Long, PartitionItem> entry : relatedTable.getAndCopyPartitionItems().entrySet()) { PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(pos); res.add(partitionKeyDesc); } @@ -163,18 +163,24 @@ public class MTMVPartitionUtil { public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException { List<String> res = Lists.newArrayList(); for (Long partitionId : ids) { - res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName()); + res.add(mtmv.getPartitionName(partitionId)); } return res; } public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException { - List<Long> res = Lists.newArrayList(); - for (String partitionName : partitions) { - Partition partition = mtmv.getPartitionOrAnalysisException(partitionName); - res.add(partition.getId()); + mtmv.readLock(); + try { + List<Long> res = Lists.newArrayList(); + for (String partitionName : partitions) { + Partition partition = mtmv.getPartitionOrAnalysisException(partitionName); + res.add(partition.getId()); + } + return res; + } finally { + mtmv.readUnlock(); } - return res; + } /** @@ -209,9 +215,9 @@ public class MTMVPartitionUtil { public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables, Map<Long, Set<Long>> partitionMappings) throws AnalysisException { - Collection<Partition> partitions = mtmv.getPartitions(); - for (Partition partition : partitions) { - if (!isMTMVPartitionSync(mtmv, partition.getId(), partitionMappings.get(partition.getId()), tables, + List<Long> partitionIds = mtmv.getPartitionIds(); + for (Long partitionId : partitionIds) { + if (!isMTMVPartitionSync(mtmv, partitionId, partitionMappings.get(partitionId), tables, excludeTables)) { return false; } @@ -277,16 +283,16 @@ public class MTMVPartitionUtil { */ public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables, Map<Long, Set<Long>> partitionMappings) { - Collection<Partition> allPartitions = mtmv.getPartitions(); + List<Long> partitionIds = mtmv.getPartitionIds(); List<Long> res = Lists.newArrayList(); - for (Partition partition : allPartitions) { + for (Long partitionId : partitionIds) { try { - if (!isMTMVPartitionSync(mtmv, partition.getId(), partitionMappings.get(partition.getId()), baseTables, + if (!isMTMVPartitionSync(mtmv, partitionId, partitionMappings.get(partitionId), baseTables, mtmv.getExcludedTriggerTables())) { - res.add(partition.getId()); + res.add(partitionId); } } catch (AnalysisException e) { - res.add(partition.getId()); + res.add(partitionId); LOG.warn("check isMTMVPartitionSync failed", e); } } @@ -361,6 +367,7 @@ public class MTMVPartitionUtil { /** * add partition for mtmv like relatedPartitionId of relatedTable + * `Env.getCurrentEnv().addPartition` has obtained the lock internally, but we do not obtain the lock here * * @param mtmv * @param oldPartitionKeyDesc @@ -448,7 +455,7 @@ public class MTMVPartitionUtil { throws AnalysisException { Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap(); for (Long partitionId : partitionIds) { - res.put(mtmv.getPartition(partitionId).getName(), + res.put(mtmv.getPartitionName(partitionId), generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionId))); } return res; diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java index 46454679b56..5ec7e98a407 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java @@ -38,7 +38,7 @@ public interface MTMVRelatedTableIf extends TableIf { * * @return partitionId->PartitionItem */ - Map<Long, PartitionItem> getPartitionItems(); + Map<Long, PartitionItem> getAndCopyPartitionItems(); /** * getPartitionType LIST/RANGE/UNPARTITIONED diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java index df38ce18720..bf819553e86 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java @@ -71,6 +71,10 @@ public class MTMVPartitionUtilTest { minTimes = 0; result = Lists.newArrayList(p1); + mtmv.getPartitionIds(); + minTimes = 0; + result = Lists.newArrayList(1L); + p1.getId(); minTimes = 0; result = 1L; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
