This is an automated email from the ASF dual-hosted git repository. yongzao pushed a commit to branch Construct-Cluster-LoadPublisher-Thread-and-IClusterStatusSubscriber in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 16a3e4d5dd78ee6cf06791c690c0725f46c4f092 Author: YongzaoDan <[email protected]> AuthorDate: Wed Apr 19 11:01:30 2023 +0800 Finish --- .../confignode/manager/ClusterSchemaManager.java | 15 --- .../iotdb/confignode/manager/load/LoadManager.java | 8 +- .../manager/load/balancer/RouteBalancer.java | 19 ++-- .../load/balancer/router/RegionRouteMap.java | 5 + .../confignode/manager/load/cache/LoadCache.java | 15 ++- .../load/cache/region/RegionGroupCache.java | 4 + .../manager/load/service/StatisticsService.java | 115 ++++++++++++--------- .../subscriber/IClusterStatusSubscriber.java} | 17 ++- .../manager/load/subscriber/RouteChangeEvent.java | 74 +++++++++++++ .../subscriber/StatisticsChangeEvent.java} | 26 +++-- 10 files changed, 210 insertions(+), 88 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java index 1ac6823635..5bc25f5d71 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java @@ -63,7 +63,6 @@ import org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoRe import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; import org.apache.iotdb.confignode.manager.node.NodeManager; -import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo; @@ -81,8 +80,6 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.utils.Pair; -import com.google.common.eventbus.AllowConcurrentEvents; -import com.google.common.eventbus.Subscribe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -806,18 +803,6 @@ public class ClusterSchemaManager { return getConsensusManager().write(new DropSchemaTemplatePlan(templateName)).getStatus(); } - /** - * When some Nodes' states changed during a heartbeat loop, the eventbus in LoadManager will post - * the different NodeStatstics event to SyncManager and ClusterSchemaManager. - * - * @param nodeStatisticsEvent nodeStatistics that changed in a heartbeat loop - */ - @Subscribe - @AllowConcurrentEvents - public void handleNodeStatistics(NodeStatisticsEvent nodeStatisticsEvent) { - // TODO - } - private NodeManager getNodeManager() { return configManager.getNodeManager(); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index 2aaa730a8a..b6f635a11b 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -72,8 +72,8 @@ public class LoadManager { private final HeartbeatService heartbeatService; private final StatisticsService statisticsService; - private final EventBus eventBus = - new AsyncEventBus("LoadManager-EventBus", Executors.newFixedThreadPool(5)); + private final EventBus loadPublisher = + new AsyncEventBus("Cluster-LoadPublisher-Thread", Executors.newFixedThreadPool(5)); public LoadManager(IManager configManager) { this.configManager = configManager; @@ -85,9 +85,9 @@ public class LoadManager { this.loadCache = new LoadCache(); this.heartbeatService = new HeartbeatService(configManager, loadCache); this.statisticsService = - new StatisticsService(configManager, routeBalancer, loadCache, eventBus); + new StatisticsService(configManager, routeBalancer, loadCache, loadPublisher); - eventBus.register(configManager.getClusterSchemaManager()); + loadPublisher.register(statisticsService); } /** diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index a87cb051ba..31ea24a50f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFl import org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer; +import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.consensus.ConsensusFactory; @@ -165,15 +166,18 @@ public class RouteBalancer { /** * Invoking periodically to update the RegionRouteMap * - * @return True if the RegionRouteMap has changed, false otherwise + * @return RouteChangeEvent */ - public boolean updateRegionRouteMap() { + public RouteChangeEvent updateRegionRouteMap() { synchronized (regionRouteMap) { - return updateRegionLeaderMap() | updateRegionPriorityMap(); + RegionRouteMap preRouteMap = new RegionRouteMap(regionRouteMap); + updateRegionLeaderMap(); + updateRegionPriorityMap(); + return new RouteChangeEvent(preRouteMap, regionRouteMap); } } - private boolean updateRegionLeaderMap() { + private void updateRegionLeaderMap() { AtomicBoolean isLeaderChanged = new AtomicBoolean(false); leaderCache.forEach( (regionGroupId, leadershipSample) -> { @@ -189,10 +193,10 @@ public class RouteBalancer { isLeaderChanged.set(true); } }); - return isLeaderChanged.get(); + isLeaderChanged.get(); } - private boolean updateRegionPriorityMap() { + private void updateRegionPriorityMap() { Map<TConsensusGroupId, Integer> regionLeaderMap = regionRouteMap.getRegionLeaderMap(); Map<Integer, Long> dataNodeLoadScoreMap = getLoadManager().getAllDataNodeLoadScores(); @@ -211,9 +215,6 @@ public class RouteBalancer { if (!latestRegionPriorityMap.equals(regionRouteMap.getRegionPriorityMap())) { regionRouteMap.setRegionPriorityMap(latestRegionPriorityMap); - return true; - } else { - return false; } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java index 58f456ab8f..cbe9003355 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java @@ -50,6 +50,11 @@ public class RegionRouteMap { this.regionPriorityMap = new ConcurrentHashMap<>(); } + public RegionRouteMap(RegionRouteMap other) { + this.regionLeaderMap = new ConcurrentHashMap<>(other.regionLeaderMap); + this.regionPriorityMap = new ConcurrentHashMap<>(other.regionPriorityMap); + } + /** * @return DataNodeId where the specified RegionGroup's leader resides. And return -1 if the * leader is not recorded yet diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java index 0b2fd7195c..5a487de11c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java @@ -165,7 +165,7 @@ public class LoadCache { if (nodeCache.periodicUpdate()) { // Update and record the changed NodeStatistics differentNodeStatisticsMap.put( - nodeId, new Pair<>(nodeCache.getStatistics(), preNodeStatistics)); + nodeId, new Pair<>(preNodeStatistics, nodeCache.getStatistics())); } }); return differentNodeStatisticsMap; @@ -176,14 +176,19 @@ public class LoadCache { * * @return a map of changed RegionGroupStatistics */ - public Map<TConsensusGroupId, RegionGroupStatistics> updateRegionGroupStatistics() { - Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap = - new ConcurrentHashMap<>(); + public Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> + updateRegionGroupStatistics() { + Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> + differentRegionGroupStatisticsMap = new ConcurrentHashMap<>(); regionGroupCacheMap.forEach( (regionGroupId, regionGroupCache) -> { + RegionGroupStatistics preRegionGroupStatistics = + regionGroupCache.getPreviousStatistics().deepCopy(); if (regionGroupCache.periodicUpdate()) { // Update and record the changed RegionGroupStatistics - differentRegionGroupStatisticsMap.put(regionGroupId, regionGroupCache.getStatistics()); + differentRegionGroupStatisticsMap.put( + regionGroupId, + new Pair<>(preRegionGroupStatistics, regionGroupCache.getStatistics())); } }); return differentRegionGroupStatisticsMap; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java index a531e440aa..dd21c24721 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java @@ -163,4 +163,8 @@ public class RegionGroupCache { public RegionGroupStatistics getStatistics() { return currentStatistics.get(); } + + public RegionGroupStatistics getPreviousStatistics() { + return previousStatistics.get(); + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java index 9e51b153de..bdbb24cc8e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java @@ -32,12 +32,13 @@ import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.IManager; import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer; -import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap; import org.apache.iotdb.confignode.manager.load.cache.LoadCache; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics; -import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent; +import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber; +import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent; +import org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent; import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq; import org.apache.iotdb.tsfile.utils.Pair; @@ -52,7 +53,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -public class StatisticsService { +public class StatisticsService implements IClusterStatusSubscriber { private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsService.class); @@ -111,28 +112,32 @@ public class StatisticsService { boolean isNeedBroadcast = false; // Update NodeStatistics: - // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one - // means the previous NodeStatistics + // Map<NodeId, Pair<old NodeStatistics, new NodeStatistics>> Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap = loadCache.updateNodeStatistics(); if (!differentNodeStatisticsMap.isEmpty()) { isNeedBroadcast = true; - recordNodeStatistics(differentNodeStatisticsMap); - eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap)); } // Update RegionGroupStatistics - Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap = - loadCache.updateRegionGroupStatistics(); + // Map<RegionGroupId, Pair<old RegionGroupStatistics, new RegionGroupStatistics>> + Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> + differentRegionGroupStatisticsMap = loadCache.updateRegionGroupStatistics(); if (!differentRegionGroupStatisticsMap.isEmpty()) { isNeedBroadcast = true; - recordRegionGroupStatistics(differentRegionGroupStatisticsMap); + } + + if (isNeedBroadcast) { + StatisticsChangeEvent statisticsChangeEvent = + new StatisticsChangeEvent(differentNodeStatisticsMap, differentRegionGroupStatisticsMap); + eventBus.post(statisticsChangeEvent); } // Update RegionRouteMap - if (routeBalancer.updateRegionRouteMap()) { + RouteChangeEvent routeChangeEvent = routeBalancer.updateRegionRouteMap(); + if (routeChangeEvent.isNeedBroadcast()) { isNeedBroadcast = true; - recordRegionRouteMap(routeBalancer.getRegionRouteMap()); + eventBus.post(routeChangeEvent); } if (isNeedBroadcast) { @@ -140,6 +145,31 @@ public class StatisticsService { } } + public void broadcastLatestRegionRouteMap() { + Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = + routeBalancer.getLatestRegionPriorityMap(); + Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>(); + // Broadcast the RegionRouteMap to all DataNodes except the unknown ones + configManager + .getNodeManager() + .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly) + .forEach( + onlineDataNode -> + dataNodeLocationMap.put( + onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation())); + + LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:"); + long broadcastTime = System.currentTimeMillis(); + + AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler = + new AsyncClientHandler<>( + DataNodeRequestType.UPDATE_REGION_ROUTE_MAP, + new TRegionRouteReq(broadcastTime, latestRegionRouteMap), + dataNodeLocationMap); + AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler); + LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished."); + } + private void recordNodeStatistics( Map<Integer, Pair<NodeStatistics, NodeStatistics>> differentNodeStatisticsMap) { LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: "); @@ -148,19 +178,20 @@ public class StatisticsService { LOGGER.info( "[UpdateLoadStatistics]\t {}={}", "nodeId{" + nodeCacheEntry.getKey() + "}", - nodeCacheEntry.getValue().left); + nodeCacheEntry.getValue().getRight()); } } private void recordRegionGroupStatistics( - Map<TConsensusGroupId, RegionGroupStatistics> differentRegionGroupStatisticsMap) { + Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> + differentRegionGroupStatisticsMap) { LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: "); - for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> regionGroupStatisticsEntry : - differentRegionGroupStatisticsMap.entrySet()) { + for (Map.Entry<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> + regionGroupStatisticsEntry : differentRegionGroupStatisticsMap.entrySet()) { LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", regionGroupStatisticsEntry.getKey()); LOGGER.info("[UpdateLoadStatistics]\t {}", regionGroupStatisticsEntry.getValue()); for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry : - regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) { + regionGroupStatisticsEntry.getValue().getRight().getRegionStatisticsMap().entrySet()) { LOGGER.info( "[UpdateLoadStatistics]\t dataNodeId{}={}", regionStatisticsEntry.getKey(), @@ -169,50 +200,40 @@ public class StatisticsService { } } - private void recordRegionRouteMap(RegionRouteMap regionRouteMap) { + @Override + public void onClusterStatisticsChanged(StatisticsChangeEvent event) { + recordNodeStatistics(event.getNodeStatisticsMap()); + recordRegionGroupStatistics(event.getRegionGroupStatisticsMap()); + } + + private void recordRegionLeaderMap(Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap) { LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: "); - for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry : - regionRouteMap.getRegionLeaderMap().entrySet()) { + for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> regionLeaderEntry : + leaderMap.entrySet()) { LOGGER.info( "[UpdateLoadStatistics]\t {}={}", regionLeaderEntry.getKey(), - regionLeaderEntry.getValue()); + regionLeaderEntry.getValue().getRight()); } + } + private void recordRegionPriorityMap( + Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> priorityMap) { LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: "); - for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry : - regionRouteMap.getRegionPriorityMap().entrySet()) { + for (Map.Entry<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> + regionPriorityEntry : priorityMap.entrySet()) { LOGGER.info( "[UpdateLoadStatistics]\t {}={}", regionPriorityEntry.getKey(), - regionPriorityEntry.getValue().getDataNodeLocations().stream() + regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream() .map(TDataNodeLocation::getDataNodeId) .collect(Collectors.toList())); } } - public void broadcastLatestRegionRouteMap() { - Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = - routeBalancer.getLatestRegionPriorityMap(); - Map<Integer, TDataNodeLocation> dataNodeLocationMap = new ConcurrentHashMap<>(); - // Broadcast the RegionRouteMap to all DataNodes except the unknown ones - configManager - .getNodeManager() - .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, NodeStatus.ReadOnly) - .forEach( - onlineDataNode -> - dataNodeLocationMap.put( - onlineDataNode.getLocation().getDataNodeId(), onlineDataNode.getLocation())); - - LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:"); - long broadcastTime = System.currentTimeMillis(); - - AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler = - new AsyncClientHandler<>( - DataNodeRequestType.UPDATE_REGION_ROUTE_MAP, - new TRegionRouteReq(broadcastTime, latestRegionRouteMap), - dataNodeLocationMap); - AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler); - LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap finished."); + @Override + public void onRegionGroupLeaderChanged(RouteChangeEvent event) { + recordRegionLeaderMap(event.getLeaderMap()); + recordRegionPriorityMap(event.getPriorityMap()); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java similarity index 66% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java index d9e8445e74..faa79fb5a5 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java @@ -16,6 +16,19 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.observer; -public interface IEvent {} +package org.apache.iotdb.confignode.manager.load.subscriber; + +import com.google.common.eventbus.AllowConcurrentEvents; +import com.google.common.eventbus.Subscribe; + +public interface IClusterStatusSubscriber { + + @Subscribe + @AllowConcurrentEvents + void onClusterStatisticsChanged(StatisticsChangeEvent event); + + @Subscribe + @AllowConcurrentEvents + void onRegionGroupLeaderChanged(RouteChangeEvent event); +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java new file mode 100644 index 0000000000..55153f3bf5 --- /dev/null +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.subscriber; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap; +import org.apache.iotdb.tsfile.utils.Pair; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class RouteChangeEvent { + + // Map<RegionGroupId, Pair<old Leader, new Leader>> + private final Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap; + // Map<RegionGroupId, Pair<old Priority, new Priority>> + private final Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> priorityMap; + + public RouteChangeEvent(RegionRouteMap preRouteMap, RegionRouteMap currentRouteMap) { + this.leaderMap = new ConcurrentHashMap<>(); + this.priorityMap = new ConcurrentHashMap<>(); + + preRouteMap + .getRegionLeaderMap() + .forEach( + (regionGroupId, oldLeader) -> { + Integer newLeader = currentRouteMap.getRegionLeaderMap().get(regionGroupId); + if (newLeader != null && !newLeader.equals(oldLeader)) { + leaderMap.put(regionGroupId, new Pair<>(oldLeader, newLeader)); + } + }); + + preRouteMap + .getRegionPriorityMap() + .forEach( + (regionGroupId, oldPriority) -> { + TRegionReplicaSet newPriority = + currentRouteMap.getRegionPriorityMap().get(regionGroupId); + if (newPriority != null && !newPriority.equals(oldPriority)) { + priorityMap.put(regionGroupId, new Pair<>(oldPriority, newPriority)); + } + }); + } + + public boolean isNeedBroadcast() { + return !leaderMap.isEmpty() || !priorityMap.isEmpty(); + } + + public Map<TConsensusGroupId, Pair<Integer, Integer>> getLeaderMap() { + return leaderMap; + } + + public Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> getPriorityMap() { + return priorityMap; + } +} diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java similarity index 51% rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java index 50e9023040..c8ba7b6248 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java @@ -16,24 +16,38 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.confignode.manager.observer; +package org.apache.iotdb.confignode.manager.load.subscriber; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics; +import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics; import org.apache.iotdb.tsfile.utils.Pair; import java.util.Map; -public class NodeStatisticsEvent implements IEvent { +public class StatisticsChangeEvent { - // Pair<NodeStatistics, NodeStatistics>:left one means the current NodeStatistics, right one means - // the previous NodeStatistics - private Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap; + // Map<NodeId, Pair<old NodeStatistics, new NodeStatistics>> + private final Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap; + // Map<RegionGroupId, Pair<old RegionGroupStatistics, new RegionGroupStatistics>> + private final Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> + regionGroupStatisticsMap; - public NodeStatisticsEvent(Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap) { + public StatisticsChangeEvent( + Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap, + Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> + regionGroupStatisticsMap) { this.nodeStatisticsMap = nodeStatisticsMap; + this.regionGroupStatisticsMap = regionGroupStatisticsMap; } public Map<Integer, Pair<NodeStatistics, NodeStatistics>> getNodeStatisticsMap() { return nodeStatisticsMap; } + + public Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>> + getRegionGroupStatisticsMap() { + return regionGroupStatisticsMap; + } }
