This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ade087b330 [IOTDB-4029] Latent routing policy for MultiLeader protocol (#6880)
ade087b330 is described below

commit ade087b330f0b19930259320db1fe4770686ad67
Author: YongzaoDan <[email protected]>
AuthorDate: Sat Aug 6 08:47:20 2022 +0800

    [IOTDB-4029] Latent routing policy for MultiLeader protocol (#6880)
---
 .../iotdb/confignode/manager/PartitionManager.java |  11 +-
 .../iotdb/confignode/manager/load/LoadManager.java |  95 ++++++++----
 .../manager/load/balancer/RouteBalancer.java       |  60 +++++++-
 .../load/balancer/router/LazyGreedyRouter.java     | 154 +++++++++++++++++++
 .../manager/load/heartbeat/IRegionGroupCache.java  |   9 ++
 .../manager/load/heartbeat/RegionGroupCache.java   |   7 +-
 .../procedure/env/DataNodeRemoveHandler.java       |   2 +
 .../load/balancer/router/LazyGreedyRouterTest.java | 166 +++++++++++++++++++++
 .../load/balancer/router/LeaderRouterTest.java     |   2 +-
 9 files changed, 460 insertions(+), 46 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
index 0ba379af5f..0e4adebb84 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/PartitionManager.java
@@ -286,9 +286,8 @@ public class PartitionManager {
           continue;
         }
 
-        // 2. The average number of partitions held by each Region is greater 
than the expected
-        // average
-        //    when the partition allocation is complete
+        // 2. The average number of partitions held by each Region will be 
greater than the
+        // expected average number after the partition allocation is completed
         if (allocatedRegionCount < maxRegionCount
             && slotCount / allocatedRegionCount > maxSlotCount / 
maxRegionCount) {
           // The delta is equal to the smallest integer solution that 
satisfies the inequality:
@@ -306,8 +305,10 @@ public class PartitionManager {
       }
 
       // TODO: Use procedure to protect the following process
-      // Do Region allocation and creation for StorageGroups based on the 
allotment
-      getLoadManager().doRegionCreation(allotmentMap, consensusGroupType);
+      if (!allotmentMap.isEmpty()) {
+        // Do Region allocation and creation for StorageGroups based on the 
allotment
+        getLoadManager().doRegionCreation(allotmentMap, consensusGroupType);
+      }
 
       result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (NotEnoughDataNodeException e) {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 6d5cc98cc1..ff9835cebf 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -55,6 +55,7 @@ import 
org.apache.iotdb.confignode.manager.load.heartbeat.ConfigNodeHeartbeatCac
 import 
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
+import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.metrics.enums.Metric;
 import org.apache.iotdb.db.service.metrics.enums.Tag;
@@ -153,8 +154,11 @@ public class LoadManager {
           
getClusterSchemaManager().getStorageGroupSchemaByName(storageGroup).getTTL());
     }
     
AsyncDataNodeClientPool.getInstance().createRegions(createRegionGroupsPlan, 
ttlMap);
+
     // Persist the allocation result
     getConsensusManager().write(createRegionGroupsPlan);
+    // Broadcast the latest RegionRouteMap
+    broadcastLatestRegionRouteMap();
   }
 
   /**
@@ -188,6 +192,7 @@ public class LoadManager {
    *     sorting result have higher priority.
    */
   public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap() {
+    // Always take the latest locations of RegionGroups as the input parameter
     return 
routeBalancer.genLatestRegionRouteMap(getPartitionManager().getAllReplicaSets());
   }
 
@@ -267,17 +272,19 @@ public class LoadManager {
   }
 
   private void updateNodeLoadStatistic() {
-    AtomicBoolean isNeedBroadcast = new AtomicBoolean(false);
+    AtomicBoolean existFailDownDataNode = new AtomicBoolean(false);
+    AtomicBoolean existChangeLeaderSchemaRegionGroup = new 
AtomicBoolean(false);
+    AtomicBoolean existChangeLeaderDataRegionGroup = new AtomicBoolean(false);
+    boolean isNeedBroadcast = false;
 
     nodeCacheMap
         .values()
         .forEach(
             nodeCache -> {
               boolean updateResult = nodeCache.updateLoadStatistic();
-              if (CONF.getRoutingPolicy().equals(RouteBalancer.greedyPolicy)
-                  && nodeCache instanceof DataNodeHeartbeatCache) {
-                // We need a broadcast when some DataNode fail down
-                isNeedBroadcast.compareAndSet(false, updateResult);
+              if (nodeCache instanceof DataNodeHeartbeatCache) {
+                // Check if some DataNodes fail down
+                existFailDownDataNode.compareAndSet(false, updateResult);
               }
             });
 
@@ -286,13 +293,37 @@ public class LoadManager {
         .forEach(
             regionGroupCache -> {
               boolean updateResult = regionGroupCache.updateLoadStatistic();
-              if (CONF.getRoutingPolicy().equals(RouteBalancer.leaderPolicy)) {
-                // We need a broadcast when the leadership changed
-                isNeedBroadcast.compareAndSet(false, updateResult);
+              switch (regionGroupCache.getConsensusGroupId().getType()) {
+                  // Check if some RegionGroups change their leader
+                case SchemaRegion:
+                  existChangeLeaderSchemaRegionGroup.compareAndSet(false, 
updateResult);
+                  break;
+                case DataRegion:
+                  existChangeLeaderDataRegionGroup.compareAndSet(false, 
updateResult);
+                  break;
               }
             });
 
-    if (isNeedBroadcast.get()) {
+    if (existFailDownDataNode.get()) {
+      // The RegionRouteMap must be broadcast if some DataNodes fail down
+      isNeedBroadcast = true;
+    }
+
+    if (RouteBalancer.leaderPolicy.equals(CONF.getRoutingPolicy())) {
+      // Check the condition of leader routing policy
+      if (existChangeLeaderSchemaRegionGroup.get()) {
+        // Broadcast the RegionRouteMap if some SchemaRegionGroups change 
their leader
+        isNeedBroadcast = true;
+      }
+      if 
(!ConsensusFactory.MultiLeaderConsensus.equals(CONF.getDataRegionConsensusProtocolClass())
+          && existChangeLeaderDataRegionGroup.get()) {
+        // Broadcast the RegionRouteMap if some DataRegionGroups change their 
leader
+        // and the consensus protocol isn't MultiLeader
+        isNeedBroadcast = true;
+      }
+    }
+
+    if (isNeedBroadcast) {
       broadcastLatestRegionRouteMap();
     }
     if (nodeCacheMap.size() == getNodeManager().getRegisteredNodeCount()) {
@@ -300,7 +331,7 @@ public class LoadManager {
     }
   }
 
-  private void broadcastLatestRegionRouteMap() {
+  public void broadcastLatestRegionRouteMap() {
     Map<TConsensusGroupId, TRegionReplicaSet> latestRegionRouteMap = 
genLatestRegionRouteMap();
     Map<Integer, TDataNodeLocation> dataNodeLocationMap = new 
ConcurrentHashMap<>();
     getOnlineDataNodes(-1)
@@ -309,7 +340,7 @@ public class LoadManager {
                 dataNodeLocationMap.put(
                     onlineDataNode.getLocation().getDataNodeId(), 
onlineDataNode.getLocation()));
 
-    LOGGER.info("Begin to broadcast RegionRouteMap:");
+    LOGGER.info("[latestRegionRouteMap] Begin to broadcast RegionRouteMap:");
     long broadcastTime = System.currentTimeMillis();
     printRegionRouteMap(broadcastTime, latestRegionRouteMap);
     AsyncDataNodeClientPool.getInstance()
@@ -318,7 +349,7 @@ public class LoadManager {
             dataNodeLocationMap,
             DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
             null);
-    LOGGER.info("Broadcast the latest RegionRouteMap finished.");
+    LOGGER.info("[latestRegionRouteMap] Broadcast the latest RegionRouteMap 
finished.");
   }
 
   /** loop body of the heartbeat thread */
@@ -412,44 +443,44 @@ public class LoadManager {
   public List<TConfigNodeLocation> getOnlineConfigNodes() {
     return getNodeManager().getRegisteredConfigNodes().stream()
         .filter(
-            registeredConfigNode ->
-                nodeCacheMap
-                    .get(registeredConfigNode.getConfigNodeId())
-                    .getNodeStatus()
-                    .equals(NodeStatus.Running))
+            registeredConfigNode -> {
+              int configNodeId = registeredConfigNode.getConfigNodeId();
+              return nodeCacheMap.containsKey(configNodeId)
+                  && 
nodeCacheMap.get(configNodeId).getNodeStatus().equals(NodeStatus.Running);
+            })
         .collect(Collectors.toList());
   }
 
   public List<TDataNodeConfiguration> getOnlineDataNodes(int dataNodeId) {
     return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
         .filter(
-            registeredDataNode ->
-                nodeCacheMap
-                    .get(registeredDataNode.getLocation().getDataNodeId())
-                    .getNodeStatus()
-                    .equals(NodeStatus.Running))
+            registeredDataNode -> {
+              int id = registeredDataNode.getLocation().getDataNodeId();
+              return nodeCacheMap.containsKey(id)
+                  && 
nodeCacheMap.get(id).getNodeStatus().equals(NodeStatus.Running);
+            })
         .collect(Collectors.toList());
   }
 
   public List<TConfigNodeLocation> getUnknownConfigNodes() {
     return getNodeManager().getRegisteredConfigNodes().stream()
         .filter(
-            registeredConfigNode ->
-                nodeCacheMap
-                    .get(registeredConfigNode.getConfigNodeId())
-                    .getNodeStatus()
-                    .equals(NodeStatus.Unknown))
+            registeredConfigNode -> {
+              int configNodeId = registeredConfigNode.getConfigNodeId();
+              return nodeCacheMap.containsKey(configNodeId)
+                  && 
nodeCacheMap.get(configNodeId).getNodeStatus().equals(NodeStatus.Unknown);
+            })
         .collect(Collectors.toList());
   }
 
   public List<TDataNodeConfiguration> getUnknownDataNodes(int dataNodeId) {
     return getNodeManager().getRegisteredDataNodes(dataNodeId).stream()
         .filter(
-            registeredDataNode ->
-                nodeCacheMap
-                    .get(registeredDataNode.getLocation().getDataNodeId())
-                    .getNodeStatus()
-                    .equals(NodeStatus.Unknown))
+            registeredDataNode -> {
+              int id = registeredDataNode.getLocation().getDataNodeId();
+              return nodeCacheMap.containsKey(id)
+                  && 
nodeCacheMap.get(id).getNodeStatus().equals(NodeStatus.Unknown);
+            })
         .collect(Collectors.toList());
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 97c6d91952..a77fa10be2 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -19,14 +19,18 @@
 package org.apache.iotdb.confignode.manager.load.balancer;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.IManager;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.load.balancer.router.IRouter;
+import 
org.apache.iotdb.confignode.manager.load.balancer.router.LazyGreedyRouter;
 import org.apache.iotdb.confignode.manager.load.balancer.router.LeaderRouter;
 import 
org.apache.iotdb.confignode.manager.load.balancer.router.LoadScoreGreedyRouter;
+import org.apache.iotdb.consensus.ConsensusFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -41,22 +45,64 @@ public class RouteBalancer {
 
   private final IManager configManager;
 
+  private final LazyGreedyRouter lazyGreedyRouter;
+
   public RouteBalancer(IManager configManager) {
     this.configManager = configManager;
+    this.lazyGreedyRouter = new LazyGreedyRouter();
   }
 
   public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap(
       List<TRegionReplicaSet> regionReplicaSets) {
-    return genRouter().genLatestRegionRouteMap(regionReplicaSets);
+    List<TRegionReplicaSet> schemaRegionGroups = new ArrayList<>();
+    List<TRegionReplicaSet> dataRegionGroups = new ArrayList<>();
+
+    regionReplicaSets.forEach(
+        regionReplicaSet -> {
+          switch (regionReplicaSet.getRegionId().getType()) {
+            case SchemaRegion:
+              schemaRegionGroups.add(regionReplicaSet);
+              break;
+            case DataRegion:
+              dataRegionGroups.add(regionReplicaSet);
+              break;
+          }
+        });
+
+    // Generate SchemaRegionRouteMap
+    Map<TConsensusGroupId, TRegionReplicaSet> result =
+        
genRouter(TConsensusGroupType.SchemaRegion).genLatestRegionRouteMap(schemaRegionGroups);
+    // Generate DataRegionRouteMap
+    result.putAll(
+        
genRouter(TConsensusGroupType.DataRegion).genLatestRegionRouteMap(dataRegionGroups));
+    return result;
   }
 
-  private IRouter genRouter() {
+  private IRouter genRouter(TConsensusGroupType groupType) {
     String policy = 
ConfigNodeDescriptor.getInstance().getConf().getRoutingPolicy();
-    if (policy.equals(leaderPolicy)) {
-      return new LeaderRouter(
-          getLoadManager().getAllLeadership(), 
getLoadManager().getAllLoadScores());
-    } else {
-      return new LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+    switch (groupType) {
+      case SchemaRegion:
+        if (policy.equals(leaderPolicy)) {
+          return new LeaderRouter(
+              getLoadManager().getAllLeadership(), 
getLoadManager().getAllLoadScores());
+        } else {
+          return new 
LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+        }
+      case DataRegion:
+      default:
+        if (ConfigNodeDescriptor.getInstance()
+            .getConf()
+            .getDataRegionConsensusProtocolClass()
+            .equals(ConsensusFactory.MultiLeaderConsensus)) {
+          // Latent router for MultiLeader consensus protocol
+          
lazyGreedyRouter.updateUnknownDataNodes(getLoadManager().getUnknownDataNodes(-1));
+          return lazyGreedyRouter;
+        } else if (policy.equals(leaderPolicy)) {
+          return new LeaderRouter(
+              getLoadManager().getAllLeadership(), 
getLoadManager().getAllLoadScores());
+        } else {
+          return new 
LoadScoreGreedyRouter(getLoadManager().getAllLoadScores());
+        }
     }
   }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouter.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouter.java
new file mode 100644
index 0000000000..c76c79d494
--- /dev/null
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * The LazyGreedyRouter mainly applies to the MultiLeader consensus protocol. It tries to keep the
+ * number of leaders on each available (not unknown) DataNode as equal as possible.
+ *
+ * <p>The router is "lazy": once a RegionGroup has been given a route entry, that cached entry is
+ * reused by later calls until either the RegionGroup's set of replica DataNodes changes or the
+ * DataNode of its first-priority replica is reported as unknown.
+ *
+ * <p>Both {@link #updateUnknownDataNodes} and {@link #genLatestRegionRouteMap} synchronize on the
+ * unknownDataNodes set, so the two operations never interleave.
+ */
+public class LazyGreedyRouter implements IRouter {
+
+  // Set<DataNodeId> of the DataNodes whose status is currently unknown
+  private final Set<Integer> unknownDataNodes;
+  // Map<RegionGroupId, cached route entry>; index 0 of an entry's locations is its leader
+  private final Map<TConsensusGroupId, TRegionReplicaSet> routeMap;
+  // Source of randomness used to break ties between equally loaded DataNodes
+  private final Random random;
+
+  public LazyGreedyRouter() {
+    this.unknownDataNodes = Collections.synchronizedSet(new HashSet<>());
+    this.routeMap = new ConcurrentHashMap<>();
+    this.random = new Random();
+  }
+
+  /**
+   * Update the unknownDataNodes cache in LazyGreedyRouter.
+   *
+   * @param unknownDataNodes DataNodes that have unknown status; replaces the previous snapshot
+   */
+  public void updateUnknownDataNodes(List<TDataNodeConfiguration> unknownDataNodes) {
+    Set<Integer> latestUnknownDataNodes =
+        unknownDataNodes.stream()
+            .map(dataNodeConfiguration -> dataNodeConfiguration.getLocation().getDataNodeId())
+            .collect(Collectors.toSet());
+    synchronized (this.unknownDataNodes) {
+      this.unknownDataNodes.clear();
+      this.unknownDataNodes.addAll(latestUnknownDataNodes);
+    }
+  }
+
+  /**
+   * Generate the route map for the given RegionGroups.
+   *
+   * <p>Cached route entries that are still valid are reused and counted first; afterwards every
+   * RegionGroup that needs a new entry greedily takes the available replica whose DataNode
+   * currently holds the fewest leaders.
+   *
+   * @param replicaSets the latest RegionReplicaSets to route
+   * @return {@code Map<RegionGroupId, route entry>} whose entries list the leader first
+   */
+  @Override
+  public Map<TConsensusGroupId, TRegionReplicaSet> genLatestRegionRouteMap(
+      List<TRegionReplicaSet> replicaSets) {
+    synchronized (unknownDataNodes) {
+      // Map<DataNodeId, leaderCount>: the number of leaders assigned to each DataNode so far
+      Map<Integer, Integer> leaderCounter = new HashMap<>();
+      Map<TConsensusGroupId, TRegionReplicaSet> result = new ConcurrentHashMap<>();
+      List<TRegionReplicaSet> updateReplicas = new ArrayList<>();
+
+      for (TRegionReplicaSet replicaSet : replicaSets) {
+        if (replicaSet.getDataNodeLocationsSize() == 0) {
+          // A RegionGroup without any replica has no leader to select: return it unchanged and
+          // drop any stale cached entry instead of failing on an empty location list
+          routeMap.remove(replicaSet.getRegionId());
+          result.put(replicaSet.getRegionId(), replicaSet);
+        } else if (routeEntryNeedsUpdate(replicaSet)) {
+          // The greedy selection is performed after all unaltered entries have been counted
+          updateReplicas.add(replicaSet);
+        } else {
+          TRegionReplicaSet cachedRouteEntry = routeMap.get(replicaSet.getRegionId());
+          // Count the leader of the unaltered route entry
+          leaderCounter.merge(
+              cachedRouteEntry.getDataNodeLocations().get(0).getDataNodeId(), 1, Integer::sum);
+          // Record the unaltered result
+          result.put(replicaSet.getRegionId(), cachedRouteEntry);
+        }
+      }
+
+      for (TRegionReplicaSet replicaSet : updateReplicas) {
+        result.put(replicaSet.getRegionId(), updateRouteEntry(replicaSet, leaderCounter));
+      }
+
+      return result;
+    }
+  }
+
+  /** Check whether the specified RegionReplicaSet's route entry needs to be regenerated. */
+  private boolean routeEntryNeedsUpdate(TRegionReplicaSet replicaSet) {
+    TRegionReplicaSet cachedRouteEntry = routeMap.get(replicaSet.getRegionId());
+    if (cachedRouteEntry == null) {
+      // The route entry needs update when it is not recorded yet
+      return true;
+    }
+
+    if (!toDataNodeIds(cachedRouteEntry).equals(toDataNodeIds(replicaSet))) {
+      // The route entry needs update when the cached record is outdated
+      return true;
+    }
+
+    // The route entry needs update when the DataNode of the first-priority replica is unknown
+    return unknownDataNodes.contains(
+        cachedRouteEntry.getDataNodeLocations().get(0).getDataNodeId());
+  }
+
+  /** Collect the DataNodeIds of all replicas in the given RegionReplicaSet. */
+  private static Set<Integer> toDataNodeIds(TRegionReplicaSet replicaSet) {
+    return replicaSet.getDataNodeLocations().stream()
+        .map(TDataNodeLocation::getDataNodeId)
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Regenerate and cache the route entry of the given RegionReplicaSet: shuffle a copy of its
+   * replicas, then move the replica on the non-unknown DataNode with the fewest leaders to the
+   * first position.
+   *
+   * @param replicaSet the RegionReplicaSet to route; must contain at least one replica
+   * @param leaderCounter {@code Map<DataNodeId, leaderCount>}, incremented for the chosen leader
+   * @return the new route entry, which is also stored in routeMap
+   */
+  private TRegionReplicaSet updateRouteEntry(
+      TRegionReplicaSet replicaSet, Map<Integer, Integer> leaderCounter) {
+    TRegionReplicaSet newRouteEntry = new TRegionReplicaSet(replicaSet);
+    List<TDataNodeLocation> locations = newRouteEntry.getDataNodeLocations();
+    // Shuffle first so that ties between equally loaded DataNodes are broken randomly
+    Collections.shuffle(locations, random);
+
+    // Greedily select the leader replica among the DataNodes that are not unknown
+    int leaderIndex = -1;
+    int minLeaderCount = Integer.MAX_VALUE;
+    for (int i = 0; i < locations.size(); i++) {
+      int currentDataNodeId = locations.get(i).getDataNodeId();
+      int currentLeaderCount = leaderCounter.getOrDefault(currentDataNodeId, 0);
+      if (!unknownDataNodes.contains(currentDataNodeId) && currentLeaderCount < minLeaderCount) {
+        leaderIndex = i;
+        minLeaderCount = currentLeaderCount;
+      }
+    }
+
+    if (leaderIndex == -1) {
+      // Corner case: every replica's DataNode is unknown, keep the shuffled first replica
+      leaderIndex = 0;
+    }
+
+    // Move the selected leader to the first position and update the statistic
+    Collections.swap(locations, 0, leaderIndex);
+    leaderCounter.merge(locations.get(0).getDataNodeId(), 1, Integer::sum);
+    routeMap.put(newRouteEntry.getRegionId(), newRouteEntry);
+    return newRouteEntry;
+  }
+}
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
index 3c13b7e855..19288b84a9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/IRegionGroupCache.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.confignode.manager.load.heartbeat;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+
 public interface IRegionGroupCache {
 
   /**
@@ -40,4 +42,11 @@ public interface IRegionGroupCache {
    * @return The DataNodeId of the latest leader
    */
   int getLeaderDataNodeId();
+
+  /**
+   * Get RegionGroup's ConsensusGroupId
+   *
+   * @return TConsensusGroupId
+   */
+  TConsensusGroupId getConsensusGroupId();
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
index e325f28929..0d6d937ad8 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -36,10 +36,10 @@ public class RegionGroupCache implements IRegionGroupCache {
   // TODO: This class might be split into SchemaRegionGroupCache and 
DataRegionGroupCache
 
   private static final int maximumWindowSize = 100;
-  // Map<DataNodeId(where a RegionReplica resides), 
LinkedList<RegionHeartbeatSample>>
 
   private final TConsensusGroupId consensusGroupId;
 
+  // Map<DataNodeId(where a RegionReplica resides), 
LinkedList<RegionHeartbeatSample>>
   private final Map<Integer, LinkedList<RegionHeartbeatSample>> slidingWindow;
 
   // Indicates the version of the statistics
@@ -107,4 +107,9 @@ public class RegionGroupCache implements IRegionGroupCache {
   public int getLeaderDataNodeId() {
     return leaderDataNodeId.get();
   }
+
+  @Override
+  public TConsensusGroupId getConsensusGroupId() {
+    return consensusGroupId;
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index aa5a0b39e7..67cee64c4f 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -207,6 +207,8 @@ public class DataNodeRemoveHandler {
         status,
         originalDataNode.getInternalEndPoint().getIp(),
         destDataNode.getInternalEndPoint().getIp());
+    // Broadcast the latest RegionRouteMap when Region migration finished
+    configManager.getLoadManager().broadcastLatestRegionRouteMap();
   }
 
   /**
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouterTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouterTest.java
new file mode 100644
index 0000000000..b3adbec0dc
--- /dev/null
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LazyGreedyRouterTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.confignode.manager.load.balancer.router;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class LazyGreedyRouterTest {
+
+  @Test
+  public void testGenLatestRegionRouteMap() {
+    LazyGreedyRouter lazyGreedyRouter = new LazyGreedyRouter();
+
+    /* Prepare TRegionReplicaSets: 12 RegionGroups, each with replicas on DataNode-1, 2 and 3 */
+    List<TRegionReplicaSet> regionReplicaSetList = new ArrayList<>();
+    addThreeReplicaRegions(regionReplicaSetList, 0, 12);
+
+    /* Test1: The number of leaders in each DataNode should be approximately 4 */
+    Map<Integer, AtomicInteger> leaderCounter =
+        countLeaders(lazyGreedyRouter.genLatestRegionRouteMap(regionReplicaSetList));
+    Assert.assertEquals(3, leaderCounter.size());
+    for (int i = 1; i <= 3; i++) {
+      assertLeaderCountInRange(leaderCounter, i, 3, 5);
+    }
+
+    /* Test2: With DataNode-2 unknown, the number of leaders in DataNode-1 and DataNode-3
+     * should be approximately 6 */
+    lazyGreedyRouter.updateUnknownDataNodes(buildDataNodeConfigurations(2));
+    leaderCounter = countLeaders(lazyGreedyRouter.genLatestRegionRouteMap(regionReplicaSetList));
+    Assert.assertEquals(2, leaderCounter.size());
+    Assert.assertFalse(leaderCounter.containsKey(2));
+    assertLeaderCountInRange(leaderCounter, 1, 4, 8);
+    assertLeaderCountInRange(leaderCounter, 3, 4, 8);
+  }
+
+  @Test
+  public void testGenLatestRegionRouteMapWithDifferentReplicaSize() {
+    LazyGreedyRouter lazyGreedyRouter = new LazyGreedyRouter();
+
+    /* Prepare TRegionReplicaSets: 12 RegionGroups with 3 replicas, then 6 RegionGroups whose
+     * 2 replicas rotate over DataNode-1, 2 and 3 */
+    List<TRegionReplicaSet> regionReplicaSetList = new ArrayList<>();
+    addThreeReplicaRegions(regionReplicaSetList, 0, 12);
+    int dataNodeId = 0;
+    for (int i = 12; i < 18; i++) {
+      TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+      regionReplicaSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, i));
+      for (int j = 0; j < 2; j++) {
+        regionReplicaSet.addToDataNodeLocations(
+            new TDataNodeLocation().setDataNodeId(dataNodeId + 1));
+        dataNodeId = (dataNodeId + 1) % 3;
+      }
+      regionReplicaSetList.add(regionReplicaSet);
+    }
+
+    /* Test1: The number of leaders in each DataNode should be approximately 6 */
+    Map<Integer, AtomicInteger> leaderCounter =
+        countLeaders(lazyGreedyRouter.genLatestRegionRouteMap(regionReplicaSetList));
+    Assert.assertEquals(3, leaderCounter.size());
+    for (int i = 1; i <= 3; i++) {
+      assertLeaderCountInRange(leaderCounter, i, 4, 8);
+    }
+
+    /* Test2: With DataNode-2 unknown, the number of leaders in DataNode-1 and DataNode-3
+     * should be approximately 9 */
+    lazyGreedyRouter.updateUnknownDataNodes(buildDataNodeConfigurations(2));
+    leaderCounter = countLeaders(lazyGreedyRouter.genLatestRegionRouteMap(regionReplicaSetList));
+    Assert.assertEquals(2, leaderCounter.size());
+    Assert.assertFalse(leaderCounter.containsKey(2));
+    assertLeaderCountInRange(leaderCounter, 1, 7, 11);
+    assertLeaderCountInRange(leaderCounter, 3, 7, 11);
+  }
+
+  /** Append DataRegion groups [fromId, toId) that each have replicas on DataNode-1, 2 and 3. */
+  private static void addThreeReplicaRegions(
+      List<TRegionReplicaSet> regionReplicaSetList, int fromId, int toId) {
+    for (int i = fromId; i < toId; i++) {
+      TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
+      regionReplicaSet.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, i));
+      for (int j = 1; j <= 3; j++) {
+        regionReplicaSet.addToDataNodeLocations(new TDataNodeLocation().setDataNodeId(j));
+      }
+      regionReplicaSetList.add(regionReplicaSet);
+    }
+  }
+
+  /** Build one TDataNodeConfiguration per given DataNodeId. */
+  private static List<TDataNodeConfiguration> buildDataNodeConfigurations(int... dataNodeIds) {
+    List<TDataNodeConfiguration> dataNodeConfigurations = new ArrayList<>();
+    for (int dataNodeId : dataNodeIds) {
+      dataNodeConfigurations.add(
+          new TDataNodeConfiguration()
+              .setLocation(new TDataNodeLocation().setDataNodeId(dataNodeId)));
+    }
+    return dataNodeConfigurations;
+  }
+
+  /** Count the leaders (the first replica of each route entry) held by every DataNode. */
+  private static Map<Integer, AtomicInteger> countLeaders(
+      Map<TConsensusGroupId, TRegionReplicaSet> routeMap) {
+    Map<Integer, AtomicInteger> leaderCounter = new HashMap<>();
+    routeMap
+        .values()
+        .forEach(
+            regionReplicaSet ->
+                leaderCounter
+                    .computeIfAbsent(
+                        regionReplicaSet.getDataNodeLocations().get(0).getDataNodeId(),
+                        empty -> new AtomicInteger(0))
+                    .getAndIncrement());
+    return leaderCounter;
+  }
+
+  /** Assert that the given DataNode holds a number of leaders within [lowerBound, upperBound]. */
+  private static void assertLeaderCountInRange(
+      Map<Integer, AtomicInteger> leaderCounter, int dataNodeId, int lowerBound, int upperBound) {
+    int leaderCount = leaderCounter.get(dataNodeId).get();
+    Assert.assertTrue(
+        String.format(
+            "DataNode-%d holds %d leaders, expected within [%d, %d]",
+            dataNodeId, leaderCount, lowerBound, upperBound),
+        lowerBound <= leaderCount && leaderCount <= upperBound);
+  }
+}
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index be8dc302f2..db134efd25 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -43,7 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
 public class LeaderRouterTest {
 
   @Test
-  public void genRealTimeRoutingPolicy() {
+  public void testGenRealTimeRoutingPolicy() {
     // Build TDataNodeLocations
     List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
     for (int i = 0; i < 6; i++) {

Reply via email to