This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new efcd87d75fe branch-4.0: [fix](meta)fix after the FE restarts,
ConcurrentMap becomes LinkedHashMap #58382 (#58446)
efcd87d75fe is described below
commit efcd87d75fe57769d0ac1e216dd60dcef7d069f8
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Nov 27 18:11:05 2025 +0800
branch-4.0: [fix](meta)fix after the FE restarts, ConcurrentMap becomes
LinkedHashMap #58382 (#58446)
Cherry-picked from #58382
Co-authored-by: zhangdong <[email protected]>
---
.../java/org/apache/doris/backup/BackupJob.java | 5 +-
.../apache/doris/blockrule/SqlBlockRuleMgr.java | 15 +----
.../apache/doris/catalog/ColocateTableIndex.java | 3 +-
.../java/org/apache/doris/catalog/ResourceMgr.java | 3 +-
.../apache/doris/cloud/catalog/CloudReplica.java | 3 +-
.../doris/common/profile/RuntimeProfile.java | 7 +-
.../org/apache/doris/datasource/CatalogMgr.java | 9 +--
.../apache/doris/dictionary/DictionaryManager.java | 7 +-
.../insertoverwrite/InsertOverwriteManager.java | 9 +--
.../java/org/apache/doris/load/DeleteHandler.java | 3 +-
.../doris/mtmv/MTMVRefreshPartitionSnapshot.java | 9 +--
.../org/apache/doris/mtmv/MTMVRefreshSnapshot.java | 3 +-
.../mysql/privilege/PasswordPolicyManager.java | 4 +-
.../org/apache/doris/mysql/privilege/Role.java | 13 ++--
.../java/org/apache/doris/policy/PolicyMgr.java | 3 +-
.../WorkloadSchedPolicyMgr.java | 3 +-
.../doris/persist/gson/GsonSerializationTest.java | 75 +++++++++++++++++++++-
17 files changed, 124 insertions(+), 50 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 92a0fa47c8f..d6883dd2fc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -76,6 +76,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
@@ -111,7 +112,7 @@ public class BackupJob extends AbstractJob implements
GsonPostProcessable {
private Map<Long, Long> unfinishedTaskIds = Maps.newConcurrentMap();
// tablet id -> snapshot info
@SerializedName("si")
- private Map<Long, SnapshotInfo> snapshotInfos = Maps.newConcurrentMap();
+ private ConcurrentMap<Long, SnapshotInfo> snapshotInfos =
Maps.newConcurrentMap();
// save all related table[partition] info
@SerializedName("meta")
private BackupMeta backupMeta;
@@ -136,7 +137,7 @@ public class BackupJob extends AbstractJob implements
GsonPostProcessable {
// Record partition IDs that were dropped during backup (tableId -> set of
partitionIds)
@SerializedName("dp")
- private Map<Long, Set<Long>> droppedPartitionsByTable =
Maps.newConcurrentMap();
+ private ConcurrentMap<Long, Set<Long>> droppedPartitionsByTable =
Maps.newConcurrentMap();
private long commitSeq = 0;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
index cad0ebafa6a..62b6c70a46c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/blockrule/SqlBlockRuleMgr.java
@@ -29,7 +29,6 @@ import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.nereids.trees.plans.commands.AlterSqlBlockRuleCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateSqlBlockRuleCommand;
-import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
@@ -46,6 +45,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -53,13 +53,13 @@ import java.util.stream.Collectors;
/**
* Manage SqlBlockRule.
**/
-public class SqlBlockRuleMgr implements Writable, GsonPostProcessable {
+public class SqlBlockRuleMgr implements Writable {
private static final Logger LOG =
LogManager.getLogger(SqlBlockRuleMgr.class);
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@SerializedName(value = "nameToSqlBlockRuleMap")
- private Map<String, SqlBlockRule> nameToSqlBlockRuleMap =
Maps.newConcurrentMap();
+ private ConcurrentMap<String, SqlBlockRule> nameToSqlBlockRuleMap =
Maps.newConcurrentMap();
private void writeLock() {
lock.writeLock().lock();
@@ -327,13 +327,4 @@ public class SqlBlockRuleMgr implements Writable,
GsonPostProcessable {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, SqlBlockRuleMgr.class);
}
-
- @Override
- public void gsonPostProcess() throws IOException {
- Map<String, SqlBlockRule> nameToSqlBlockRuleMapNew =
Maps.newConcurrentMap();
- if (this.nameToSqlBlockRuleMap != null) {
- nameToSqlBlockRuleMapNew.putAll(this.nameToSqlBlockRuleMap);
- }
- this.nameToSqlBlockRuleMap = nameToSqlBlockRuleMapNew;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index f670892fec1..9fa53d06210 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -57,6 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
@@ -163,7 +164,7 @@ public class ColocateTableIndex implements Writable {
private Multimap<GroupId, Long> group2Tables = ArrayListMultimap.create();
// table_id -> group_id
@SerializedName(value = "table2Group")
- private Map<Long, GroupId> table2Group = Maps.newConcurrentMap();
+ private ConcurrentMap<Long, GroupId> table2Group = Maps.newConcurrentMap();
// group id -> group schema
@SerializedName(value = "group2Schema")
private Map<GroupId, ColocateGroupSchema> group2Schema = Maps.newHashMap();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
index 1dc90fd4adf..81ce3b3009b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
@@ -49,6 +49,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
@@ -65,7 +66,7 @@ public class ResourceMgr implements Writable {
// { resourceName -> Resource}
@SerializedName(value = "nameToResource")
- private final Map<String, Resource> nameToResource =
Maps.newConcurrentMap();
+ private final ConcurrentMap<String, Resource> nameToResource =
Maps.newConcurrentMap();
private final ResourceProcNode procNode = new ResourceProcNode();
public ResourceMgr() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 3fc7c652f58..4a15a83b740 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -54,7 +54,8 @@ public class CloudReplica extends Replica {
// In the future, a replica may be mapped to multiple BEs in a cluster,
// so this value is be list
@SerializedName(value = "bes")
- private Map<String, List<Long>> primaryClusterToBackends = new
ConcurrentHashMap<String, List<Long>>();
+ private ConcurrentHashMap<String, List<Long>> primaryClusterToBackends
+ = new ConcurrentHashMap<String, List<Long>>();
@SerializedName(value = "dbId")
private long dbId = -1;
@SerializedName(value = "tableId")
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
index 4067cd3e1fb..4ee72f5c100 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
@@ -50,6 +50,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -92,13 +93,13 @@ public class RuntimeProfile {
private transient ReentrantReadWriteLock infoStringsLock = new
ReentrantReadWriteLock();
@SerializedName(value = "counterMap")
- private Map<String, Counter> counterMap = Maps.newConcurrentMap();
+ private ConcurrentMap<String, Counter> counterMap =
Maps.newConcurrentMap();
@SerializedName(value = "childCounterMap")
- private Map<String, TreeSet<String>> childCounterMap =
Maps.newConcurrentMap();
+ private ConcurrentMap<String, TreeSet<String>> childCounterMap =
Maps.newConcurrentMap();
// protect TreeSet in ChildCounterMap
private transient ReentrantReadWriteLock counterLock = new
ReentrantReadWriteLock();
@SerializedName(value = "childMap")
- private Map<String, RuntimeProfile> childMap = Maps.newConcurrentMap();
+ private ConcurrentMap<String, RuntimeProfile> childMap =
Maps.newConcurrentMap();
@SerializedName(value = "childList")
private LinkedList<Pair<RuntimeProfile, Boolean>> childList =
Lists.newLinkedList();
private transient ReentrantReadWriteLock childLock = new
ReentrantReadWriteLock();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index a8458448764..f5b1d30eadb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -65,6 +65,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -85,7 +86,8 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
private final MonitoredReentrantReadWriteLock lock = new
MonitoredReentrantReadWriteLock(true);
@SerializedName(value = "idToCatalog")
- private Map<Long, CatalogIf<? extends DatabaseIf<? extends TableIf>>>
idToCatalog = Maps.newConcurrentMap();
+ private ConcurrentMap<Long, CatalogIf<? extends DatabaseIf<? extends
TableIf>>> idToCatalog
+ = Maps.newConcurrentMap();
// this map will be regenerated from idToCatalog, so not need to persist.
private Map<String, CatalogIf> nameToCatalog = Maps.newConcurrentMap();
@@ -777,16 +779,11 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
@Override
public void gsonPostProcess() throws IOException {
- // After deserializing from Gson, the concurrent map may become a
normal map.
- // So here we reconstruct the concurrent map.
- Map<Long, CatalogIf<? extends DatabaseIf<? extends TableIf>>>
newIdToCatalog = Maps.newConcurrentMap();
Map<String, CatalogIf> newNameToCatalog = Maps.newConcurrentMap();
for (CatalogIf catalog : idToCatalog.values()) {
newNameToCatalog.put(catalog.getName(), catalog);
- newIdToCatalog.put(catalog.getId(), catalog);
// ATTN: can not call catalog.getProperties() here, because
ResourceMgr is not replayed yet.
}
- this.idToCatalog = newIdToCatalog;
this.nameToCatalog = newNameToCatalog;
internalCatalog = (InternalCatalog)
idToCatalog.get(InternalCatalog.INTERNAL_CATALOG_ID);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/dictionary/DictionaryManager.java
b/fe/fe-core/src/main/java/org/apache/doris/dictionary/DictionaryManager.java
index 6bb2234fe8e..040c63803d7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/dictionary/DictionaryManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/dictionary/DictionaryManager.java
@@ -69,6 +69,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
@@ -91,12 +92,12 @@ public class DictionaryManager extends MasterDaemon
implements Writable {
// Map of database name -> dictionary name -> dictionary id
@SerializedName(value = "ids")
- private Map<String, Map<String, Long>> dictionaryIds =
Maps.newConcurrentMap();
+ private ConcurrentMap<String, ConcurrentMap<String, Long>> dictionaryIds =
Maps.newConcurrentMap();
// dbname -> tablename -> dict id
@SerializedName(value = "t")
- private Map<String, ListMultimap<String, Long>> dbTableToDicIds =
Maps.newConcurrentMap();
+ private ConcurrentMap<String, ListMultimap<String, Long>> dbTableToDicIds
= Maps.newConcurrentMap();
@SerializedName(value = "idmap")
- private Map<Long, Dictionary> idToDictionary = Maps.newConcurrentMap();
+ private ConcurrentMap<Long, Dictionary> idToDictionary =
Maps.newConcurrentMap();
@SerializedName(value = "i")
private long uniqueId = 0;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
index a5e3dec4296..cc202f93a17 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -55,19 +56,19 @@ public class InsertOverwriteManager extends MasterDaemon
implements Writable {
private static final long CLEAN_INTERVAL_SECOND = 10;
@SerializedName(value = "tasks")
- private Map<Long, InsertOverwriteTask> tasks = Maps.newConcurrentMap();
+ private ConcurrentMap<Long, InsertOverwriteTask> tasks =
Maps.newConcurrentMap();
// <txnId, <dbId, tableId>>
// for iot auto detect tasks. a txn will make many task by different rpc
@SerializedName(value = "taskGroups")
- private Map<Long, List<Long>> taskGroups = Maps.newConcurrentMap();
+ private ConcurrentMap<Long, List<Long>> taskGroups =
Maps.newConcurrentMap();
// for one task group, there may be different requests about changing a
partition to new.
// but we only change one time and save the relations in partitionPairs.
they're protected by taskLocks
@SerializedName(value = "taskLocks")
- private Map<Long, ReentrantLock> taskLocks = Maps.newConcurrentMap();
+ private ConcurrentMap<Long, ReentrantLock> taskLocks =
Maps.newConcurrentMap();
// <groupId, <oldPartId, newPartId>>. no need concern which task it
belongs to.
@SerializedName(value = "partitionPairs")
- private Map<Long, Map<Long, Long>> partitionPairs =
Maps.newConcurrentMap();
+ private ConcurrentMap<Long, Map<Long, Long>> partitionPairs =
Maps.newConcurrentMap();
// TableId running insert overwrite
// dbId ==> Set<tableId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
index ca9e0f6aa14..c9aa476b3f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DeleteHandler implements Writable {
@@ -64,7 +65,7 @@ public class DeleteHandler implements Writable {
// Db -> DeleteInfo list
@SerializedName(value = "dbToDeleteInfos")
- private final Map<Long, List<DeleteInfo>> dbToDeleteInfos;
+ private final ConcurrentMap<Long, List<DeleteInfo>> dbToDeleteInfos;
private final ReentrantReadWriteLock lock;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java
index c7de673a4a4..82d7ad4190b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshPartitionSnapshot.java
@@ -34,21 +34,22 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
public class MTMVRefreshPartitionSnapshot {
private static final Logger LOG = LogManager.getLogger(MTMV.class);
// old version only support one pct table
@Deprecated
@SerializedName("p")
- private Map<String, MTMVSnapshotIf> partitions;
+ private ConcurrentMap<String, MTMVSnapshotIf> partitions;
@SerializedName("pcts")
- private Map<BaseTableInfo, Map<String, MTMVSnapshotIf>> pcts;
+ private ConcurrentMap<BaseTableInfo, Map<String, MTMVSnapshotIf>> pcts;
// old version only persist table id, we need `BaseTableInfo`, `tables`
only for compatible old version
@SerializedName("t")
@Deprecated
- private Map<Long, MTMVSnapshotIf> tables;
+ private ConcurrentMap<Long, MTMVSnapshotIf> tables;
@SerializedName("ti")
- private Map<BaseTableInfo, MTMVSnapshotIf> tablesInfo;
+ private ConcurrentMap<BaseTableInfo, MTMVSnapshotIf> tablesInfo;
public MTMVRefreshPartitionSnapshot() {
this.partitions = Maps.newConcurrentMap();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java
b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java
index 3c48f9e6280..bc6e1827c9e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVRefreshSnapshot.java
@@ -27,10 +27,11 @@ import org.apache.commons.collections4.MapUtils;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
public class MTMVRefreshSnapshot {
@SerializedName("ps")
- private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
+ private ConcurrentMap<String, MTMVRefreshPartitionSnapshot>
partitionSnapshots;
public MTMVRefreshSnapshot() {
this.partitionSnapshots = Maps.newConcurrentMap();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PasswordPolicyManager.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PasswordPolicyManager.java
index a8eb45dbd6d..0b041de269f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PasswordPolicyManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/PasswordPolicyManager.java
@@ -33,11 +33,11 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
public class PasswordPolicyManager implements Writable {
@SerializedName(value = "policyMap")
- private Map<UserIdentity, PasswordPolicy> policyMap =
Maps.newConcurrentMap();
+ private ConcurrentMap<UserIdentity, PasswordPolicy> policyMap =
Maps.newConcurrentMap();
public PasswordPolicyManager() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Role.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Role.java
index 9345ecfbf39..c286852165b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Role.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Role.java
@@ -48,6 +48,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
public class Role implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(Role.class);
@@ -80,17 +81,17 @@ public class Role implements GsonPostProcessable {
private String comment;
// Will be persisted
@SerializedName(value = "tblPatternToPrivs")
- private Map<TablePattern, PrivBitSet> tblPatternToPrivs =
Maps.newConcurrentMap();
+ private ConcurrentMap<TablePattern, PrivBitSet> tblPatternToPrivs =
Maps.newConcurrentMap();
@SerializedName(value = "resourcePatternToPrivs")
- private Map<ResourcePattern, PrivBitSet> resourcePatternToPrivs =
Maps.newConcurrentMap();
+ private ConcurrentMap<ResourcePattern, PrivBitSet> resourcePatternToPrivs
= Maps.newConcurrentMap();
@SerializedName(value = "storageVaultPatternToPrivs")
- private Map<ResourcePattern, PrivBitSet> storageVaultPatternToPrivs =
Maps.newConcurrentMap();
+ private ConcurrentMap<ResourcePattern, PrivBitSet>
storageVaultPatternToPrivs = Maps.newConcurrentMap();
@SerializedName(value = "clusterPatternToPrivs")
- private Map<ResourcePattern, PrivBitSet> clusterPatternToPrivs =
Maps.newConcurrentMap();
+ private ConcurrentMap<ResourcePattern, PrivBitSet> clusterPatternToPrivs =
Maps.newConcurrentMap();
@SerializedName(value = "stagePatternToPrivs")
- private Map<ResourcePattern, PrivBitSet> stagePatternToPrivs =
Maps.newConcurrentMap();
+ private ConcurrentMap<ResourcePattern, PrivBitSet> stagePatternToPrivs =
Maps.newConcurrentMap();
@SerializedName(value = "workloadGroupPatternToPrivs")
- private Map<WorkloadGroupPattern, PrivBitSet> workloadGroupPatternToPrivs
= Maps.newConcurrentMap();
+ private ConcurrentMap<WorkloadGroupPattern, PrivBitSet>
workloadGroupPatternToPrivs = Maps.newConcurrentMap();
@SerializedName(value = "colPrivMap")
private Map<ColPrivilegeKey, Set<String>> colPrivMap = Maps.newHashMap();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
index 3aacf3d7f4c..d749b1436bf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
@@ -56,6 +56,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@@ -68,7 +69,7 @@ public class PolicyMgr implements Writable {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@SerializedName(value = "typeToPolicyMap")
- private Map<PolicyTypeEnum, List<Policy>> typeToPolicyMap =
Maps.newConcurrentMap();
+ private ConcurrentMap<PolicyTypeEnum, List<Policy>> typeToPolicyMap =
Maps.newConcurrentMap();
// ctlName -> dbName -> tableName -> List<RowPolicy>
private Map<String, Map<String, Map<String, List<RowPolicy>>>>
tablePolicies = Maps.newConcurrentMap();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
index b212ef42deb..fb1a9aa329e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadSchedPolicyMgr.java
@@ -60,6 +60,7 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class WorkloadSchedPolicyMgr extends MasterDaemon implements Writable,
GsonPostProcessable {
@@ -67,7 +68,7 @@ public class WorkloadSchedPolicyMgr extends MasterDaemon
implements Writable, Gs
private static final Logger LOG =
LogManager.getLogger(WorkloadSchedPolicyMgr.class);
@SerializedName(value = "idToPolicy")
- private Map<Long, WorkloadSchedPolicy> idToPolicy =
Maps.newConcurrentMap();
+ private ConcurrentMap<Long, WorkloadSchedPolicy> idToPolicy =
Maps.newConcurrentMap();
private Map<String, WorkloadSchedPolicy> nameToPolicy = Maps.newHashMap();
private PolicyProcNode policyProcNode = new PolicyProcNode();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/persist/gson/GsonSerializationTest.java
b/fe/fe-core/src/test/java/org/apache/doris/persist/gson/GsonSerializationTest.java
index 09433755fde..0f622afe2ec 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/persist/gson/GsonSerializationTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/persist/gson/GsonSerializationTest.java
@@ -46,6 +46,8 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
/*
* This unit test provides examples about how to make a class serializable.
@@ -207,7 +209,7 @@ public class GsonSerializationTest {
@SerializedName(value = "classA1")
public InnerClassA classA1ChangeName;
public InnerClassA ignoreClassA2ChangeName;
- @SerializedName(value = "flagChangeName", alternate = { "flag" })
+ @SerializedName(value = "flagChangeName", alternate = {"flag"})
public int flagChangeName = 0;
public OriginClassADifferentMemberName(int flag) {
@@ -429,4 +431,75 @@ public class GsonSerializationTest {
Assert.assertEquals(Sets.newHashSet(new Key(MyEnum.TYPE_A, "key1"),
new Key(MyEnum.TYPE_B, "key2")),
readClassA.map.keySet());
}
+
+ public static class ConcurrentMapClassA implements Writable {
+ @SerializedName(value = "map1")
+ public ConcurrentMap<Key, Long> map1 = Maps.newConcurrentMap();
+ @SerializedName(value = "map2")
+ public ConcurrentHashMap<Key, Long> map2 = new ConcurrentHashMap<>();
+ @SerializedName(value = "map3")
+ public ConcurrentMap<String, ConcurrentMap<String, Long>> map3 =
Maps.newConcurrentMap();
+ @SerializedName(value = "map4")
+ public ConcurrentMap<String, Map<String, Long>> map4 =
Maps.newConcurrentMap();
+ @SerializedName(value = "map5")
+ public Map<String, Long> map5 = Maps.newConcurrentMap();
+ @SerializedName(value = "map6")
+ public Map<String, Long> map6 = Maps.newHashMap();
+
+ public ConcurrentMapClassA() {
+ map1.put(new Key(MyEnum.TYPE_A, "key1"), 1L);
+
+ map2.put(new Key(MyEnum.TYPE_B, "key2"), 2L);
+
+ ConcurrentMap map31 = new ConcurrentHashMap<String, Long>();
+ map31.put("a", 1L);
+ map3.put("b", map31);
+
+ Map map41 = new ConcurrentHashMap<String, Long>();
+ map41.put("a", 1L);
+ map4.put("b", map41);
+
+ map5.put("b", 1L);
+
+ map6.put("b", 1L);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ public static ConcurrentMapClassA read(DataInput in) throws
IOException {
+ String json = Text.readString(in);
+ ConcurrentMapClassA classA = GsonUtils.GSON.fromJson(json,
ConcurrentMapClassA.class);
+ return classA;
+ }
+ }
+
+ @Test
+ public void testConcurrentMap() throws IOException {
+ // 1. Write objects to file
+ File file = new File(fileName);
+ file.createNewFile();
+ DataOutputStream out = new DataOutputStream(new
FileOutputStream(file));
+
+ ConcurrentMapClassA classA = new ConcurrentMapClassA();
+ classA.write(out);
+ out.flush();
+ out.close();
+
+ // 2. Read objects from file
+ DataInputStream in = new DataInputStream(new FileInputStream(file));
+
+ ConcurrentMapClassA readClassA = ConcurrentMapClassA.read(in);
+ Assert.assertTrue(readClassA.map1 instanceof ConcurrentHashMap);
+ Assert.assertTrue(readClassA.map2 instanceof ConcurrentHashMap);
+ Assert.assertTrue(readClassA.map3 instanceof ConcurrentHashMap);
+ Assert.assertTrue(readClassA.map3.get("b") instanceof
ConcurrentHashMap);
+ Assert.assertTrue(readClassA.map4 instanceof ConcurrentHashMap);
+ Assert.assertFalse(readClassA.map4.get("b") instanceof
ConcurrentHashMap);
+ Assert.assertFalse(readClassA.map5 instanceof ConcurrentHashMap);
+ Assert.assertFalse(readClassA.map6 instanceof ConcurrentHashMap);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]