This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5c3b7f53d52 Optimize priority map lock structure (#12351)
5c3b7f53d52 is described below
commit 5c3b7f53d52a70cdb875396018d74c3d79ebfc46
Author: Yongzao <[email protected]>
AuthorDate: Wed Apr 17 17:10:50 2024 +0800
Optimize priority map lock structure (#12351)
* Finish
* bug fix
* Fix nano time
---
.../client/async/handlers/AsyncClientHandler.java | 11 ++++++++++-
.../async/handlers/rpc/TransferLeaderRPCHandler.java | 2 +-
.../confignode/manager/load/balancer/RouteBalancer.java | 16 ++++++----------
3 files changed, 17 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index a8c445665d0..7cd86af6ef7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackLis
import
org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.PipePushMetaRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
+import
org.apache.iotdb.confignode.client.async.handlers.rpc.TransferLeaderRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import
org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
@@ -39,6 +40,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
import java.util.ArrayList;
import java.util.List;
@@ -243,6 +245,14 @@ public class AsyncClientHandler<Q, R> {
dataNodeLocationMap,
(Map<Integer, TPushConsumerGroupMetaResp>) responseMap,
countDownLatch);
+ case CHANGE_REGION_LEADER:
+ return new TransferLeaderRPCHandler(
+ requestType,
+ requestId,
+ targetDataNode,
+ dataNodeLocationMap,
+ (Map<Integer, TRegionLeaderChangeResp>) responseMap,
+ countDownLatch);
case SET_TTL:
case CREATE_DATA_REGION:
case CREATE_SCHEMA_REGION:
@@ -264,7 +274,6 @@ public class AsyncClientHandler<Q, R> {
case UPDATE_REGION_ROUTE_MAP:
case INVALIDATE_MATCHED_SCHEMA_CACHE:
case UPDATE_TEMPLATE:
- case CHANGE_REGION_LEADER:
case KILL_QUERY_INSTANCE:
case RESET_PEER_LIST:
default:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TransferLeaderRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TransferLeaderRPCHandler.java
index 9f0e2312717..d5107e315cf 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TransferLeaderRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TransferLeaderRPCHandler.java
@@ -35,7 +35,7 @@ public class TransferLeaderRPCHandler extends
AbstractAsyncRPCHandler<TRegionLea
private static final Logger LOGGER =
LoggerFactory.getLogger(TransferLeaderRPCHandler.class);
- protected TransferLeaderRPCHandler(
+ public TransferLeaderRPCHandler(
DataNodeRequestType requestType,
int requestId,
TDataNodeLocation targetDataNode,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 25b3fc31478..7269defdbe6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -108,7 +108,7 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
private final Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap;
// The interval of retrying to balance ratis leader after the last failed
time
- private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL = 60 * 1000L;
+ private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS = 60 *
1000L * 1000L * 1000L;
private final Map<TConsensusGroupId, Long> lastFailedTimeForLeaderBalance;
public RouteBalancer(IManager configManager) {
@@ -175,7 +175,7 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
(regionGroupId, newLeaderId) -> {
if (ConsensusFactory.RATIS_CONSENSUS.equals(consensusProtocolClass)
&& currentTime -
lastFailedTimeForLeaderBalance.getOrDefault(regionGroupId, 0L)
- > BALANCE_RATIS_LEADER_FAILED_INTERVAL) {
+ <= BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS) {
return;
}
@@ -283,13 +283,8 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
}
if (needBroadcast.get()) {
- priorityMapLock.readLock().lock();
- try {
- broadcastLatestRegionPriorityMap();
- recordRegionPriorityMap(differentPriorityMap);
- } finally {
- priorityMapLock.readLock().unlock();
- }
+ recordRegionPriorityMap(differentPriorityMap);
+ broadcastLatestRegionPriorityMap();
}
}
@@ -304,10 +299,11 @@ public class RouteBalancer implements
IClusterStatusSubscriber {
.collect(Collectors.toMap(TDataNodeLocation::getDataNodeId,
location -> location));
long broadcastTime = System.currentTimeMillis();
+ Map<TConsensusGroupId, TRegionReplicaSet> tmpPriorityMap =
getRegionPriorityMap();
AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
new AsyncClientHandler<>(
DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
- new TRegionRouteReq(broadcastTime, regionPriorityMap),
+ new TRegionRouteReq(broadcastTime, tmpPriorityMap),
dataNodeLocationMap);
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
}