This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 03c826930a [IOTDB-5027] The RegionRouteMap doesn't update after the leader Node is Unknown (#8106)
03c826930a is described below

commit 03c826930a4c51a31f1ab691c9fa1cef1ff47b2f
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Nov 23 19:24:03 2022 +0800

    [IOTDB-5027] The RegionRouteMap doesn't update after the leader Node is Unknown (#8106)
---
 .../router/leader/GreedyLeaderBalancer.java        |  1 +
 .../router/priority/GreedyPriorityBalancer.java    | 57 ++++++++++++----------
 .../router/priority/LeaderPriorityBalancer.java    | 51 ++++++-------------
 .../priority/LeaderPriorityBalancerTest.java       | 49 +++++++++++++++++++
 4 files changed, 96 insertions(+), 62 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 21c4356bac..0d3f813a68 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+/** Leader distribution balancer that uses greedy algorithm */
 public class GreedyLeaderBalancer implements ILeaderBalancer {
 
   private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
index c2dd329b64..0ae1315fad 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** The GreedyRouter always pick the Replica with the lowest loadScore */
+/** The GreedyPriorityBalancer always pick the Replica with the lowest loadScore */
 public class GreedyPriorityBalancer implements IPriorityBalancer {
 
   public GreedyPriorityBalancer() {
@@ -46,33 +46,40 @@ public class GreedyPriorityBalancer implements IPriorityBalancer {
 
     replicaSets.forEach(
         replicaSet -> {
-          TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
-          sortedReplicaSet.setRegionId(replicaSet.getRegionId());
-
-          // List<Pair<loadScore, TDataNodeLocation>> for sorting
-          List<Pair<Long, TDataNodeLocation>> sortList = new Vector<>();
-          replicaSet
-              .getDataNodeLocations()
-              .forEach(
-                  dataNodeLocation -> {
-                    // The absenteeism of loadScoreMap means ConfigNode-leader doesn't receive any
-                    // heartbeat from that DataNode.
-                    // In this case we put a maximum loadScore into the sortList.
-                    sortList.add(
-                        new Pair<>(
-                            dataNodeLoadScoreMap.computeIfAbsent(
-                                dataNodeLocation.getDataNodeId(), empty -> Long.MAX_VALUE),
-                            dataNodeLocation));
-                  });
-
-          sortList.sort(Comparator.comparingLong(Pair::getLeft));
-          for (Pair<Long, TDataNodeLocation> entry : sortList) {
-            sortedReplicaSet.addToDataNodeLocations(entry.getRight());
-          }
-
+          TRegionReplicaSet sortedReplicaSet =
+              sortReplicasByLoadScore(replicaSet, dataNodeLoadScoreMap);
           regionPriorityMap.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
         });
 
     return regionPriorityMap;
   }
+
+  protected static TRegionReplicaSet sortReplicasByLoadScore(
+      TRegionReplicaSet replicaSet, Map<Integer, Long> dataNodeLoadScoreMap) {
+    TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
+    sortedReplicaSet.setRegionId(replicaSet.getRegionId());
+
+    // List<Pair<loadScore, TDataNodeLocation>> for sorting
+    List<Pair<Long, TDataNodeLocation>> sortList = new Vector<>();
+    replicaSet
+        .getDataNodeLocations()
+        .forEach(
+            dataNodeLocation -> {
+              // The absenteeism of loadScoreMap means ConfigNode-leader doesn't receive any
+              // heartbeat from that DataNode.
+              // In this case we put a maximum loadScore into the sortList.
+              sortList.add(
+                  new Pair<>(
+                      dataNodeLoadScoreMap.computeIfAbsent(
+                          dataNodeLocation.getDataNodeId(), empty -> Long.MAX_VALUE),
+                      dataNodeLocation));
+            });
+
+    sortList.sort(Comparator.comparingLong(Pair::getLeft));
+    for (Pair<Long, TDataNodeLocation> entry : sortList) {
+      sortedReplicaSet.addToDataNodeLocations(entry.getRight());
+    }
+
+    return sortedReplicaSet;
+  }
 }
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
index 80c1d1aaf5..6d37a4ce0d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
@@ -19,18 +19,15 @@
 package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.tsfile.utils.Pair;
 
-import java.util.Comparator;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** The LeaderRouter always pick the leader Replica as first */
-public class LeaderPriorityBalancer implements IPriorityBalancer {
+/** The LeaderPriorityBalancer always pick the leader Replica */
+public class LeaderPriorityBalancer extends GreedyPriorityBalancer implements IPriorityBalancer {
 
   public LeaderPriorityBalancer() {
     // Empty constructor
@@ -46,42 +43,22 @@ public class LeaderPriorityBalancer implements IPriorityBalancer {
 
     replicaSets.forEach(
         replicaSet -> {
-          int leaderId = regionLeaderMap.getOrDefault(replicaSet.getRegionId(), -1);
-          TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
-          sortedReplicaSet.setRegionId(replicaSet.getRegionId());
+          /* 1. Sort replicaSet by loadScore */
+          TRegionReplicaSet sortedReplicaSet =
+              sortReplicasByLoadScore(replicaSet, dataNodeLoadScoreMap);
 
-          /* 1. Pick leader if leader exists */
-          if (leaderId != -1) {
-            for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
-              if (dataNodeLocation.getDataNodeId() == leaderId) {
-                sortedReplicaSet.addToDataNodeLocations(dataNodeLocation);
+          /* 2. Pick leader if leader exists and available */
+          int leaderId = regionLeaderMap.getOrDefault(replicaSet.getRegionId(), -1);
+          if (leaderId != -1
+              && dataNodeLoadScoreMap.getOrDefault(leaderId, Long.MAX_VALUE) < Long.MAX_VALUE) {
+            for (int i = 0; i < sortedReplicaSet.getDataNodeLocationsSize(); i++) {
+              if (sortedReplicaSet.getDataNodeLocations().get(i).getDataNodeId() == leaderId) {
+                Collections.swap(sortedReplicaSet.getDataNodeLocations(), 0, i);
+                break;
               }
             }
           }
 
-          /* 2. Sort replicaSets by loadScore and pick the rest */
-          // List<Pair<loadScore, TDataNodeLocation>> for sorting
-          List<Pair<Long, TDataNodeLocation>> sortList = new Vector<>();
-          replicaSet
-              .getDataNodeLocations()
-              .forEach(
-                  dataNodeLocation -> {
-                    // The absenteeism of loadScoreMap means ConfigNode-leader doesn't receive any
-                    // heartbeat from that DataNode.
-                    // In this case we put a maximum loadScore into the sortList.
-                    sortList.add(
-                        new Pair<>(
-                            dataNodeLoadScoreMap.computeIfAbsent(
-                                dataNodeLocation.getDataNodeId(), empty -> Long.MAX_VALUE),
-                            dataNodeLocation));
-                  });
-          sortList.sort(Comparator.comparingLong(Pair::getLeft));
-          for (Pair<Long, TDataNodeLocation> entry : sortList) {
-            if (entry.getRight().getDataNodeId() != leaderId) {
-              sortedReplicaSet.addToDataNodeLocations(entry.getRight());
-            }
-          }
-
           regionPriorityMap.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
         });
 
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index 51a475f7a8..d94f45def1 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -115,4 +116,52 @@ public class LeaderPriorityBalancerTest {
     Assert.assertEquals(dataNodeLocations.get(3), result2.getDataNodeLocations().get(1));
     Assert.assertEquals(dataNodeLocations.get(5), result2.getDataNodeLocations().get(2));
   }
+
+  @Test
+  public void testLeaderUnavailable() {
+    // Build TDataNodeLocations
+    List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      dataNodeLocations.add(
+          new TDataNodeLocation(
+              i,
+              new TEndPoint("0.0.0.0", 6667 + i),
+              new TEndPoint("0.0.0.0", 9003 + i),
+              new TEndPoint("0.0.0.0", 8777 + i),
+              new TEndPoint("0.0.0.0", 40010 + i),
+              new TEndPoint("0.0.0.0", 50010 + i)));
+    }
+
+    // Build TRegionReplicaSet
+    TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
+    TRegionReplicaSet regionReplicaSet1 =
+        new TRegionReplicaSet(
+            groupId1,
+            Arrays.asList(
+                dataNodeLocations.get(2), dataNodeLocations.get(1), dataNodeLocations.get(0)));
+
+    // Build leaderMap
+    Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
+    leaderMap.put(groupId1, 1);
+
+    // Build loadScoreMap
+    Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
+    loadScoreMap.put(0, 10L);
+    loadScoreMap.put(2, 20L);
+    // The leader is DataNode-1, but it's unavailable
+    loadScoreMap.put(1, Long.MAX_VALUE);
+
+    // Check result
+    Map<TConsensusGroupId, TRegionReplicaSet> result =
+        new LeaderPriorityBalancer()
+            .generateOptimalRoutePriority(
+                Collections.singletonList(regionReplicaSet1), leaderMap, loadScoreMap);
+    // Only sorted by loadScore since the leader is unavailable
+    Assert.assertEquals(
+        dataNodeLocations.get(0), result.get(groupId1).getDataNodeLocations().get(0));
+    Assert.assertEquals(
+        dataNodeLocations.get(2), result.get(groupId1).getDataNodeLocations().get(1));
+    Assert.assertEquals(
+        dataNodeLocations.get(1), result.get(groupId1).getDataNodeLocations().get(2));
+  }
 }

Reply via email to