This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6ad9f83cc1 [IOTDB-3881] strengthen the concurrency control of
regionReplicaSetCache. (#6708)
6ad9f83cc1 is described below
commit 6ad9f83cc13c1d5d46f01bf1bb2d15e16e023f9d
Author: ZhangHongYin <[email protected]>
AuthorDate: Thu Jul 21 14:09:41 2022 +0800
[IOTDB-3881] strengthen the concurrency control of regionReplicaSetCache.
(#6708)
---
.../db/mpp/plan/analyze/cache/PartitionCache.java | 80 ++++++++++++++--------
1 file changed, 53 insertions(+), 27 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
index a2b53dfb48..86fbe66bad 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
@@ -66,7 +66,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -96,8 +95,7 @@ public class PartitionCache {
/** the latest time when groupIdToReplicaSetMap updated. */
private final AtomicLong latestUpdateTime = new AtomicLong(0);
/** TConsensusGroupId -> TRegionReplicaSet */
- private final Map<TConsensusGroupId, TRegionReplicaSet>
groupIdToReplicaSetMap =
- new ConcurrentHashMap<>();
+ private final Map<TConsensusGroupId, TRegionReplicaSet>
groupIdToReplicaSetMap = new HashMap<>();
/** The lock of cache */
private final ReentrantReadWriteLock storageGroupCacheLock = new
ReentrantReadWriteLock();
@@ -105,6 +103,8 @@ public class PartitionCache {
private final ReentrantReadWriteLock schemaPartitionCacheLock = new
ReentrantReadWriteLock();
private final ReentrantReadWriteLock dataPartitionCacheLock = new
ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock regionReplicaSetLock = new
ReentrantReadWriteLock();
+
private final IClientManager<PartitionRegionId, ConfigNodeClient>
configNodeClientManager =
new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
.createClientManager(new
DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
@@ -396,28 +396,44 @@ public class PartitionCache {
* @throws StatementAnalyzeException if there are exception when try to get
latestRegionRouteMap
*/
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId
consensusGroupId) {
- if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
- // try to update latestRegionRegionRouteMap when miss
- synchronized (groupIdToReplicaSetMap) {
- try (ConfigNodeClient client =
-
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
- TRegionRouteMapResp resp = client.getLatestRegionRouteMap();
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
resp.getStatus().getCode()) {
- updateGroupIdToReplicaSetMap(resp.getTimestamp(),
resp.getRegionRouteMap());
- }
- if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
- // failed to get RegionReplicaSet from confignode
- throw new RuntimeException(
- "Failed to get replicaSet of consensus group[id= " +
consensusGroupId + "]");
+ TRegionReplicaSet result;
+ // try to get regionReplicaSet from cache
+ try {
+ regionReplicaSetLock.readLock().lock();
+ result = groupIdToReplicaSetMap.get(consensusGroupId);
+ } finally {
+ regionReplicaSetLock.readLock().unlock();
+ }
+ if (result == null) {
+ // if not hit then try to get regionReplicaSet from confignode
+ try {
+ regionReplicaSetLock.writeLock().lock();
+ // verify that there are not hit in cache
+ if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
+ try (ConfigNodeClient client =
+
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ TRegionRouteMapResp resp = client.getLatestRegionRouteMap();
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
resp.getStatus().getCode()) {
+ updateGroupIdToReplicaSetMap(resp.getTimestamp(),
resp.getRegionRouteMap());
+ }
+ // if confignode don't have then will throw RuntimeException
+ if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
+ // failed to get RegionReplicaSet from confignode
+ throw new RuntimeException(
+ "Failed to get replicaSet of consensus group[id= " +
consensusGroupId + "]");
+ }
+ } catch (IOException | TException e) {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing getRegionReplicaSet():" +
e.getMessage());
}
- } catch (IOException | TException e) {
- throw new StatementAnalyzeException(
- "An error occurred when executing getRegionReplicaSet():" +
e.getMessage());
}
+ result = groupIdToReplicaSetMap.get(consensusGroupId);
+ } finally {
+ regionReplicaSetLock.writeLock().unlock();
}
}
// try to get regionReplicaSet by consensusGroupId
- return groupIdToReplicaSetMap.get(consensusGroupId);
+ return result;
}
/**
@@ -429,18 +445,28 @@ public class PartitionCache {
*/
public boolean updateGroupIdToReplicaSetMap(
long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> map) {
- boolean result = (timestamp ==
latestUpdateTime.accumulateAndGet(timestamp, Math::max));
- // if timestamp is greater than latestUpdateTime, then update
- if (result) {
- groupIdToReplicaSetMap.clear();
- groupIdToReplicaSetMap.putAll(map);
+ try {
+ regionReplicaSetLock.writeLock().lock();
+ boolean result = (timestamp ==
latestUpdateTime.accumulateAndGet(timestamp, Math::max));
+ // if timestamp is greater than latestUpdateTime, then update
+ if (result) {
+ groupIdToReplicaSetMap.clear();
+ groupIdToReplicaSetMap.putAll(map);
+ }
+ return result;
+ } finally {
+ regionReplicaSetLock.writeLock().unlock();
}
- return result;
}
/** invalid replicaSetCache */
public void invalidReplicaSetCache() {
- groupIdToReplicaSetMap.clear();
+ try {
+ regionReplicaSetLock.writeLock().lock();
+ groupIdToReplicaSetMap.clear();
+ } finally {
+ regionReplicaSetLock.writeLock().unlock();
+ }
}
// endregion