This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c60fea9bdfedd2e85b9c8ba32290fa003f6cff0e
Author: zhangdong <[email protected]>
AuthorDate: Thu Feb 29 12:06:38 2024 +0800

    [fix](mtmv)fix getIdToItem cause ConcurrentModificationException (#31511)
---
 .../main/java/org/apache/doris/catalog/MTMV.java   | 110 +++++++++++++++------
 .../java/org/apache/doris/catalog/OlapTable.java   |  23 ++++-
 .../doris/datasource/hive/HMSExternalTable.java    |   6 +-
 .../org/apache/doris/mtmv/MTMVPartitionUtil.java   |  39 +++++---
 .../org/apache/doris/mtmv/MTMVRelatedTableIf.java  |   2 +-
 .../apache/doris/mtmv/MTMVPartitionUtilTest.java   |   4 +
 6 files changed, 130 insertions(+), 54 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
index 10739be8538..05622bfc0e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java
@@ -112,7 +112,12 @@ public class MTMV extends OlapTable {
     }
 
     public MTMVRefreshInfo getRefreshInfo() {
-        return refreshInfo;
+        readMvLock();
+        try {
+            return refreshInfo;
+        } finally {
+            readMvUnlock();
+        }
     }
 
     public String getQuerySql() {
@@ -120,8 +125,8 @@ public class MTMV extends OlapTable {
     }
 
     public MTMVStatus getStatus() {
+        readMvLock();
         try {
-            readMvLock();
             return status;
         } finally {
             readMvUnlock();
@@ -133,11 +138,21 @@ public class MTMV extends OlapTable {
     }
 
     public MTMVJobInfo getJobInfo() {
-        return jobInfo;
+        readMvLock();
+        try {
+            return jobInfo;
+        } finally {
+            readMvUnlock();
+        }
     }
 
     public MTMVRelation getRelation() {
-        return relation;
+        readMvLock();
+        try {
+            return relation;
+        } finally {
+            readMvUnlock();
+        }
     }
 
     public void setCache(MTMVCache cache) {
@@ -145,12 +160,17 @@ public class MTMV extends OlapTable {
     }
 
     public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) {
-        return refreshInfo.updateNotNull(newRefreshInfo);
+        writeMvLock();
+        try {
+            return refreshInfo.updateNotNull(newRefreshInfo);
+        } finally {
+            writeMvUnlock();
+        }
     }
 
     public MTMVStatus alterStatus(MTMVStatus newStatus) {
+        writeMvLock();
         try {
-            writeMvLock();
             return this.status.updateNotNull(newStatus);
         } finally {
             writeMvUnlock();
@@ -159,8 +179,8 @@ public class MTMV extends OlapTable {
 
     public void addTaskResult(MTMVTask task, MTMVRelation relation,
             Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
+        writeMvLock();
         try {
-            writeMvLock();
             if (task.getStatus() == TaskStatus.SUCCESS) {
                 this.status.setState(MTMVState.NORMAL);
                 this.status.setSchemaChangeDetail(null);
@@ -185,41 +205,66 @@ public class MTMV extends OlapTable {
     }
 
     public Map<String, String> alterMvProperties(Map<String, String> 
mvProperties) {
-        this.mvProperties.putAll(mvProperties);
-        return this.mvProperties;
+        writeMvLock();
+        try {
+            this.mvProperties.putAll(mvProperties);
+            return this.mvProperties;
+        } finally {
+            writeMvUnlock();
+        }
     }
 
     public long getGracePeriod() {
-        if 
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
-            return 
Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 
1000;
-        } else {
-            return 0L;
+        readMvLock();
+        try {
+            if 
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
+                return 
Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 
1000;
+            } else {
+                return 0L;
+            }
+        } finally {
+            readMvUnlock();
         }
     }
 
     public Optional<String> getWorkloadGroup() {
-        if 
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP) && 
!StringUtils
-                
.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP))) {
-            return 
Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP));
+        readMvLock();
+        try {
+            if 
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP) && 
!StringUtils
+                    
.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP))) {
+                return 
Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP));
+            }
+            return Optional.empty();
+        } finally {
+            readMvUnlock();
         }
-        return Optional.empty();
     }
 
     public int getRefreshPartitionNum() {
-        if 
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
-            int value = 
Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
-            return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value;
-        } else {
-            return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
+        readMvLock();
+        try {
+            if 
(mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
+                int value = 
Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
+                return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : 
value;
+            } else {
+                return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
+            }
+        } finally {
+            readMvUnlock();
         }
     }
 
     public Set<String> getExcludedTriggerTables() {
-        if 
(!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES))
 {
-            return Sets.newHashSet();
+        readMvLock();
+        try {
+            if 
(!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES))
 {
+                return Sets.newHashSet();
+            }
+            String[] split = 
mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(",");
+            return Sets.newHashSet(split);
+        } finally {
+            readMvUnlock();
         }
-        String[] split = 
mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(",");
-        return Sets.newHashSet(split);
     }
 
     public MTMVCache getOrGenerateCache() throws AnalysisException {
@@ -237,7 +282,12 @@ public class MTMV extends OlapTable {
     }
 
     public Map<String, String> getMvProperties() {
-        return mvProperties;
+        readMvLock();
+        try {
+            return mvProperties;
+        } finally {
+            readMvUnlock();
+        }
     }
 
     public MTMVPartitionInfo getMvPartitionInfo() {
@@ -254,7 +304,7 @@ public class MTMV extends OlapTable {
      * @return mvPartitionId ==> mvPartitionKeyDesc
      */
     public Map<Long, PartitionKeyDesc> generateMvPartitionDescs() {
-        Map<Long, PartitionItem> mtmvItems = getPartitionItems();
+        Map<Long, PartitionItem> mtmvItems = getAndCopyPartitionItems();
         Map<Long, PartitionKeyDesc> result = Maps.newHashMap();
         for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
             result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc());
@@ -276,7 +326,7 @@ public class MTMV extends OlapTable {
             return Maps.newHashMap();
         }
         Map<PartitionKeyDesc, Set<Long>> res = new HashMap<>();
-        Map<Long, PartitionItem> relatedPartitionItems = 
mvPartitionInfo.getRelatedTable().getPartitionItems();
+        Map<Long, PartitionItem> relatedPartitionItems = 
mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems();
         int relatedColPos = mvPartitionInfo.getRelatedColPos();
         for (Entry<Long, PartitionItem> entry : 
relatedPartitionItems.entrySet()) {
             PartitionKeyDesc partitionKeyDesc = 
entry.getValue().toPartitionKeyDesc(relatedColPos);
@@ -303,7 +353,7 @@ public class MTMV extends OlapTable {
         }
         Map<Long, Set<Long>> res = Maps.newHashMap();
         Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = 
generateRelatedPartitionDescs();
-        Map<Long, PartitionItem> mvPartitionItems = 
getPartitionInfo().getIdToItem(false);
+        Map<Long, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
         for (Entry<Long, PartitionItem> entry : mvPartitionItems.entrySet()) {
             res.put(entry.getKey(),
                     
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), 
Sets.newHashSet()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index df34f4333a7..2acdf00a491 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1053,7 +1053,12 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
     }
 
     public List<Long> getPartitionIds() {
-        return new ArrayList<>(idToPartition.keySet());
+        readLock();
+        try {
+            return new ArrayList<>(idToPartition.keySet());
+        } finally {
+            readUnlock();
+        }
     }
 
     public Set<String> getCopiedBfColumns() {
@@ -2572,8 +2577,13 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
     }
 
     @Override
-    public Map<Long, PartitionItem> getPartitionItems() {
-        return getPartitionInfo().getIdToItem(false);
+    public Map<Long, PartitionItem> getAndCopyPartitionItems() {
+        readLock();
+        try {
+            return Maps.newHashMap(getPartitionInfo().getIdToItem(false));
+        } finally {
+            readUnlock();
+        }
     }
 
     @Override
@@ -2595,7 +2605,12 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf {
 
     @Override
     public String getPartitionName(long partitionId) throws AnalysisException {
-        return getPartitionOrAnalysisException(partitionId).getName();
+        readLock();
+        try {
+            return getPartitionOrAnalysisException(partitionId).getName();
+        } finally {
+            readUnlock();
+        }
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index d095a959e90..47b684c264b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -773,7 +773,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     }
 
     @Override
-    public Map<Long, PartitionItem> getPartitionItems() {
+    public Map<Long, PartitionItem> getAndCopyPartitionItems() {
         HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
                 .getMetaStoreCache((HMSExternalCatalog) getCatalog());
         HiveMetaStoreCache.HivePartitionValues hivePartitionValues = 
cache.getPartitionValues(
@@ -816,7 +816,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
         long partitionId = 0L;
         long maxVersionTime = 0L;
         long visibleVersionTime;
-        for (Entry<Long, PartitionItem> entry : 
getPartitionItems().entrySet()) {
+        for (Entry<Long, PartitionItem> entry : 
getAndCopyPartitionItems().entrySet()) {
             visibleVersionTime = getPartitionLastModifyTime(entry.getKey());
             if (visibleVersionTime > maxVersionTime) {
                 maxVersionTime = visibleVersionTime;
@@ -831,7 +831,7 @@ public class HMSExternalTable extends ExternalTable 
implements MTMVRelatedTableI
     }
 
     private HivePartition getPartitionById(long partitionId) throws 
AnalysisException {
-        PartitionItem item = getPartitionItems().get(partitionId);
+        PartitionItem item = getAndCopyPartitionItems().get(partitionId);
         List<List<String>> partitionValuesList = 
transferPartitionItemToPartitionValues(item);
         List<HivePartition> partitions = 
getPartitionsByPartitionValues(partitionValuesList);
         if (partitions.size() != 1) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index 88fe02a8b4f..9c85725b5bb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -141,7 +141,7 @@ public class MTMVPartitionUtil {
             throws AnalysisException {
         int pos = getPos(relatedTable, relatedCol);
         Set<PartitionKeyDesc> res = Sets.newHashSet();
-        for (Entry<Long, PartitionItem> entry : 
relatedTable.getPartitionItems().entrySet()) {
+        for (Entry<Long, PartitionItem> entry : 
relatedTable.getAndCopyPartitionItems().entrySet()) {
             PartitionKeyDesc partitionKeyDesc = 
entry.getValue().toPartitionKeyDesc(pos);
             res.add(partitionKeyDesc);
         }
@@ -163,18 +163,24 @@ public class MTMVPartitionUtil {
     public static List<String> getPartitionNamesByIds(MTMV mtmv, 
Collection<Long> ids) throws AnalysisException {
         List<String> res = Lists.newArrayList();
         for (Long partitionId : ids) {
-            
res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName());
+            res.add(mtmv.getPartitionName(partitionId));
         }
         return res;
     }
 
     public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> 
partitions) throws AnalysisException {
-        List<Long> res = Lists.newArrayList();
-        for (String partitionName : partitions) {
-            Partition partition = 
mtmv.getPartitionOrAnalysisException(partitionName);
-            res.add(partition.getId());
+        mtmv.readLock();
+        try {
+            List<Long> res = Lists.newArrayList();
+            for (String partitionName : partitions) {
+                Partition partition = 
mtmv.getPartitionOrAnalysisException(partitionName);
+                res.add(partition.getId());
+            }
+            return res;
+        } finally {
+            mtmv.readUnlock();
         }
-        return res;
+
     }
 
     /**
@@ -209,9 +215,9 @@ public class MTMVPartitionUtil {
     public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, 
Set<String> excludeTables,
             Map<Long, Set<Long>> partitionMappings)
             throws AnalysisException {
-        Collection<Partition> partitions = mtmv.getPartitions();
-        for (Partition partition : partitions) {
-            if (!isMTMVPartitionSync(mtmv, partition.getId(), 
partitionMappings.get(partition.getId()), tables,
+        List<Long> partitionIds = mtmv.getPartitionIds();
+        for (Long partitionId : partitionIds) {
+            if (!isMTMVPartitionSync(mtmv, partitionId, 
partitionMappings.get(partitionId), tables,
                     excludeTables)) {
                 return false;
             }
@@ -277,16 +283,16 @@ public class MTMVPartitionUtil {
      */
     public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, 
Set<BaseTableInfo> baseTables,
             Map<Long, Set<Long>> partitionMappings) {
-        Collection<Partition> allPartitions = mtmv.getPartitions();
+        List<Long> partitionIds = mtmv.getPartitionIds();
         List<Long> res = Lists.newArrayList();
-        for (Partition partition : allPartitions) {
+        for (Long partitionId : partitionIds) {
             try {
-                if (!isMTMVPartitionSync(mtmv, partition.getId(), 
partitionMappings.get(partition.getId()), baseTables,
+                if (!isMTMVPartitionSync(mtmv, partitionId, 
partitionMappings.get(partitionId), baseTables,
                         mtmv.getExcludedTriggerTables())) {
-                    res.add(partition.getId());
+                    res.add(partitionId);
                 }
             } catch (AnalysisException e) {
-                res.add(partition.getId());
+                res.add(partitionId);
                 LOG.warn("check isMTMVPartitionSync failed", e);
             }
         }
@@ -361,6 +367,7 @@ public class MTMVPartitionUtil {
 
     /**
      * add partition for mtmv like relatedPartitionId of relatedTable
+     * `Env.getCurrentEnv().addPartition` has obtained the lock internally, 
but we do not obtain the lock here
      *
      * @param mtmv
      * @param oldPartitionKeyDesc
@@ -448,7 +455,7 @@ public class MTMVPartitionUtil {
             throws AnalysisException {
         Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
         for (Long partitionId : partitionIds) {
-            res.put(mtmv.getPartition(partitionId).getName(),
+            res.put(mtmv.getPartitionName(partitionId),
                     generatePartitionSnapshot(mtmv, baseTables, 
partitionMappings.get(partitionId)));
         }
         return res;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
index 46454679b56..5ec7e98a407 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRelatedTableIf.java
@@ -38,7 +38,7 @@ public interface MTMVRelatedTableIf extends TableIf {
      *
      * @return partitionId->PartitionItem
      */
-    Map<Long, PartitionItem> getPartitionItems();
+    Map<Long, PartitionItem> getAndCopyPartitionItems();
 
     /**
      * getPartitionType LIST/RANGE/UNPARTITIONED
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
index df38ce18720..bf819553e86 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVPartitionUtilTest.java
@@ -71,6 +71,10 @@ public class MTMVPartitionUtilTest {
                 minTimes = 0;
                 result = Lists.newArrayList(p1);
 
+                mtmv.getPartitionIds();
+                minTimes = 0;
+                result = Lists.newArrayList(1L);
+
                 p1.getId();
                 minTimes = 0;
                 result = 1L;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to