This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b52e79bb3 [IOTDB-5791] Construct Cluster-LoadPublisher-Thread and IClusterStatusSubscriber (#9645)
8b52e79bb3 is described below

commit 8b52e79bb369dcafa68d0e9f93d96a822534bf80
Author: YongzaoDan <[email protected]>
AuthorDate: Mon Apr 24 13:56:48 2023 +0800

    [IOTDB-5791] Construct Cluster-LoadPublisher-Thread and IClusterStatusSubscriber (#9645)
---
 .../confignode/manager/ClusterSchemaManager.java   |  15 ---
 .../iotdb/confignode/manager/load/LoadManager.java |   8 +-
 .../manager/load/balancer/RouteBalancer.java       |  21 ++--
 .../load/balancer/router/RegionRouteMap.java       |   5 +
 .../confignode/manager/load/cache/LoadCache.java   |  15 ++-
 .../load/cache/region/RegionGroupCache.java        |   4 +
 .../manager/load/service/StatisticsService.java    | 115 ++++++++++++---------
 .../subscriber/IClusterStatusSubscriber.java}      |  17 ++-
 .../manager/load/subscriber/RouteChangeEvent.java  |  74 +++++++++++++
 .../subscriber/StatisticsChangeEvent.java}         |  26 +++--
 10 files changed, 209 insertions(+), 91 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
index bd5d3209f0..7f21222f62 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java
@@ -60,7 +60,6 @@ import 
org.apache.iotdb.confignode.consensus.response.template.TemplateSetInfoRe
 import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo;
@@ -80,8 +79,6 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.utils.Pair;
 
-import com.google.common.eventbus.AllowConcurrentEvents;
-import com.google.common.eventbus.Subscribe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -803,18 +800,6 @@ public class ClusterSchemaManager {
     return RpcUtils.SUCCESS_STATUS;
   }
 
-  /**
-   * When some Nodes' states changed during a heartbeat loop, the eventbus in 
LoadManager will post
-   * the different NodeStatstics event to SyncManager and ClusterSchemaManager.
-   *
-   * @param nodeStatisticsEvent nodeStatistics that changed in a heartbeat loop
-   */
-  @Subscribe
-  @AllowConcurrentEvents
-  public void handleNodeStatistics(NodeStatisticsEvent nodeStatisticsEvent) {
-    // TODO
-  }
-
   private NodeManager getNodeManager() {
     return configManager.getNodeManager();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 2aaa730a8a..b6f635a11b 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -72,8 +72,8 @@ public class LoadManager {
   private final HeartbeatService heartbeatService;
   private final StatisticsService statisticsService;
 
-  private final EventBus eventBus =
-      new AsyncEventBus("LoadManager-EventBus", 
Executors.newFixedThreadPool(5));
+  private final EventBus loadPublisher =
+      new AsyncEventBus("Cluster-LoadPublisher-Thread", 
Executors.newFixedThreadPool(5));
 
   public LoadManager(IManager configManager) {
     this.configManager = configManager;
@@ -85,9 +85,9 @@ public class LoadManager {
     this.loadCache = new LoadCache();
     this.heartbeatService = new HeartbeatService(configManager, loadCache);
     this.statisticsService =
-        new StatisticsService(configManager, routeBalancer, loadCache, 
eventBus);
+        new StatisticsService(configManager, routeBalancer, loadCache, 
loadPublisher);
 
-    eventBus.register(configManager.getClusterSchemaManager());
+    loadPublisher.register(statisticsService);
   }
 
   /**
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index a87cb051ba..3599a442fa 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -41,6 +41,7 @@ import 
org.apache.iotdb.confignode.manager.load.balancer.router.leader.MinCostFl
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.GreedyPriorityBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.IPriorityBalancer;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.priority.LeaderPriorityBalancer;
+import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -57,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -165,16 +165,18 @@ public class RouteBalancer {
   /**
    * Invoking periodically to update the RegionRouteMap
    *
-   * @return True if the RegionRouteMap has changed, false otherwise
+   * @return RouteChangeEvent
    */
-  public boolean updateRegionRouteMap() {
+  public RouteChangeEvent updateRegionRouteMap() {
     synchronized (regionRouteMap) {
-      return updateRegionLeaderMap() | updateRegionPriorityMap();
+      RegionRouteMap preRouteMap = new RegionRouteMap(regionRouteMap);
+      updateRegionLeaderMap();
+      updateRegionPriorityMap();
+      return new RouteChangeEvent(preRouteMap, regionRouteMap);
     }
   }
 
-  private boolean updateRegionLeaderMap() {
-    AtomicBoolean isLeaderChanged = new AtomicBoolean(false);
+  private void updateRegionLeaderMap() {
     leaderCache.forEach(
         (regionGroupId, leadershipSample) -> {
           if (TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
@@ -186,13 +188,11 @@ public class RouteBalancer {
           if (leadershipSample.getRight() != 
regionRouteMap.getLeader(regionGroupId)) {
             // Update leader
             regionRouteMap.setLeader(regionGroupId, 
leadershipSample.getRight());
-            isLeaderChanged.set(true);
           }
         });
-    return isLeaderChanged.get();
   }
 
-  private boolean updateRegionPriorityMap() {
+  private void updateRegionPriorityMap() {
     Map<TConsensusGroupId, Integer> regionLeaderMap = 
regionRouteMap.getRegionLeaderMap();
     Map<Integer, Long> dataNodeLoadScoreMap = 
getLoadManager().getAllDataNodeLoadScores();
 
@@ -211,9 +211,6 @@ public class RouteBalancer {
 
     if 
(!latestRegionPriorityMap.equals(regionRouteMap.getRegionPriorityMap())) {
       regionRouteMap.setRegionPriorityMap(latestRegionPriorityMap);
-      return true;
-    } else {
-      return false;
     }
   }
 
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
index 58f456ab8f..cbe9003355 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/RegionRouteMap.java
@@ -50,6 +50,11 @@ public class RegionRouteMap {
     this.regionPriorityMap = new ConcurrentHashMap<>();
   }
 
+  public RegionRouteMap(RegionRouteMap other) {
+    this.regionLeaderMap = new ConcurrentHashMap<>(other.regionLeaderMap);
+    this.regionPriorityMap = new ConcurrentHashMap<>(other.regionPriorityMap);
+  }
+
   /**
    * @return DataNodeId where the specified RegionGroup's leader resides. And 
return -1 if the
    *     leader is not recorded yet
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 0b2fd7195c..5a487de11c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -165,7 +165,7 @@ public class LoadCache {
           if (nodeCache.periodicUpdate()) {
             // Update and record the changed NodeStatistics
             differentNodeStatisticsMap.put(
-                nodeId, new Pair<>(nodeCache.getStatistics(), 
preNodeStatistics));
+                nodeId, new Pair<>(preNodeStatistics, 
nodeCache.getStatistics()));
           }
         });
     return differentNodeStatisticsMap;
@@ -176,14 +176,19 @@ public class LoadCache {
    *
    * @return a map of changed RegionGroupStatistics
    */
-  public Map<TConsensusGroupId, RegionGroupStatistics> 
updateRegionGroupStatistics() {
-    Map<TConsensusGroupId, RegionGroupStatistics> 
differentRegionGroupStatisticsMap =
-        new ConcurrentHashMap<>();
+  public Map<TConsensusGroupId, Pair<RegionGroupStatistics, 
RegionGroupStatistics>>
+      updateRegionGroupStatistics() {
+    Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+        differentRegionGroupStatisticsMap = new ConcurrentHashMap<>();
     regionGroupCacheMap.forEach(
         (regionGroupId, regionGroupCache) -> {
+          RegionGroupStatistics preRegionGroupStatistics =
+              regionGroupCache.getPreviousStatistics().deepCopy();
           if (regionGroupCache.periodicUpdate()) {
             // Update and record the changed RegionGroupStatistics
-            differentRegionGroupStatisticsMap.put(regionGroupId, 
regionGroupCache.getStatistics());
+            differentRegionGroupStatisticsMap.put(
+                regionGroupId,
+                new Pair<>(preRegionGroupStatistics, 
regionGroupCache.getStatistics()));
           }
         });
     return differentRegionGroupStatisticsMap;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index a531e440aa..dd21c24721 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -163,4 +163,8 @@ public class RegionGroupCache {
   public RegionGroupStatistics getStatistics() {
     return currentStatistics.get();
   }
+
+  public RegionGroupStatistics getPreviousStatistics() {
+    return previousStatistics.get();
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
index 9e51b153de..bdbb24cc8e 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/StatisticsService.java
@@ -32,12 +32,13 @@ import 
org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
-import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
 import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
 import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
-import org.apache.iotdb.confignode.manager.observer.NodeStatisticsEvent;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import org.apache.iotdb.confignode.manager.load.subscriber.RouteChangeEvent;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.StatisticsChangeEvent;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
 import org.apache.iotdb.tsfile.utils.Pair;
 
@@ -52,7 +53,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-public class StatisticsService {
+public class StatisticsService implements IClusterStatusSubscriber {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(StatisticsService.class);
 
@@ -111,28 +112,32 @@ public class StatisticsService {
     boolean isNeedBroadcast = false;
 
     // Update NodeStatistics:
-    // Pair<NodeStatistics, NodeStatistics>:left one means the current 
NodeStatistics, right one
-    // means the previous NodeStatistics
+    // Map<NodeId, Pair<old NodeStatistics, new NodeStatistics>>
     Map<Integer, Pair<NodeStatistics, NodeStatistics>> 
differentNodeStatisticsMap =
         loadCache.updateNodeStatistics();
     if (!differentNodeStatisticsMap.isEmpty()) {
       isNeedBroadcast = true;
-      recordNodeStatistics(differentNodeStatisticsMap);
-      eventBus.post(new NodeStatisticsEvent(differentNodeStatisticsMap));
     }
 
     // Update RegionGroupStatistics
-    Map<TConsensusGroupId, RegionGroupStatistics> 
differentRegionGroupStatisticsMap =
-        loadCache.updateRegionGroupStatistics();
+    // Map<RegionGroupId, Pair<old RegionGroupStatistics, new 
RegionGroupStatistics>>
+    Map<TConsensusGroupId, Pair<RegionGroupStatistics, RegionGroupStatistics>>
+        differentRegionGroupStatisticsMap = 
loadCache.updateRegionGroupStatistics();
     if (!differentRegionGroupStatisticsMap.isEmpty()) {
       isNeedBroadcast = true;
-      recordRegionGroupStatistics(differentRegionGroupStatisticsMap);
+    }
+
+    if (isNeedBroadcast) {
+      StatisticsChangeEvent statisticsChangeEvent =
+          new StatisticsChangeEvent(differentNodeStatisticsMap, 
differentRegionGroupStatisticsMap);
+      eventBus.post(statisticsChangeEvent);
     }
 
     // Update RegionRouteMap
-    if (routeBalancer.updateRegionRouteMap()) {
+    RouteChangeEvent routeChangeEvent = routeBalancer.updateRegionRouteMap();
+    if (routeChangeEvent.isNeedBroadcast()) {
       isNeedBroadcast = true;
-      recordRegionRouteMap(routeBalancer.getRegionRouteMap());
+      eventBus.post(routeChangeEvent);
     }
 
     if (isNeedBroadcast) {
@@ -140,6 +145,31 @@ public class StatisticsService {
     }
   }
 
+  public void broadcastLatestRegionRouteMap() {
+    Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap =
+        routeBalancer.getLatestRegionPriorityMap();
+    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new 
ConcurrentHashMap<>();
+    // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
+    configManager
+        .getNodeManager()
+        .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, 
NodeStatus.ReadOnly)
+        .forEach(
+            onlineDataNode ->
+                dataNodeLocationMap.put(
+                    onlineDataNode.getLocation().getDataNodeId(), 
onlineDataNode.getLocation()));
+
+    LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
+    long broadcastTime = System.currentTimeMillis();
+
+    AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
+        new AsyncClientHandler<>(
+            DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
+            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
+            dataNodeLocationMap);
+    
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
+    LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap 
finished.");
+  }
+
   private void recordNodeStatistics(
       Map<Integer, Pair<NodeStatistics, NodeStatistics>> 
differentNodeStatisticsMap) {
     LOGGER.info("[UpdateLoadStatistics] NodeStatisticsMap: ");
@@ -148,19 +178,20 @@ public class StatisticsService {
       LOGGER.info(
           "[UpdateLoadStatistics]\t {}={}",
           "nodeId{" + nodeCacheEntry.getKey() + "}",
-          nodeCacheEntry.getValue().left);
+          nodeCacheEntry.getValue().getRight());
     }
   }
 
   private void recordRegionGroupStatistics(
-      Map<TConsensusGroupId, RegionGroupStatistics> 
differentRegionGroupStatisticsMap) {
+      Map<TConsensusGroupId, Pair<RegionGroupStatistics, 
RegionGroupStatistics>>
+          differentRegionGroupStatisticsMap) {
     LOGGER.info("[UpdateLoadStatistics] RegionGroupStatisticsMap: ");
-    for (Map.Entry<TConsensusGroupId, RegionGroupStatistics> 
regionGroupStatisticsEntry :
-        differentRegionGroupStatisticsMap.entrySet()) {
+    for (Map.Entry<TConsensusGroupId, Pair<RegionGroupStatistics, 
RegionGroupStatistics>>
+        regionGroupStatisticsEntry : 
differentRegionGroupStatisticsMap.entrySet()) {
       LOGGER.info("[UpdateLoadStatistics]\t RegionGroup: {}", 
regionGroupStatisticsEntry.getKey());
       LOGGER.info("[UpdateLoadStatistics]\t {}", 
regionGroupStatisticsEntry.getValue());
       for (Map.Entry<Integer, RegionStatistics> regionStatisticsEntry :
-          
regionGroupStatisticsEntry.getValue().getRegionStatisticsMap().entrySet()) {
+          
regionGroupStatisticsEntry.getValue().getRight().getRegionStatisticsMap().entrySet())
 {
         LOGGER.info(
             "[UpdateLoadStatistics]\t dataNodeId{}={}",
             regionStatisticsEntry.getKey(),
@@ -169,50 +200,40 @@ public class StatisticsService {
     }
   }
 
-  private void recordRegionRouteMap(RegionRouteMap regionRouteMap) {
+  @Override
+  public void onClusterStatisticsChanged(StatisticsChangeEvent event) {
+    recordNodeStatistics(event.getNodeStatisticsMap());
+    recordRegionGroupStatistics(event.getRegionGroupStatisticsMap());
+  }
+
+  private void recordRegionLeaderMap(Map<TConsensusGroupId, Pair<Integer, 
Integer>> leaderMap) {
     LOGGER.info("[UpdateLoadStatistics] RegionLeaderMap: ");
-    for (Map.Entry<TConsensusGroupId, Integer> regionLeaderEntry :
-        regionRouteMap.getRegionLeaderMap().entrySet()) {
+    for (Map.Entry<TConsensusGroupId, Pair<Integer, Integer>> 
regionLeaderEntry :
+        leaderMap.entrySet()) {
       LOGGER.info(
           "[UpdateLoadStatistics]\t {}={}",
           regionLeaderEntry.getKey(),
-          regionLeaderEntry.getValue());
+          regionLeaderEntry.getValue().getRight());
     }
+  }
 
+  private void recordRegionPriorityMap(
+      Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> 
priorityMap) {
     LOGGER.info("[UpdateLoadStatistics] RegionPriorityMap: ");
-    for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> regionPriorityEntry :
-        regionRouteMap.getRegionPriorityMap().entrySet()) {
+    for (Map.Entry<TConsensusGroupId, Pair<TRegionReplicaSet, 
TRegionReplicaSet>>
+        regionPriorityEntry : priorityMap.entrySet()) {
       LOGGER.info(
           "[UpdateLoadStatistics]\t {}={}",
           regionPriorityEntry.getKey(),
-          regionPriorityEntry.getValue().getDataNodeLocations().stream()
+          
regionPriorityEntry.getValue().getRight().getDataNodeLocations().stream()
               .map(TDataNodeLocation::getDataNodeId)
               .collect(Collectors.toList()));
     }
   }
 
-  public void broadcastLatestRegionRouteMap() {
-    Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap =
-        routeBalancer.getLatestRegionPriorityMap();
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap = new 
ConcurrentHashMap<>();
-    // Broadcast the RegionRouteMap to all DataNodes except the unknown ones
-    configManager
-        .getNodeManager()
-        .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Removing, 
NodeStatus.ReadOnly)
-        .forEach(
-            onlineDataNode ->
-                dataNodeLocationMap.put(
-                    onlineDataNode.getLocation().getDataNodeId(), 
onlineDataNode.getLocation()));
-
-    LOGGER.info("[UpdateLoadStatistics] Begin to broadcast RegionRouteMap:");
-    long broadcastTime = System.currentTimeMillis();
-
-    AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
-        new AsyncClientHandler<>(
-            DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
-            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
-            dataNodeLocationMap);
-    
AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
-    LOGGER.info("[UpdateLoadStatistics] Broadcast the latest RegionRouteMap 
finished.");
+  @Override
+  public void onRegionGroupLeaderChanged(RouteChangeEvent event) {
+    recordRegionLeaderMap(event.getLeaderMap());
+    recordRegionPriorityMap(event.getPriorityMap());
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
similarity index 66%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
index d9e8445e74..faa79fb5a5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/IEvent.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
@@ -16,6 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.observer;
 
-public interface IEvent {}
+package org.apache.iotdb.confignode.manager.load.subscriber;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+
+public interface IClusterStatusSubscriber {
+
+  @Subscribe
+  @AllowConcurrentEvents
+  void onClusterStatisticsChanged(StatisticsChangeEvent event);
+
+  @Subscribe
+  @AllowConcurrentEvents
+  void onRegionGroupLeaderChanged(RouteChangeEvent event);
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java
new file mode 100644
index 0000000000..55153f3bf5
--- /dev/null
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/RouteChangeEvent.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.subscriber;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.manager.load.balancer.router.RegionRouteMap;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RouteChangeEvent {
+
+  // Map<RegionGroupId, Pair<old Leader, new Leader>>
+  private final Map<TConsensusGroupId, Pair<Integer, Integer>> leaderMap;
+  // Map<RegionGroupId, Pair<old Priority, new Priority>>
+  private final Map<TConsensusGroupId, Pair<TRegionReplicaSet, 
TRegionReplicaSet>> priorityMap;
+
+  public RouteChangeEvent(RegionRouteMap preRouteMap, RegionRouteMap 
currentRouteMap) {
+    this.leaderMap = new ConcurrentHashMap<>();
+    this.priorityMap = new ConcurrentHashMap<>();
+
+    preRouteMap
+        .getRegionLeaderMap()
+        .forEach(
+            (regionGroupId, oldLeader) -> {
+              Integer newLeader = 
currentRouteMap.getRegionLeaderMap().get(regionGroupId);
+              if (newLeader != null && !newLeader.equals(oldLeader)) {
+                leaderMap.put(regionGroupId, new Pair<>(oldLeader, newLeader));
+              }
+            });
+
+    preRouteMap
+        .getRegionPriorityMap()
+        .forEach(
+            (regionGroupId, oldPriority) -> {
+              TRegionReplicaSet newPriority =
+                  currentRouteMap.getRegionPriorityMap().get(regionGroupId);
+              if (newPriority != null && !newPriority.equals(oldPriority)) {
+                priorityMap.put(regionGroupId, new Pair<>(oldPriority, 
newPriority));
+              }
+            });
+  }
+
+  public boolean isNeedBroadcast() {
+    return !leaderMap.isEmpty() || !priorityMap.isEmpty();
+  }
+
+  public Map<TConsensusGroupId, Pair<Integer, Integer>> getLeaderMap() {
+    return leaderMap;
+  }
+
+  public Map<TConsensusGroupId, Pair<TRegionReplicaSet, TRegionReplicaSet>> 
getPriorityMap() {
+    return priorityMap;
+  }
+}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java
similarity index 51%
rename from confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
rename to confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java
index 50e9023040..c8ba7b6248 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/observer/NodeStatisticsEvent.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/StatisticsChangeEvent.java
@@ -16,24 +16,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.confignode.manager.observer;
 
+package org.apache.iotdb.confignode.manager.load.subscriber;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.util.Map;
 
-public class NodeStatisticsEvent implements IEvent {
+public class StatisticsChangeEvent {
 
-  // Pair<NodeStatistics, NodeStatistics>:left one means the current 
NodeStatistics, right one means
-  // the previous NodeStatistics
-  private Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap;
+  // Map<NodeId, Pair<old NodeStatistics, new NodeStatistics>>
+  private final Map<Integer, Pair<NodeStatistics, NodeStatistics>> 
nodeStatisticsMap;
+  // Map<RegionGroupId, Pair<old RegionGroupStatistics, new 
RegionGroupStatistics>>
+  private final Map<TConsensusGroupId, Pair<RegionGroupStatistics, 
RegionGroupStatistics>>
+      regionGroupStatisticsMap;
 
-  public NodeStatisticsEvent(Map<Integer, Pair<NodeStatistics, 
NodeStatistics>> nodeStatisticsMap) {
+  public StatisticsChangeEvent(
+      Map<Integer, Pair<NodeStatistics, NodeStatistics>> nodeStatisticsMap,
+      Map<TConsensusGroupId, Pair<RegionGroupStatistics, 
RegionGroupStatistics>>
+          regionGroupStatisticsMap) {
     this.nodeStatisticsMap = nodeStatisticsMap;
+    this.regionGroupStatisticsMap = regionGroupStatisticsMap;
   }
 
   public Map<Integer, Pair<NodeStatistics, NodeStatistics>> 
getNodeStatisticsMap() {
     return nodeStatisticsMap;
   }
+
+  public Map<TConsensusGroupId, Pair<RegionGroupStatistics, 
RegionGroupStatistics>>
+      getRegionGroupStatisticsMap() {
+    return regionGroupStatisticsMap;
+  }
 }

Reply via email to