This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch Refactor-RouteBalancer-to-match-LoadManager-framework in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7d99fac1c087e1ad61ca90718da5263092216994 Author: YongzaoDan <[email protected]> AuthorDate: Tue Apr 25 10:25:48 2023 +0800 Finish --- .../heartbeat/DataNodeHeartbeatHandler.java | 6 +- .../statemachine/ConfigRegionStateMachine.java | 2 - .../iotdb/confignode/manager/ConfigManager.java | 4 +- .../iotdb/confignode/manager/load/LoadManager.java | 104 +++++--- .../manager/load/balancer/RouteBalancer.java | 265 +++++---------------- .../load/balancer/router/RegionRouteMap.java | 176 -------------- .../confignode/manager/load/cache/LoadCache.java | 100 +++++++- .../manager/load/cache/route/RegionRouteCache.java | 131 ++++++++++ .../manager/load/service/HeartbeatService.java | 1 - .../manager/load/service/StatisticsService.java | 43 ++-- .../manager/load/subscriber/RouteChangeEvent.java | 35 +-- .../manager/partition/PartitionManager.java | 2 +- .../procedure/env/ConfigNodeProcedureEnv.java | 31 +-- .../procedure/env/DataNodeRemoveHandler.java | 4 +- .../impl/pipe/task/CreatePipeProcedureV2.java | 2 +- .../impl/schema/DataNodeRegionTaskExecutor.java | 4 +- .../impl/schema/DeleteDatabaseProcedure.java | 2 - .../statemachine/CreateRegionGroupsProcedure.java | 1 - .../load/balancer/router/RegionRouteMapTest.java | 82 ------- 19 files changed, 403 insertions(+), 592 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java index 60ea48b918..3d1b2214d4 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java @@ -19,7 +19,6 @@ package org.apache.iotdb.confignode.client.async.handlers.heartbeat; import org.apache.iotdb.commons.cluster.RegionStatus; -import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; import org.apache.iotdb.confignode.manager.load.cache.LoadCache; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; @@ -35,7 +34,6 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR private final int nodeId; private final LoadCache loadCache; - private final RouteBalancer routeBalancer; private final Map<Integer, Long> deviceNum; private final Map<Integer, Long> timeSeriesNum; @@ -44,14 +42,12 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR public DataNodeHeartbeatHandler( int nodeId, LoadCache loadCache, - RouteBalancer routeBalancer, Map<Integer, Long> deviceNum, Map<Integer, Long> timeSeriesNum, Map<Integer, Long> regionDisk) { this.nodeId = nodeId; this.loadCache = loadCache; - this.routeBalancer = routeBalancer; this.deviceNum = deviceNum; this.timeSeriesNum = timeSeriesNum; this.regionDisk = regionDisk; @@ -81,7 +77,7 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR if (isLeader) { // Update leaderCache - routeBalancer.cacheLeaderSample( + loadCache.cacheLeaderSample( regionGroupId, new Pair<>(heartbeatResp.getHeartbeatTimestamp(), nodeId)); } }); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index f6b3f25e14..46e1795f28 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -209,7 +209,6 @@ public class ConfigRegionStateMachine // Start leader scheduling services configManager.getProcedureManager().shiftExecutor(true); - configManager.getLoadManager().getRouteBalancer().startRouteBalancingService(); configManager.getRetryFailedTasksThread().startRetryFailedTasksService(); configManager.getPartitionManager().startRegionCleaner(); @@ -229,7 +228,6 @@ public class ConfigRegionStateMachine // Stop leader scheduling services configManager.getLoadManager().stopLoadServices(); configManager.getProcedureManager().shiftExecutor(false); - configManager.getLoadManager().getRouteBalancer().stopRouteBalancingService(); configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); configManager.getPartitionManager().stopRegionCleaner(); configManager.getCQManager().stopCQScheduler(); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 7fd2fb85c5..e83067bb97 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -755,7 +755,7 @@ public class ConfigManager implements IManager { partitionManager.getNodePathsPartition(getNodePathsPartitionPlan); TSchemaNodeManagementResp result = resp.convertToRpcSchemaNodeManagementPartitionResp( - getLoadManager().getLatestRegionRouteMap()); + getLoadManager().getRegionPriorityMap()); LOGGER.info( "getNodePathsPartition receive devicePaths: {}, level: {}, return TSchemaNodeManagementResp: {}", @@ -1329,7 +1329,7 @@ public class ConfigManager implements IManager { if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { resp.setTimestamp(System.currentTimeMillis()); - resp.setRegionRouteMap(getLoadManager().getLatestRegionRouteMap()); + resp.setRegionRouteMap(getLoadManager().getRegionPriorityMap()); } return resp; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index b6f635a11b..1efe554d84 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -129,49 +129,12 @@ public class LoadManager { return partitionBalancer.allocateDataPartition(unassignedDataPartitionSlotsMap); } - /** @return Map<RegionGroupId, DataNodeId where the leader resides> */ - public Map<TConsensusGroupId, Integer> getLatestRegionLeaderMap() { - return routeBalancer.getLatestRegionLeaderMap(); - } - - /** - * Get the number of RegionGroup-leaders in the specified DataNode. - * - * @param dataNodeId The specified DataNode - * @param type SchemaRegion or DataRegion - * @return The number of RegionGroup-leaders - */ - public int getRegionGroupLeaderCount(int dataNodeId, TConsensusGroupType type) { - AtomicInteger result = new AtomicInteger(0); - routeBalancer - .getLatestRegionLeaderMap() - .forEach( - ((consensusGroupId, leaderId) -> { - if (dataNodeId == leaderId && type.equals(consensusGroupId.getType())) { - result.getAndIncrement(); - } - })); - return result.get(); - } - - /** - * Generate an optimal real-time read/write requests routing policy. - * - * @return Map<TConsensusGroupId, TRegionReplicaSet>, The routing policy of read/write requests - * for each Region is based on the order in the TRegionReplicaSet. The replica with higher - * sorting result have higher priority. - */ - public Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionRouteMap() { - return routeBalancer.getLatestRegionPriorityMap(); - } - public void broadcastLatestRegionRouteMap() { statisticsService.broadcastLatestRegionRouteMap(); } public void startLoadServices() { loadCache.initHeartbeatCache(configManager); - routeBalancer.initRegionRouteMap(); heartbeatService.startHeartbeatService(); statisticsService.startLoadStatisticsService(); } @@ -358,4 +321,71 @@ public class LoadManager { public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) { loadCache.removeRegionGroupCache(consensusGroupId); } + + /** + * Get the latest RegionLeaderMap. + * + * @return Map<RegionGroupId, leaderId> + */ + public Map<TConsensusGroupId, Integer> getRegionLeaderMap() { + return loadCache.getRegionLeaderMap(); + } + + /** + * Get the latest RegionPriorityMap. + * + * @return Map<RegionGroupId, RegionPriority> + */ + public Map<TConsensusGroupId, TRegionReplicaSet> getRegionPriorityMap() { + return loadCache.getRegionPriorityMap(); + } + + /** + * Get the number of RegionGroup-leaders in the specified DataNode. + * + * @param dataNodeId The specified DataNode + * @param type SchemaRegion or DataRegion + * @return The number of RegionGroup-leaders + */ + public int getRegionGroupLeaderCount(int dataNodeId, TConsensusGroupType type) { + AtomicInteger result = new AtomicInteger(0); + getRegionLeaderMap() + .forEach( + ((consensusGroupId, leaderId) -> { + if (dataNodeId == leaderId && type.equals(consensusGroupId.getType())) { + result.getAndIncrement(); + } + })); + return result.get(); + } + + /** + * Force update the specified RegionGroup's leader. + * + * @param regionGroupId Specified RegionGroupId + * @param leaderId Leader DataNodeId + */ + public void forceUpdateRegionLeader(TConsensusGroupId regionGroupId, int leaderId) { + loadCache.forceUpdateRegionLeader(regionGroupId, leaderId); + } + + /** + * Force update the specified RegionGroup's priority. + * + * @param regionGroupId Specified RegionGroupId + * @param regionPriority Region route priority + */ + public void forceUpdateRegionPriority( + TConsensusGroupId regionGroupId, TRegionReplicaSet regionPriority) { + loadCache.forceUpdateRegionPriority(regionGroupId, regionPriority); + } + + /** + * Remove the specified RegionGroup's route cache. + * + * @param regionGroupId Specified RegionGroupId + */ + public void removeRegionRouteCache(TConsensusGroupId regionGroupId) { + loadCache.removeRegionRouteCache(regionGroupId); + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 3599a442fa..6573a970d4 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -25,8 +25,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cluster.NodeStatus; -import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.confignode.client.DataNodeRequestType; import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; @@ -34,14 +32,12 @@ import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.load.LoadManager; -import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.GreedyLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.ILeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFlowLeaderBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer; -import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.consensus.ConsensusFactory; @@ -51,13 +47,8 @@ import org.apache.iotdb.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -92,33 +83,16 @@ public class RouteBalancer { private static final boolean IS_DATA_REGION_IOT_CONSENSUS = ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS); - // Key: RegionGroupId - // Value: Pair<Timestamp, LeaderDataNodeId>, where - // the left value stands for sampling timestamp - // and the right value stands for the index of DataNode that leader resides. - private final Map<TConsensusGroupId, Pair<Long, Integer>> leaderCache; - private final IManager configManager; /** RegionRouteMap */ - private final RegionRouteMap regionRouteMap; // For generating optimal RegionLeaderMap private final ILeaderBalancer leaderBalancer; // For generating optimal RegionPriorityMap private final IPriorityBalancer priorityRouter; - /** Leader Balancing service */ - // TODO: leader balancing should be triggered by cluster events - private Future<?> currentLeaderBalancingFuture; - - private final ScheduledExecutorService leaderBalancingExecutor = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-LeaderBalancing-Service"); - private final Object scheduleMonitor = new Object(); - public RouteBalancer(IManager configManager) { this.configManager = configManager; - this.regionRouteMap = new RegionRouteMap(); - this.leaderCache = new ConcurrentHashMap<>(); switch (CONF.getLeaderDistributionPolicy()) { case ILeaderBalancer.GREEDY_POLICY: @@ -142,160 +116,33 @@ public class RouteBalancer { } /** - * Cache the latest leader of a RegionGroup. - * - * @param regionGroupId the id of the RegionGroup - * @param leaderSample the latest leader of a RegionGroup - */ - public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) { - if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) - && IS_DATA_REGION_IOT_CONSENSUS) { - // The leadership of IoTConsensus protocol is decided by ConfigNode-leader - return; - } - - leaderCache.putIfAbsent(regionGroupId, leaderSample); - synchronized (leaderCache.get(regionGroupId)) { - if (leaderCache.get(regionGroupId).getLeft() < leaderSample.getLeft()) { - leaderCache.replace(regionGroupId, leaderSample); - } - } - } - - /** - * Invoking periodically to update the RegionRouteMap - * - * @return RouteChangeEvent - */ - public RouteChangeEvent updateRegionRouteMap() { - synchronized (regionRouteMap) { - RegionRouteMap preRouteMap = new RegionRouteMap(regionRouteMap); - updateRegionLeaderMap(); - updateRegionPriorityMap(); - return new RouteChangeEvent(preRouteMap, regionRouteMap); - } - } - - private void updateRegionLeaderMap() { - leaderCache.forEach( - (regionGroupId, leadershipSample) -> { - if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) - && IS_DATA_REGION_IOT_CONSENSUS) { - // Ignore IoTConsensus consensus protocol - return; - } - - if (leadershipSample.getRight() != regionRouteMap.getLeader(regionGroupId)) { - // Update leader - regionRouteMap.setLeader(regionGroupId, leadershipSample.getRight()); - } - }); - } - - private void updateRegionPriorityMap() { - Map<TConsensusGroupId, Integer> regionLeaderMap = regionRouteMap.getRegionLeaderMap(); - Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores(); - - // Balancing region priority in each SchemaRegionGroup - Map<TConsensusGroupId, TRegionReplicaSet> latestRegionPriorityMap = - priorityRouter.generateOptimalRoutePriority( - getPartitionManager().getAllReplicaSets(TConsensusGroupType.SchemaRegion), - regionLeaderMap, - dataNodeLoadScoreMap); - // Balancing region priority in each DataRegionGroup - latestRegionPriorityMap.putAll( - priorityRouter.generateOptimalRoutePriority( - getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion), - regionLeaderMap, - dataNodeLoadScoreMap)); - - if (!latestRegionPriorityMap.equals(regionRouteMap.getRegionPriorityMap())) { - regionRouteMap.setRegionPriorityMap(latestRegionPriorityMap); - } - } - - /** - * Select leader for the specified RegionGroup greedily. The selected leader will be the DataNode - * that currently has the fewest leaders + * Balance cluster RegionGroup leader distribution through configured algorithm * - * @param regionGroupId The specified RegionGroup - * @param dataNodeIds The indices of DataNodes where the RegionReplicas reside + * @return Map<RegionGroupId, Pair<old leader index, new leader index>> */ - public void greedySelectLeader(TConsensusGroupId regionGroupId, List<Integer> dataNodeIds) { - synchronized (regionRouteMap) { - // Map<DataNodeId, The number of leaders> - Map<Integer, AtomicInteger> leaderCounter = new HashMap<>(); - regionRouteMap - .getRegionLeaderMap() - .forEach( - (consensusGroupId, leaderId) -> { - if (TConsensusGroupType.DataRegion.equals(consensusGroupId.getType())) { - leaderCounter - .computeIfAbsent(leaderId, empty -> new AtomicInteger(0)) - .getAndIncrement(); - } - }); - - int newLeaderId = -1; - int minCount = Integer.MAX_VALUE; - AtomicInteger zero = new AtomicInteger(0); - for (int dataNodeId : dataNodeIds) { - int leaderCount = leaderCounter.getOrDefault(dataNodeId, zero).get(); - if (leaderCount < minCount) { - newLeaderId = dataNodeId; - minCount = leaderCount; - } - } - regionRouteMap.setLeader(regionGroupId, newLeaderId); - } - } - - /** Start the route balancing service */ - public void startRouteBalancingService() { - synchronized (scheduleMonitor) { - if (currentLeaderBalancingFuture == null) { - currentLeaderBalancingFuture = - ScheduledExecutorUtil.safelyScheduleWithFixedDelay( - leaderBalancingExecutor, - this::balancingRegionLeader, - 0, - // Execute route balancing service in every 20 loops of heartbeat service - NodeManager.HEARTBEAT_INTERVAL * 20, - TimeUnit.MILLISECONDS); - LOGGER.info("Route-Balancing service is started successfully."); - } - } - } - - /** Stop the route balancing service */ - public void stopRouteBalancingService() { - synchronized (scheduleMonitor) { - if (currentLeaderBalancingFuture != null) { - currentLeaderBalancingFuture.cancel(false); - currentLeaderBalancingFuture = null; - leaderCache.clear(); - regionRouteMap.clear(); - LOGGER.info("Route-Balancing service is stopped successfully."); - } - } - } - - private void balancingRegionLeader() { + public Map<TConsensusGroupId, Pair<Integer, Integer>> balanceRegionLeader() { + Map<TConsensusGroupId, Pair<Integer, Integer>> differentRegionLeaderMap = + new ConcurrentHashMap<>(); if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_SCHEMA_REGION) { - balancingRegionLeader(TConsensusGroupType.SchemaRegion); + differentRegionLeaderMap.putAll(balanceRegionLeader(TConsensusGroupType.SchemaRegion)); } - if (IS_ENABLE_AUTO_LEADER_BALANCE_FOR_DATA_REGION) { - balancingRegionLeader(TConsensusGroupType.DataRegion); + differentRegionLeaderMap.putAll(balanceRegionLeader(TConsensusGroupType.DataRegion)); } + return differentRegionLeaderMap; } - private void balancingRegionLeader(TConsensusGroupType regionGroupType) { + private Map<TConsensusGroupId, Pair<Integer, Integer>> balanceRegionLeader( + TConsensusGroupType regionGroupType) { + Map<TConsensusGroupId, Pair<Integer, Integer>> differentRegionLeaderMap = + new ConcurrentHashMap<>(); + // Collect the latest data and generate the optimal leader distribution - Map<TConsensusGroupId, Integer> leaderDistribution = + Map<TConsensusGroupId, Integer> currentLeaderMap = getLoadManager().getRegionLeaderMap(); + Map<TConsensusGroupId, Integer> optimalLeadderMap = leaderBalancer.generateOptimalLeaderDistribution( getPartitionManager().getAllReplicaSetsMap(regionGroupType), - regionRouteMap.getRegionLeaderMap(), + currentLeaderMap, getNodeManager() .filterDataNodeThroughStatus( NodeStatus.Unknown, NodeStatus.ReadOnly, NodeStatus.Removing) @@ -308,9 +155,9 @@ public class RouteBalancer { AtomicInteger requestId = new AtomicInteger(0); AsyncClientHandler<TRegionLeaderChangeReq, TSStatus> clientHandler = new AsyncClientHandler<>(DataNodeRequestType.CHANGE_REGION_LEADER); - leaderDistribution.forEach( + optimalLeadderMap.forEach( (regionGroupId, newLeaderId) -> { - if (newLeaderId != -1 && newLeaderId != regionRouteMap.getLeader(regionGroupId)) { + if (newLeaderId != -1 && !newLeaderId.equals(currentLeaderMap.get(regionGroupId))) { String consensusProtocolClass; switch (regionGroupId.getType()) { case SchemaRegion: @@ -331,17 +178,15 @@ public class RouteBalancer { clientHandler, regionGroupId, getNodeManager().getRegisteredDataNode(newLeaderId).getLocation()); + differentRegionLeaderMap.put( + regionGroupId, new Pair<>(currentLeaderMap.get(regionGroupId), newLeaderId)); } }); - if (requestId.get() > 0) { // Don't retry ChangeLeader request AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler, 1); } - } - - public void changeLeaderForIoTConsensus(TConsensusGroupId regionGroupId, int newLeaderId) { - regionRouteMap.setLeader(regionGroupId, newLeaderId); + return differentRegionLeaderMap; } private void changeRegionLeader( @@ -354,7 +199,7 @@ public class RouteBalancer { case ConsensusFactory.IOT_CONSENSUS: // For IoTConsensus protocol, change RegionRouteMap is enough. // And the result will be broadcast by Cluster-LoadStatistics-Service soon. - regionRouteMap.setLeader(regionGroupId, newLeader.getDataNodeId()); + getLoadManager().forceUpdateRegionLeader(regionGroupId, newLeader.getDataNodeId()); break; case ConsensusFactory.RATIS_CONSENSUS: default: @@ -371,36 +216,48 @@ public class RouteBalancer { } } - /** Initialize the regionRouteMap when the ConfigNode-Leader is switched */ - public void initRegionRouteMap() { - synchronized (regionRouteMap) { - regionRouteMap.clear(); - if (IS_DATA_REGION_IOT_CONSENSUS) { - // Greedily pick leader for all existed DataRegionGroups - List<TRegionReplicaSet> dataRegionGroups = - getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion); - for (TRegionReplicaSet dataRegionGroup : dataRegionGroups) { - greedySelectLeader( - dataRegionGroup.getRegionId(), - dataRegionGroup.getDataNodeLocations().stream() - .map(TDataNodeLocation::getDataNodeId) - .collect(Collectors.toList())); - } - } - updateRegionRouteMap(); - } - } + /** + * Balance cluster RegionGroup route priority through configured algorithm + * + * @return Map<RegionGroupId, Pair<old route priority, new route priority>> + */ + public Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> + balanceRegionPriority() { - public Map<TConsensusGroupId, Integer> getLatestRegionLeaderMap() { - return regionRouteMap.getRegionLeaderMap(); - } + Map<TConsensusGroupId, TRegionReplicaSet> currentPriorityMap = + getLoadManager().getRegionPriorityMap(); + Map<TConsensusGroupId, Integer> regionLeaderMap = getLoadManager().getRegionLeaderMap(); + Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores(); - public Map<TConsensusGroupId, TRegionReplicaSet> getLatestRegionPriorityMap() { - return regionRouteMap.getRegionPriorityMap(); - } + // Balancing region priority in each SchemaRegionGroup + Map<TConsensusGroupId, TRegionReplicaSet> optimalRegionPriorityMap = + priorityRouter.generateOptimalRoutePriority( + getPartitionManager().getAllReplicaSets(TConsensusGroupType.SchemaRegion), + regionLeaderMap, + dataNodeLoadScoreMap); + // Balancing region priority in each DataRegionGroup + optimalRegionPriorityMap.putAll( + priorityRouter.generateOptimalRoutePriority( + getPartitionManager().getAllReplicaSets(TConsensusGroupType.DataRegion), + regionLeaderMap, + dataNodeLoadScoreMap)); - public RegionRouteMap getRegionRouteMap() { - return regionRouteMap; + Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> differentRegionPriorityMap = + new ConcurrentHashMap<>(); + for (TConsensusGroupId regionGroupId : currentPriorityMap.keySet()) { + if (!optimalRegionPriorityMap + .get(regionGroupId) + .equals(currentPriorityMap.get(regionGroupId))) { + differentRegionPriorityMap.put( + regionGroupId, + new Pair<>( + currentPriorityMap.get(regionGroupId), + optimalRegionPriorityMap.get(regionGroupId))); + getLoadManager() + .forceUpdateRegionPriority(regionGroupId, optimalRegionPriorityMap.get(regionGroupId)); + } + } + return differentRegionPriorityMap; } private NodeManager getNodeManager() { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java deleted file mode 100644 index cbe9003355..0000000000 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.confignode.manager.load.balancer.router; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TProtocol; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -public class RegionRouteMap { - - // Map<RegionGroupId, LeaderDataNodeId> - private Map<TConsensusGroupId, Integer> regionLeaderMap; - - // Map<RegionGroupId, TRegionReplicaSet> - // Indicate the routing priority of read/write requests for each RegionGroup. - // The replica with higher sorting result have higher priority. - // TODO: Might be split into readRouteMap and writeRouteMap in the future - private Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap; - - public RegionRouteMap() { - this.regionLeaderMap = new ConcurrentHashMap<>(); - this.regionPriorityMap = new ConcurrentHashMap<>(); - } - - public RegionRouteMap(RegionRouteMap other) { - this.regionLeaderMap = new ConcurrentHashMap<>(other.regionLeaderMap); - this.regionPriorityMap = new ConcurrentHashMap<>(other.regionPriorityMap); - } - - /** - * @return DataNodeId where the specified RegionGroup's leader resides. And return -1 if the - * leader is not recorded yet - */ - public int getLeader(TConsensusGroupId regionGroupId) { - return regionLeaderMap.getOrDefault(regionGroupId, -1); - } - - public void setLeader(TConsensusGroupId regionGroupId, int leaderId) { - regionLeaderMap.put(regionGroupId, leaderId); - } - - public Map<TConsensusGroupId, Integer> getRegionLeaderMap() { - return regionLeaderMap; - } - - public void setRegionLeaderMap(Map<TConsensusGroupId, Integer> regionLeaderMap) { - this.regionLeaderMap = regionLeaderMap; - } - - public Map<TConsensusGroupId, TRegionReplicaSet> getRegionPriorityMap() { - return regionPriorityMap; - } - - public boolean isEmpty() { - return regionLeaderMap.isEmpty() && regionPriorityMap.isEmpty(); - } - - public void setRegionPriorityMap(Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap) { - this.regionPriorityMap = regionPriorityMap; - } - - public void removeRegionRouteCache(TConsensusGroupId regionGroupId) { - this.regionLeaderMap.remove(regionGroupId); - this.regionPriorityMap.remove(regionGroupId); - } - - public void serialize(OutputStream stream, TProtocol protocol) throws IOException { - try { - ReadWriteIOUtils.write(regionLeaderMap.size(), stream); - for (Map.Entry<TConsensusGroupId, Integer> leaderEntry : regionLeaderMap.entrySet()) { - leaderEntry.getKey().write(protocol); - ReadWriteIOUtils.write(leaderEntry.getValue(), stream); - } - - ReadWriteIOUtils.write(regionPriorityMap.size(), stream); - for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> priorityEntry : - regionPriorityMap.entrySet()) { - priorityEntry.getKey().write(protocol); - priorityEntry.getValue().write(protocol); - } - } catch (TException e) { - throw new IOException(e); - } - } - - // Deserializer for consensus-write - public void deserialize(ByteBuffer buffer) { - this.regionLeaderMap = new ConcurrentHashMap<>(); - int leaderEntryNum = buffer.getInt(); - for (int i = 0; i < leaderEntryNum; i++) { - TConsensusGroupId regionGroupId = - ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer); - int leaderId = buffer.getInt(); - regionLeaderMap.put(regionGroupId, leaderId); - } - - this.regionPriorityMap = new ConcurrentHashMap<>(); - int priorityEntryNum = buffer.getInt(); - for (int i = 0; i < priorityEntryNum; i++) { - TConsensusGroupId regionGroupId = - ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer); - TRegionReplicaSet regionReplicaSet = - ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(buffer); - regionPriorityMap.put(regionGroupId, regionReplicaSet); - } - } - - // Deserializer for snapshot - public void deserialize(InputStream stream, TProtocol protocol) throws IOException, TException { - this.regionLeaderMap = new ConcurrentHashMap<>(); - int leaderEntryNum = ReadWriteIOUtils.readInt(stream); - for (int i = 0; i < leaderEntryNum; i++) { - TConsensusGroupId regionGroupId = new TConsensusGroupId(); - regionGroupId.read(protocol); - int leaderId = ReadWriteIOUtils.readInt(stream); - regionLeaderMap.put(regionGroupId, leaderId); - } - - this.regionPriorityMap = new ConcurrentHashMap<>(); - int priorityEntryNum = ReadWriteIOUtils.readInt(stream); - for (int i = 0; i < priorityEntryNum; i++) { - TConsensusGroupId regionGroupId = new TConsensusGroupId(); - regionGroupId.read(protocol); - TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(); - regionReplicaSet.read(protocol); - regionPriorityMap.put(regionGroupId, regionReplicaSet); - } - } - - public void clear() { - regionLeaderMap.clear(); - regionPriorityMap.clear(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - RegionRouteMap that = (RegionRouteMap) o; - return regionLeaderMap.equals(that.regionLeaderMap) - && regionPriorityMap.equals(that.regionPriorityMap); - } - - @Override - public int hashCode() { - return Objects.hash(regionLeaderMap, regionPriorityMap); - } -} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 5a487de11c..a81efde33b 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -36,6 +36,7 @@ import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; +import org.apache.iotdb.confignode.manager.load.cache.route.RegionRouteCache; import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus; import org.apache.iotdb.tsfile.utils.Pair; @@ -55,10 +56,13 @@ public class LoadCache { private final Map<Integer, BaseNodeCache> nodeCacheMap; // Map<RegionGroupId, RegionGroupCache> private final Map<TConsensusGroupId, RegionGroupCache> regionGroupCacheMap; + // Map<RegionGroupId, RegionRouteCache> + private final Map<TConsensusGroupId, RegionRouteCache> regionRouteCacheMap; public LoadCache() { this.nodeCacheMap = new ConcurrentHashMap<>(); this.regionGroupCacheMap = new ConcurrentHashMap<>(); + this.regionRouteCacheMap = new ConcurrentHashMap<>(); } public void initHeartbeatCache(IManager configManager) { @@ -102,10 +106,11 @@ public class LoadCache { private void initRegionGroupHeartbeatCache(List<TRegionReplicaSet> regionReplicaSets) { regionGroupCacheMap.clear(); regionReplicaSets.forEach( - regionReplicaSet -> - regionGroupCacheMap.put( - regionReplicaSet.getRegionId(), - new RegionGroupCache(regionReplicaSet.getRegionId()))); + regionReplicaSet -> { + TConsensusGroupId consensusGroupId = regionReplicaSet.getRegionId(); + regionGroupCacheMap.put(consensusGroupId, new RegionGroupCache(consensusGroupId)); + regionRouteCacheMap.put(consensusGroupId, new RegionRouteCache(consensusGroupId)); + }); } public void clearHeartbeatCache() { @@ -151,6 +156,18 @@ public class LoadCache { .cacheHeartbeatSample(nodeId, sample); } + /** + * Cache the latest leader of a RegionGroup. + * + * @param regionGroupId the id of the RegionGroup + * @param leaderSample the latest leader of a RegionGroup + */ + public void cacheLeaderSample(TConsensusGroupId regionGroupId, Pair<Long, Integer> leaderSample) { + regionRouteCacheMap + .computeIfAbsent(regionGroupId, empty -> new RegionRouteCache(regionGroupId)) + .cacheLeaderSample(leaderSample); + } + /** * Periodic invoke to update the NodeStatistics of all Nodes. * @@ -194,6 +211,21 @@ public class LoadCache { return differentRegionGroupStatisticsMap; } + public Map<TConsensusGroupId, Pair<Integer, Integer>> updateRegionGroupLeader() { + Map<TConsensusGroupId, Pair<Integer, Integer>> differentRegionGroupLeaderMap = + new ConcurrentHashMap<>(); + regionRouteCacheMap.forEach( + (regionGroupId, regionRouteCache) -> { + int prevLeader = regionRouteCache.getLeaderId(); + if (regionRouteCache.periodicUpdate()) { + // Update and record the changed RegionGroupStatistics + differentRegionGroupLeaderMap.put( + regionGroupId, new Pair<>(prevLeader, regionRouteCache.getLeaderId())); + } + }); + return differentRegionGroupLeaderMap; + } + /** * Safely get NodeStatus by NodeId. * @@ -454,4 +486,64 @@ public class LoadCache { public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) { regionGroupCacheMap.remove(consensusGroupId); } + + /** + * Get the latest RegionLeaderMap. + * + * @return Map<RegionGroupId, leaderId> + */ + public Map<TConsensusGroupId, Integer> getRegionLeaderMap() { + Map<TConsensusGroupId, Integer> regionLeaderMap = new ConcurrentHashMap<>(); + regionRouteCacheMap.forEach( + (regionGroupId, regionRouteCache) -> + regionLeaderMap.put(regionGroupId, regionRouteCache.getLeaderId())); + return regionLeaderMap; + } + + /** + * Get the latest RegionPriorityMap. + * + * @return Map<RegionGroupId, RegionPriority> + */ + public Map<TConsensusGroupId, TRegionReplicaSet> getRegionPriorityMap() { + Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap = new ConcurrentHashMap<>(); + regionRouteCacheMap.forEach( + (regionGroupId, regionRouteCache) -> + regionPriorityMap.put(regionGroupId, regionRouteCache.getRegionPriority())); + return regionPriorityMap; + } + + /** + * Force update the specified RegionGroup's leader. + * + * @param regionGroupId Specified RegionGroupId + * @param leaderId Leader DataNodeId + */ + public void forceUpdateRegionLeader(TConsensusGroupId regionGroupId, int leaderId) { + regionRouteCacheMap + .computeIfAbsent(regionGroupId, empty -> new RegionRouteCache(regionGroupId)) + .forceUpdateRegionLeader(leaderId); + } + + /** + * Force update the specified RegionGroup's priority. + * + * @param regionGroupId Specified RegionGroupId + * @param regionPriority Region route priority + */ + public void forceUpdateRegionPriority( + TConsensusGroupId regionGroupId, TRegionReplicaSet regionPriority) { + regionRouteCacheMap + .computeIfAbsent(regionGroupId, empty -> new RegionRouteCache(regionGroupId)) + .forceUpdateRegionPriority(regionPriority); + } + + /** + * Remove the specified RegionGroup's route cache. + * + * @param regionGroupId Specified RegionGroupId + */ + public void removeRegionRouteCache(TConsensusGroupId regionGroupId) { + regionRouteCacheMap.remove(regionGroupId); + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java new file mode 100644 index 0000000000..105ac04946 --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/route/RegionRouteCache.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.cache.route; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class RegionRouteCache { + + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); + private static final String SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS = + CONF.getSchemaRegionConsensusProtocolClass(); + private static final String DATA_REGION_CONSENSUS_PROTOCOL_CLASS = + CONF.getDataRegionConsensusProtocolClass(); + + private final TConsensusGroupId consensusGroupId; + private final String consensusProtocolClass; + + // Pair<Timestamp, LeaderDataNodeId>, where + // the left value stands for sampling timestamp + // and the right value stands for the index of DataNode that leader resides. + private final AtomicReference<Pair<Long, Integer>> leaderSample; + private final AtomicInteger leaderId; + private final AtomicReference<TRegionReplicaSet> regionPriority; + + public RegionRouteCache(TConsensusGroupId consensusGroupId) { + this.consensusGroupId = consensusGroupId; + switch (consensusGroupId.getType()) { + case SchemaRegion: + this.consensusProtocolClass = SCHEMA_REGION_CONSENSUS_PROTOCOL_CLASS; + break; + case DataRegion: + default: + this.consensusProtocolClass = DATA_REGION_CONSENSUS_PROTOCOL_CLASS; + break; + } + + this.leaderSample = new AtomicReference<>(new Pair<>(0L, -1)); + this.leaderId = new AtomicInteger(-1); + this.regionPriority = new AtomicReference<>(new TRegionReplicaSet()); + } + + /** + * Cache the latest leader of a RegionGroup. + * + * @param leaderSample the latest leader of a RegionGroup + */ + public synchronized void cacheLeaderSample(Pair<Long, Integer> leaderSample) { + // The leader of other consensus protocol is selected by ConfigNode-leader. + if (consensusProtocolClass.equals(ConsensusFactory.RATIS_CONSENSUS)) { + // Only the leader of ratis consensus is self-elected + if (leaderSample.getLeft() > this.leaderSample.get().getLeft()) { + this.leaderSample.set(leaderSample); + } + } + } + + /** + * Invoking periodically in the Cluster-LoadStatistics-Service to update leaderId and compare with + * the previous leader, in order to detect whether the RegionGroup's leader has changed. + * + * @return True if the leader has changed recently(compare with the leaderId), false otherwise + */ + public boolean periodicUpdate() { + switch (consensusProtocolClass) { + case ConsensusFactory.RATIS_CONSENSUS: + // Only the leader of ratis consensus is self-elected + if (leaderSample.get().getRight() != leaderId.get()) { + leaderId.set(leaderSample.get().getRight()); + return true; + } + return false; + case ConsensusFactory.IOT_CONSENSUS: + default: + // The leader of other consensus protocol is selected by ConfigNode-leader. + // The leaderId is initialized to -1, in this case return ture will trigger the leader + // selection. + return leaderId.get() == -1; + } + } + + /** + * Force update the specified RegionGroup's leader. + * + * @param leaderId Leader DataNodeId + */ + public void forceUpdateRegionLeader(int leaderId) { + this.leaderId.set(leaderId); + } + + /** + * Force update the specified RegionGroup's priority. + * + * @param regionPriority Region route priority + */ + public void forceUpdateRegionPriority(TRegionReplicaSet regionPriority) { + this.regionPriority.set(regionPriority); + } + + public int getLeaderId() { + return leaderId.get(); + } + + public TRegionReplicaSet getRegionPriority() { + return regionPriority.get(); + } +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index 18c3915319..2abd75acef 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -171,7 +171,6 @@ public class HeartbeatService { new DataNodeHeartbeatHandler( dataNodeInfo.getLocation().getDataNodeId(), loadCache, - configManager.getLoadManager().getRouteBalancer(), configManager.getClusterQuotaManager().getDeviceNum(), configManager.getClusterQuotaManager().getTimeSeriesNum(), configManager.getClusterQuotaManager().getRegionDisk()); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java index bdbb24cc8e..627802539b 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java @@ -31,6 +31,7 @@ import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; import org.apache.iotdb.confignode.manager.load.cache.LoadCache; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; @@ -39,6 +40,7 @@ import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent; import org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent; +import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; import org.apache.iotdb.tsfile.utils.Pair; @@ -111,7 +113,7 @@ public class StatisticsService implements IClusterStatusSubscriber { // Broadcast the RegionRouteMap if some LoadStatistics has changed boolean isNeedBroadcast = false; - // Update NodeStatistics: + // Update NodeStatistics // Map<NodeId, Pair<old NodeStatistics, new NodeStatistics>> Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap = loadCache.updateNodeStatistics(); @@ -127,31 +129,32 @@ public class StatisticsService implements IClusterStatusSubscriber { isNeedBroadcast = true; } - if (isNeedBroadcast) { - StatisticsChangeEvent statisticsChangeEvent = - new StatisticsChangeEvent(differentNodeStatisticsMap, differentRegionGroupStatisticsMap); - eventBus.post(statisticsChangeEvent); - } - - // Update RegionRouteMap - RouteChangeEvent routeChangeEvent = routeBalancer.updateRegionRouteMap(); - if (routeChangeEvent.isNeedBroadcast()) { + // Update RegionGroupLeaders + // Map<RegionGroupId, Pair<old leader index, new leader index>> + Map<TConsensusGroupId, Pair<Integer, Integer>> differentRegionLeaderMap = + loadCache.updateRegionGroupLeader(); + if (!differentRegionLeaderMap.isEmpty()) { isNeedBroadcast = true; - eventBus.post(routeChangeEvent); } if (isNeedBroadcast) { + differentRegionLeaderMap.putAll(routeBalancer.balanceRegionLeader()); + Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> + differentRegionPriorityMap = routeBalancer.balanceRegionPriority(); + + eventBus.post( + new StatisticsChangeEvent(differentNodeStatisticsMap, differentRegionGroupStatisticsMap)); + eventBus.post(new RouteChangeEvent(differentRegionLeaderMap, differentRegionPriorityMap)); broadcastLatestRegionRouteMap(); } } public void broadcastLatestRegionRouteMap() { - Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = - routeBalancer.getLatestRegionPriorityMap(); + Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap = + getLoadManager().getRegionPriorityMap(); Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>(); // Broadcast the RegionRouteMap to all DataNodes except the unknown ones - configManager - .getNodeManager() + getNodeManager() .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly) .forEach( onlineDataNode -> @@ -164,7 +167,7 @@ public class StatisticsService implements IClusterStatusSubscriber { AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler = new AsyncClientHandler<>( DataNodeRequestType.UPDATE_REGION_ROUTE_MAP, - new TRegionRouteReq(broadcastTime, latestRegionRouteMap), + new TRegionRouteReq(broadcastTime, regionPriorityMap), dataNodeLocationMap); AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler); LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished."); @@ -236,4 +239,12 @@ public class StatisticsService implements IClusterStatusSubscriber { recordRegionLeaderMap(event.getLeaderMap()); recordRegionPriorityMap(event.getPriorityMap()); } + + private NodeManager getNodeManager() { + return configManager.getNodeManager(); + } + + private LoadManager getLoadManager() { + return configManager.getLoadManager(); + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java index 55153f3bf5..b927d8c0ab 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java @@ -21,11 +21,9 @@ package org.apache.iotdb.confignode.manager.load.subscriber; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap; import org.apache.iotdb.tsfile.utils.Pair; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; public class RouteChangeEvent { @@ -34,34 +32,11 @@ public class RouteChangeEvent { // Map<RegionGroupId, Pair<old Priority, new Priority>> private final Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> priorityMap; - public RouteChangeEvent(RegionRouteMap preRouteMap, RegionRouteMap currentRouteMap) { - this.leaderMap = new ConcurrentHashMap<>(); - this.priorityMap = new ConcurrentHashMap<>(); - - preRouteMap - .getRegionLeaderMap() - .forEach( - (regionGroupId, oldLeader) -> { - Integer newLeader = currentRouteMap.getRegionLeaderMap().get(regionGroupId); - if (newLeader != null && !newLeader.equals(oldLeader)) { - leaderMap.put(regionGroupId, new Pair<>(oldLeader, newLeader)); - } - }); - - preRouteMap - .getRegionPriorityMap() - .forEach( - (regionGroupId, oldPriority) -> { - TRegionReplicaSet newPriority = - currentRouteMap.getRegionPriorityMap().get(regionGroupId); - if (newPriority != null && !newPriority.equals(oldPriority)) { - priorityMap.put(regionGroupId, new Pair<>(oldPriority, newPriority)); - } - }); - } - - public boolean isNeedBroadcast() { - return !leaderMap.isEmpty() || !priorityMap.isEmpty(); + public RouteChangeEvent( + Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap, + Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> priorityMap) { + this.leaderMap = leaderMap; + this.priorityMap = priorityMap; } public Map<TConsensusGroupId, Pair<Integer, Integer>> getLeaderMap() { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index f15978d6bd..7e4d8bcaf3 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -776,7 +776,7 @@ public class PartitionManager { (RegionInfoListResp) getConsensusManager().read(req).getDataset(); // Get cached result - Map<TConsensusGroupId, Integer> allLeadership = getLoadManager().getLatestRegionLeaderMap(); + Map<TConsensusGroupId, Integer> allLeadership = getLoadManager().getRegionLeaderMap(); regionInfoListResp .getRegionInfoList() .forEach( diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index 46a4c7ab01..ab710f29aa 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -38,7 +38,6 @@ import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool; import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; -import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; @@ -59,7 +58,6 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler; import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq; -import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq; import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreatePipeOnDataNodeReq; @@ -529,9 +527,15 @@ public class ConfigNodeProcedureEnv { getConsensusManager().write(createRegionGroupsPlan); } + /** + * Force activating RegionGroup by setting status to Running, therefore the ConfigNode-leader can + * use this RegionGroup to allocate new Partitions + * + * @param regionGroupId Specified RegionGroup + * @param regionStatusMap Map<DataNodeId, RegionStatus> + */ public void activateRegionGroup( TConsensusGroupId regionGroupId, Map<Integer, RegionStatus> regionStatusMap) { - // Force activating RegionGroup long currentTime = System.currentTimeMillis(); Map<Integer, RegionHeartbeatSample> heartbeatSampleMap = new HashMap<>(); regionStatusMap.forEach( @@ -539,27 +543,6 @@ public class ConfigNodeProcedureEnv { heartbeatSampleMap.put( dataNodeId, new RegionHeartbeatSample(currentTime, currentTime, regionStatus))); getLoadManager().forceUpdateRegionGroupCache(regionGroupId, heartbeatSampleMap); - - // Select leader greedily for iot consensus protocol - if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType()) - && ConsensusFactory.IOT_CONSENSUS.equals( - ConfigNodeDescriptor.getInstance().getConf().getDataRegionConsensusProtocolClass())) { - List<Integer> availableDataNodes = new ArrayList<>(); - for (Map.Entry<Integer, RegionStatus> statusEntry : regionStatusMap.entrySet()) { - if (RegionStatus.isNormalStatus(statusEntry.getValue())) { - availableDataNodes.add(statusEntry.getKey()); - } - } - getLoadManager().getRouteBalancer().greedySelectLeader(regionGroupId, availableDataNodes); - } - - // Force update RegionRouteMap - getLoadManager().getRouteBalancer().updateRegionRouteMap(); - } - - public void broadcastRegionGroup() { - // Broadcast the latest RegionRouteMap - getLoadManager().broadcastLatestRegionRouteMap(); } public List<TRegionReplicaSet> getAllReplicaSets(String storageGroup) { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index c75fd419b8..b28ceccd24 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -596,8 +596,7 @@ public class DataNodeRemoveHandler { if (newLeaderNode.isPresent()) { configManager .getLoadManager() - .getRouteBalancer() - .changeLeaderForIoTConsensus(regionId, newLeaderNode.get().getDataNodeId()); + .forceUpdateRegionLeader(regionId, newLeaderNode.get().getDataNodeId()); LOGGER.info( "{}, Change region leader finished for IOT_CONSENSUS, regionId: {}, newLeaderNode: {}", @@ -610,6 +609,7 @@ public class DataNodeRemoveHandler { } if (newLeaderNode.isPresent()) { + // TODO: Trigger event post after enhance RegionMigrate procedure SyncDataNodeClientPool.getInstance() .changeRegionLeader( regionId, originalDataNode.getInternalEndPoint(), newLeaderNode.get()); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index eb110e0593..13f16f9ae8 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -105,7 +105,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 { final Map<TConsensusGroupId, PipeTaskMeta> consensusGroupIdToTaskMetaMap = new HashMap<>(); env.getConfigManager() .getLoadManager() - .getLatestRegionLeaderMap() + .getRegionLeaderMap() .forEach( (region, leader) -> { consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, leader)); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java index 84dc31f45d..b360d4454a 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DataNodeRegionTaskExecutor.java @@ -70,7 +70,7 @@ public abstract class DataNodeRegionTaskExecutor<Q, R> { executeOnAllReplicaset ? getAllReplicaDataNodeRegionGroupMap(targetSchemaRegionGroup) : getLeaderDataNodeRegionGroupMap( - env.getConfigManager().getLoadManager().getLatestRegionLeaderMap(), + env.getConfigManager().getLoadManager().getRegionLeaderMap(), targetSchemaRegionGroup); while (!dataNodeConsensusGroupIdMap.isEmpty()) { AsyncClientHandler<Q, R> clientHandler = prepareRequestHandler(dataNodeConsensusGroupIdMap); @@ -143,7 +143,7 @@ public abstract class DataNodeRegionTaskExecutor<Q, R> { Map<TDataNodeLocation, List<TConsensusGroupId>> availableDataNodeLocation = new HashMap<>(); Map<TConsensusGroupId, Integer> leaderMap = - env.getConfigManager().getLoadManager().getLatestRegionLeaderMap(); + env.getConfigManager().getLoadManager().getRegionLeaderMap(); for (List<TConsensusGroupId> consensusGroupIdList : failedDataNodeConsensusGroupIdMap.values()) { for (TConsensusGroupId consensusGroupId : consensusGroupIdList) { diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index 43c3404b58..4eb2b42bfb 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -124,8 +124,6 @@ public class DeleteDatabaseProcedure .removeRegionGroupCache(regionReplicaSet.getRegionId()); env.getConfigManager() .getLoadManager() - .getRouteBalancer() - .getRegionRouteMap() .removeRegionRouteCache(regionReplicaSet.getRegionId()); if (regionReplicaSet diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java index 80f0ab1e8b..78abfbca2d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/CreateRegionGroupsProcedure.java @@ -201,7 +201,6 @@ public class CreateRegionGroupsProcedure setNextState(CreateRegionGroupsState.CREATE_REGION_GROUPS_FINISH); break; case CREATE_REGION_GROUPS_FINISH: - env.broadcastRegionGroup(); return Flow.NO_MORE_STATE; } diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMapTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMapTest.java deleted file mode 100644 index 986fc68637..0000000000 --- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMapTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.confignode.manager.load.balancer.router; - -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.tsfile.utils.PublicBAOS; - -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.transport.TIOStreamTransport; -import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; -import org.junit.Assert; -import org.junit.Test; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; - -import static org.apache.iotdb.common.rpc.thrift.TConsensusGroupType.SchemaRegion; - -public class RegionRouteMapTest { - - @Test - public void RegionRouteMapSerDeTest() throws IOException, TTransportException { - RegionRouteMap regionRouteMap0 = new RegionRouteMap(); - Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>(); - Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap = new HashMap<>(); - for (int i = 0; i < 10; i++) { - TConsensusGroupId regionGroupId = new TConsensusGroupId(SchemaRegion, i); - TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet(); - regionReplicaSet.setRegionId(regionGroupId); - for (int j = 0; j < 3; j++) { - regionReplicaSet.addToDataNodeLocations( - new TDataNodeLocation( - j, - new TEndPoint("0.0.0.0", 6667 + j), - new TEndPoint("0.0.0.0", 10730 + j), - new TEndPoint("0.0.0.0", 10740 + j), - new TEndPoint("0.0.0.0", 10760 + j), - new TEndPoint("0.0.0.0", 10750 + j))); - } - regionLeaderMap.put(regionGroupId, i % 3); - regionPriorityMap.put(regionGroupId, regionReplicaSet); - } - regionRouteMap0.setRegionLeaderMap(regionLeaderMap); - regionRouteMap0.setRegionPriorityMap(regionPriorityMap); - - try (PublicBAOS byteArrayOutputStream = new PublicBAOS(); - DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { - TTransport transport = new TIOStreamTransport(outputStream); - TBinaryProtocol protocol = new TBinaryProtocol(transport); - regionRouteMap0.serialize(outputStream, protocol); - - ByteBuffer buffer = - ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); - RegionRouteMap regionRouteMap1 = new RegionRouteMap(); - regionRouteMap1.deserialize(buffer); - Assert.assertEquals(regionRouteMap0, regionRouteMap1); - } - } -}
