This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 82e984baf2 [IOTDB-3947] LeaderPolicy can't broadcast when a DataNode down (#6777)
82e984baf2 is described below

commit 82e984baf25b4917221cc5fa19c21eb9484af966
Author: YongzaoDan <[email protected]>
AuthorDate: Thu Jul 28 08:53:36 2022 +0800

    [IOTDB-3947] LeaderPolicy can't broadcast when a DataNode down (#6777)
---
 .../async/datanode/AsyncDataNodeClientPool.java    |   1 -
 .../async/handlers/DataNodeHeartbeatHandler.java   |   3 +-
 .../handlers/UpdateRegionRouteMapHandler.java      |   7 +-
 .../iotdb/confignode/manager/load/LoadManager.java |  25 +++-
 .../manager/load/heartbeat/RegionGroupCache.java   |  14 ++-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   4 +-
 .../load/balancer/router/LeaderRouterTest.java     | 137 +++++++++++++++++++--
 7 files changed, 168 insertions(+), 23 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
index 1547e617d8..d12c0f2705 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/datanode/AsyncDataNodeClientPool.java
@@ -326,7 +326,6 @@ public class AsyncDataNodeClientPool {
    */
   public void getDataNodeHeartBeat(
       TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) 
{
-    // TODO: Add a retry logic
     AsyncDataNodeInternalServiceClient client;
     try {
       client = clientManager.borrowClient(endPoint);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
index a148c1cc14..6891c032e7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/DataNodeHeartbeatHandler.java
@@ -66,7 +66,8 @@ public class DataNodeHeartbeatHandler implements AsyncMethodCallback<THeartbeatR
           .forEach(
               (consensusGroupId, isLeader) ->
                   regionGroupCacheMap
-                      .computeIfAbsent(consensusGroupId, empty -> new 
RegionGroupCache())
+                      .computeIfAbsent(
+                          consensusGroupId, empty -> new 
RegionGroupCache(consensusGroupId))
                       .cacheHeartbeatSample(
                           new RegionHeartbeatSample(
                               heartbeatResp.getHeartbeatTimestamp(),
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
index 4877fb70a6..30a88953c1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/UpdateRegionRouteMapHandler.java
@@ -47,16 +47,17 @@ public class UpdateRegionRouteMapHandler extends AbstractRetryHandler
   public void onComplete(TSStatus status) {
     if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       dataNodeLocationMap.remove(targetDataNode.getDataNodeId());
-      LOGGER.info("Successfully update the RegionRouteMap on DataNode: {}", 
targetDataNode);
+      LOGGER.info(
+          "Successfully update the RegionRouteMap on DataNode: {}", 
targetDataNode.getDataNodeId());
     } else {
-      LOGGER.error("Update RegionRouteMap on DataNode: {} failed", 
targetDataNode);
+      LOGGER.error("Update RegionRouteMap on DataNode: {} failed", 
targetDataNode.getDataNodeId());
     }
     countDownLatch.countDown();
   }
 
   @Override
   public void onError(Exception e) {
-    LOGGER.error("Update RegionRouteMap on DataNode: {} failed", 
targetDataNode);
+    LOGGER.error("Update RegionRouteMap on DataNode: {} failed", 
targetDataNode.getDataNodeId());
     countDownLatch.countDown();
   }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 9886593212..61babc277a 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -303,10 +303,12 @@ public class LoadManager {
                 dataNodeLocationMap.put(
                     onlineDataNode.getLocation().getDataNodeId(), 
onlineDataNode.getLocation()));
 
-    LOGGER.info("Begin to broadcast RegionRouteMap: {}", latestRegionRouteMap);
+    LOGGER.info("Begin to broadcast RegionRouteMap:");
+    long broadcastTime = System.currentTimeMillis();
+    printRegionRouteMap(broadcastTime, latestRegionRouteMap);
     AsyncDataNodeClientPool.getInstance()
         .sendAsyncRequestToDataNodeWithRetry(
-            new TRegionRouteReq(System.currentTimeMillis(), 
latestRegionRouteMap),
+            new TRegionRouteReq(broadcastTime, latestRegionRouteMap),
             dataNodeLocationMap,
             DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
             null);
@@ -333,6 +335,9 @@ public class LoadManager {
    * @param registeredDataNodes DataNodes that registered in cluster
    */
   private void pingRegisteredDataNodes(List<TDataNodeConfiguration> 
registeredDataNodes) {
+    // Generate heartbeat request
+    THeartbeatReq heartbeatReq = genHeartbeatReq();
+
     // Send heartbeat requests
     for (TDataNodeConfiguration dataNodeInfo : registeredDataNodes) {
       DataNodeHeartbeatHandler handler =
@@ -345,7 +350,7 @@ public class LoadManager {
               regionGroupCacheMap);
       AsyncDataNodeClientPool.getInstance()
           .getDataNodeHeartBeat(
-              dataNodeInfo.getLocation().getInternalEndPoint(), 
genHeartbeatReq(), handler);
+              dataNodeInfo.getLocation().getInternalEndPoint(), heartbeatReq, 
handler);
     }
   }
 
@@ -411,6 +416,20 @@ public class LoadManager {
         .collect(Collectors.toList());
   }
 
+  public static void printRegionRouteMap(
+      long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> 
regionRouteMap) {
+    LOGGER.info("[latestRegionRouteMap] timestamp:{}", timestamp);
+    LOGGER.info("[latestRegionRouteMap] RegionRouteMap:");
+    for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> entry : 
regionRouteMap.entrySet()) {
+      LOGGER.info(
+          "[latestRegionRouteMap]\t {}={}",
+          entry.getKey(),
+          entry.getValue().getDataNodeLocations().stream()
+              .map(TDataNodeLocation::getDataNodeId)
+              .collect(Collectors.toList()));
+    }
+  }
+
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
index fbca007f5c..e325f28929 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/heartbeat/RegionGroupCache.java
@@ -18,6 +18,11 @@
  */
 package org.apache.iotdb.confignode.manager.load.heartbeat;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,10 +31,15 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public class RegionGroupCache implements IRegionGroupCache {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RegionGroupCache.class);
+
   // TODO: This class might be split into SchemaRegionGroupCache and 
DataRegionGroupCache
 
   private static final int maximumWindowSize = 100;
   // Map<DataNodeId(where a RegionReplica resides), 
LinkedList<RegionHeartbeatSample>>
+
+  private final TConsensusGroupId consensusGroupId;
+
   private final Map<Integer, LinkedList<RegionHeartbeatSample>> slidingWindow;
 
   // Indicates the version of the statistics
@@ -37,7 +47,9 @@ public class RegionGroupCache implements IRegionGroupCache {
   // The DataNode where the leader resides
   private final AtomicInteger leaderDataNodeId;
 
-  public RegionGroupCache() {
+  public RegionGroupCache(TConsensusGroupId consensusGroupId) {
+    this.consensusGroupId = consensusGroupId;
+
     this.slidingWindow = new ConcurrentHashMap<>();
 
     this.versionTimestamp = new AtomicLong(0);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 03a5a02c21..0ecc43bea0 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -538,7 +538,9 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
   @Override
   public TRegionRouteMapResp getLatestRegionRouteMap() throws TException {
     TRegionRouteMapResp resp = configManager.getLatestRegionRouteMap();
-    LOGGER.info("Generate a latest RegionRouteMap: {}", resp);
+    configManager
+        .getLoadManager()
+        .printRegionRouteMap(resp.getTimestamp(), resp.getRegionRouteMap());
     return resp;
   }
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
index 2d0e7dc87a..be8dc302f2 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/LeaderRouterTest.java
@@ -25,7 +25,10 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import 
org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
+import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
 import org.apache.iotdb.confignode.manager.load.heartbeat.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.heartbeat.RegionGroupCache;
+import 
org.apache.iotdb.confignode.manager.load.heartbeat.RegionHeartbeatSample;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -41,7 +44,7 @@ public class LeaderRouterTest {
 
   @Test
   public void genRealTimeRoutingPolicy() {
-    /* Build TDataNodeLocations */
+    // Build TDataNodeLocations
     List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
     for (int i = 0; i < 6; i++) {
       dataNodeLocations.add(
@@ -54,7 +57,7 @@ public class LeaderRouterTest {
               new TEndPoint("0.0.0.0", 50010 + i)));
     }
 
-    /* Build nodeCacheMap */
+    // Build nodeCacheMap
     long currentTimeMillis = System.currentTimeMillis();
     Map<Integer, INodeCache> nodeCacheMap = new HashMap<>();
     for (int i = 0; i < 6; i++) {
@@ -67,13 +70,13 @@ public class LeaderRouterTest {
     }
     nodeCacheMap.values().forEach(INodeCache::updateLoadStatistic);
 
-    /* Get the loadScoreMap */
+    // Get the loadScoreMap
     Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
     nodeCacheMap.forEach(
         (dataNodeId, heartbeatCache) ->
             loadScoreMap.put(dataNodeId, heartbeatCache.getLoadScore()));
 
-    /* Build TRegionReplicaSet */
+    // Build TRegionReplicaSet
     TConsensusGroupId groupId1 = new 
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
     TRegionReplicaSet regionReplicaSet1 =
         new TRegionReplicaSet(
@@ -86,30 +89,138 @@ public class LeaderRouterTest {
             groupId2,
             Arrays.asList(
                 dataNodeLocations.get(5), dataNodeLocations.get(4), 
dataNodeLocations.get(3)));
+    List<TRegionReplicaSet> regionReplicaSets = 
Arrays.asList(regionReplicaSet1, regionReplicaSet2);
 
-    /* Build leaderMap */
+    // Build regionGroupCacheMap
+    Map<TConsensusGroupId, IRegionGroupCache> regionGroupCacheMap = new 
HashMap<>();
+    regionGroupCacheMap.put(groupId1, new RegionGroupCache(groupId1));
+    regionGroupCacheMap.put(groupId2, new RegionGroupCache(groupId2));
+
+    /* Simulate ratis consensus protocol(only one leader) */
+    regionGroupCacheMap
+        .get(groupId1)
+        .cacheHeartbeatSample(new RegionHeartbeatSample(10, 10, 0, false));
+    regionGroupCacheMap
+        .get(groupId1)
+        .cacheHeartbeatSample(new RegionHeartbeatSample(11, 11, 1, true));
+    regionGroupCacheMap
+        .get(groupId1)
+        .cacheHeartbeatSample(new RegionHeartbeatSample(12, 12, 2, false));
+    regionGroupCacheMap
+        .get(groupId2)
+        .cacheHeartbeatSample(new RegionHeartbeatSample(13, 13, 3, false));
+    regionGroupCacheMap
+        .get(groupId2)
+        .cacheHeartbeatSample(new RegionHeartbeatSample(14, 14, 4, true));
+    regionGroupCacheMap
+        .get(groupId2)
+        .cacheHeartbeatSample(new RegionHeartbeatSample(15, 15, 5, false));
+
+    // Get leaderMap
     Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
-    leaderMap.put(groupId1, 1);
-    leaderMap.put(groupId2, 4);
+    regionGroupCacheMap
+        .values()
+        .forEach(regionGroupCache -> 
Assert.assertTrue(regionGroupCache.updateLoadStatistic()));
+    regionGroupCacheMap.forEach(
+        (groupId, regionGroupCache) ->
+            leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
 
-    /* Check result */
+    // Check result
     Map<TConsensusGroupId, TRegionReplicaSet> result =
-        new LeaderRouter(leaderMap, loadScoreMap)
-            .genLatestRegionRouteMap(Arrays.asList(regionReplicaSet1, 
regionReplicaSet2));
-    Assert.assertEquals(2, result.size());
-
+        new LeaderRouter(leaderMap, 
loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
     TRegionReplicaSet result1 = result.get(groupId1);
     // Leader first
     Assert.assertEquals(dataNodeLocations.get(1), 
result1.getDataNodeLocations().get(0));
     // The others will be sorted by loadScore
     Assert.assertEquals(dataNodeLocations.get(0), 
result1.getDataNodeLocations().get(1));
     Assert.assertEquals(dataNodeLocations.get(2), 
result1.getDataNodeLocations().get(2));
-
     TRegionReplicaSet result2 = result.get(groupId2);
     // Leader first
     Assert.assertEquals(dataNodeLocations.get(4), 
result2.getDataNodeLocations().get(0));
     // The others will be sorted by loadScore
     Assert.assertEquals(dataNodeLocations.get(3), 
result2.getDataNodeLocations().get(1));
     Assert.assertEquals(dataNodeLocations.get(5), 
result2.getDataNodeLocations().get(2));
+
+    /* Simulate multiLeader consensus protocol(Each Region believes it is the 
leader) */
+    for (int i = 2; i <= 1000; i++) {
+      regionGroupCacheMap
+          .get(groupId1)
+          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10, i * 10, 0, 
true));
+      regionGroupCacheMap
+          .get(groupId1)
+          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 1, i * 10 + 
1, 1, true));
+      regionGroupCacheMap
+          .get(groupId1)
+          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 2, i * 10 + 
2, 2, true));
+      regionGroupCacheMap
+          .get(groupId2)
+          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 3, i * 10 + 
3, 3, true));
+      regionGroupCacheMap
+          .get(groupId2)
+          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 4, i * 10 + 
4, 4, true));
+      regionGroupCacheMap
+          .get(groupId2)
+          .cacheHeartbeatSample(new RegionHeartbeatSample(i * 10 + 5, i * 10 + 
5, 5, true));
+
+      // Get leaderMap
+      leaderMap.clear();
+      
regionGroupCacheMap.values().forEach(IRegionGroupCache::updateLoadStatistic);
+      regionGroupCacheMap.forEach(
+          (groupId, regionGroupCache) ->
+              leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
+
+      // Check result
+      result = new LeaderRouter(leaderMap, 
loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
+      result1 = result.get(groupId1);
+      // Leader first
+      Assert.assertEquals(dataNodeLocations.get(2), 
result1.getDataNodeLocations().get(0));
+      // The others will be sorted by loadScore
+      Assert.assertEquals(dataNodeLocations.get(0), 
result1.getDataNodeLocations().get(1));
+      Assert.assertEquals(dataNodeLocations.get(1), 
result1.getDataNodeLocations().get(2));
+      result2 = result.get(groupId2);
+      // Leader first
+      Assert.assertEquals(dataNodeLocations.get(5), 
result2.getDataNodeLocations().get(0));
+      // The others will be sorted by loadScore
+      Assert.assertEquals(dataNodeLocations.get(3), 
result2.getDataNodeLocations().get(1));
+      Assert.assertEquals(dataNodeLocations.get(4), 
result2.getDataNodeLocations().get(2));
+    }
+
+    /* Simulate multiLeader consensus protocol with a DataNode fails down */
+    regionGroupCacheMap
+        .get(groupId1)
+        .cacheHeartbeatSample(new RegionHeartbeatSample(10030, 10030, 0, 
true));
+    regionGroupCacheMap
+        .get(groupId1)
+        .cacheHeartbeatSample(new RegionHeartbeatSample(10031, 10031, 1, 
true));
+    regionGroupCacheMap
+        .get(groupId2)
+        .cacheHeartbeatSample(new RegionHeartbeatSample(10033, 10033, 3, 
true));
+    regionGroupCacheMap
+        .get(groupId2)
+        .cacheHeartbeatSample(new RegionHeartbeatSample(10034, 10034, 4, 
true));
+
+    // Get leaderMap
+    leaderMap.clear();
+    regionGroupCacheMap
+        .values()
+        .forEach(regionGroupCache -> 
Assert.assertTrue(regionGroupCache.updateLoadStatistic()));
+    regionGroupCacheMap.forEach(
+        (groupId, regionGroupCache) ->
+            leaderMap.put(groupId, regionGroupCache.getLeaderDataNodeId()));
+
+    // Check result
+    result = new LeaderRouter(leaderMap, 
loadScoreMap).genLatestRegionRouteMap(regionReplicaSets);
+    result1 = result.get(groupId1);
+    // Leader first
+    Assert.assertEquals(dataNodeLocations.get(1), 
result1.getDataNodeLocations().get(0));
+    // The others will be sorted by loadScore
+    Assert.assertEquals(dataNodeLocations.get(0), 
result1.getDataNodeLocations().get(1));
+    Assert.assertEquals(dataNodeLocations.get(2), 
result1.getDataNodeLocations().get(2));
+    result2 = result.get(groupId2);
+    // Leader first
+    Assert.assertEquals(dataNodeLocations.get(4), 
result2.getDataNodeLocations().get(0));
+    // The others will be sorted by loadScore
+    Assert.assertEquals(dataNodeLocations.get(3), 
result2.getDataNodeLocations().get(1));
+    Assert.assertEquals(dataNodeLocations.get(5), 
result2.getDataNodeLocations().get(2));
   }
 }

Reply via email to