This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 03c826930a [IOTDB-5027] The RegionRouteMap doesn't update after the
leader Node is Unknown (#8106)
03c826930a is described below
commit 03c826930a4c51a31f1ab691c9fa1cef1ff47b2f
Author: YongzaoDan <[email protected]>
AuthorDate: Wed Nov 23 19:24:03 2022 +0800
[IOTDB-5027] The RegionRouteMap doesn't update after the leader Node is
Unknown (#8106)
---
.../router/leader/GreedyLeaderBalancer.java | 1 +
.../router/priority/GreedyPriorityBalancer.java | 57 ++++++++++++----------
.../router/priority/LeaderPriorityBalancer.java | 51 ++++++-------------
.../priority/LeaderPriorityBalancerTest.java | 49 +++++++++++++++++++
4 files changed, 96 insertions(+), 62 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
index 21c4356bac..0d3f813a68 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+/** Leader distribution balancer that uses greedy algorithm */
public class GreedyLeaderBalancer implements ILeaderBalancer {
private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap;
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
index c2dd329b64..0ae1315fad 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/GreedyPriorityBalancer.java
@@ -29,7 +29,7 @@ import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
-/** The GreedyRouter always pick the Replica with the lowest loadScore */
+/** The GreedyPriorityBalancer always pick the Replica with the lowest loadScore */
public class GreedyPriorityBalancer implements IPriorityBalancer {
public GreedyPriorityBalancer() {
@@ -46,33 +46,40 @@ public class GreedyPriorityBalancer implements IPriorityBalancer {
replicaSets.forEach(
replicaSet -> {
- TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
- sortedReplicaSet.setRegionId(replicaSet.getRegionId());
-
- // List<Pair<loadScore, TDataNodeLocation>> for sorting
- List<Pair<Long, TDataNodeLocation>> sortList = new Vector<>();
- replicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation -> {
- // The absenteeism of loadScoreMap means ConfigNode-leader doesn't receive any
- // heartbeat from that DataNode.
- // In this case we put a maximum loadScore into the sortList.
- sortList.add(
- new Pair<>(
- dataNodeLoadScoreMap.computeIfAbsent(
- dataNodeLocation.getDataNodeId(), empty -> Long.MAX_VALUE),
- dataNodeLocation));
- });
-
- sortList.sort(Comparator.comparingLong(Pair::getLeft));
- for (Pair<Long, TDataNodeLocation> entry : sortList) {
- sortedReplicaSet.addToDataNodeLocations(entry.getRight());
- }
-
+ TRegionReplicaSet sortedReplicaSet =
+ sortReplicasByLoadScore(replicaSet, dataNodeLoadScoreMap);
regionPriorityMap.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
});
return regionPriorityMap;
}
+
+ protected static TRegionReplicaSet sortReplicasByLoadScore(
+ TRegionReplicaSet replicaSet, Map<Integer, Long> dataNodeLoadScoreMap) {
+ TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
+ sortedReplicaSet.setRegionId(replicaSet.getRegionId());
+
+ // List<Pair<loadScore, TDataNodeLocation>> for sorting
+ List<Pair<Long, TDataNodeLocation>> sortList = new Vector<>();
+ replicaSet
+ .getDataNodeLocations()
+ .forEach(
+ dataNodeLocation -> {
+ // The absenteeism of loadScoreMap means ConfigNode-leader doesn't receive any
+ // heartbeat from that DataNode.
+ // In this case we put a maximum loadScore into the sortList.
+ sortList.add(
+ new Pair<>(
+ dataNodeLoadScoreMap.computeIfAbsent(
+ dataNodeLocation.getDataNodeId(), empty -> Long.MAX_VALUE),
+ dataNodeLocation));
+ });
+
+ sortList.sort(Comparator.comparingLong(Pair::getLeft));
+ for (Pair<Long, TDataNodeLocation> entry : sortList) {
+ sortedReplicaSet.addToDataNodeLocations(entry.getRight());
+ }
+
+ return sortedReplicaSet;
+ }
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
index 80c1d1aaf5..6d37a4ce0d 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancer.java
@@ -19,18 +19,15 @@
package org.apache.iotdb.confignode.manager.load.balancer.router.priority;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.tsfile.utils.Pair;
-import java.util.Comparator;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
-/** The LeaderRouter always pick the leader Replica as first */
-public class LeaderPriorityBalancer implements IPriorityBalancer {
+/** The LeaderPriorityBalancer always pick the leader Replica */
+public class LeaderPriorityBalancer extends GreedyPriorityBalancer implements IPriorityBalancer {
public LeaderPriorityBalancer() {
// Empty constructor
@@ -46,42 +43,22 @@ public class LeaderPriorityBalancer implements IPriorityBalancer {
replicaSets.forEach(
replicaSet -> {
- int leaderId = regionLeaderMap.getOrDefault(replicaSet.getRegionId(), -1);
- TRegionReplicaSet sortedReplicaSet = new TRegionReplicaSet();
- sortedReplicaSet.setRegionId(replicaSet.getRegionId());
+ /* 1. Sort replicaSet by loadScore */
+ TRegionReplicaSet sortedReplicaSet =
+ sortReplicasByLoadScore(replicaSet, dataNodeLoadScoreMap);
- /* 1. Pick leader if leader exists */
- if (leaderId != -1) {
- for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
- if (dataNodeLocation.getDataNodeId() == leaderId) {
- sortedReplicaSet.addToDataNodeLocations(dataNodeLocation);
+ /* 2. Pick leader if leader exists and available */
+ int leaderId = regionLeaderMap.getOrDefault(replicaSet.getRegionId(), -1);
+ if (leaderId != -1
+ && dataNodeLoadScoreMap.getOrDefault(leaderId, Long.MAX_VALUE) < Long.MAX_VALUE) {
+ for (int i = 0; i < sortedReplicaSet.getDataNodeLocationsSize(); i++) {
+ if (sortedReplicaSet.getDataNodeLocations().get(i).getDataNodeId() == leaderId) {
+ Collections.swap(sortedReplicaSet.getDataNodeLocations(), 0, i);
+ break;
}
}
}
- /* 2. Sort replicaSets by loadScore and pick the rest */
- // List<Pair<loadScore, TDataNodeLocation>> for sorting
- List<Pair<Long, TDataNodeLocation>> sortList = new Vector<>();
- replicaSet
- .getDataNodeLocations()
- .forEach(
- dataNodeLocation -> {
- // The absenteeism of loadScoreMap means ConfigNode-leader doesn't receive any
- // heartbeat from that DataNode.
- // In this case we put a maximum loadScore into the sortList.
- sortList.add(
- new Pair<>(
- dataNodeLoadScoreMap.computeIfAbsent(
- dataNodeLocation.getDataNodeId(), empty -> Long.MAX_VALUE),
- dataNodeLocation));
- });
- sortList.sort(Comparator.comparingLong(Pair::getLeft));
- for (Pair<Long, TDataNodeLocation> entry : sortList) {
- if (entry.getRight().getDataNodeId() != leaderId) {
- sortedReplicaSet.addToDataNodeLocations(entry.getRight());
- }
- }
-
regionPriorityMap.put(sortedReplicaSet.getRegionId(), sortedReplicaSet);
});
diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
index 51a475f7a8..d94f45def1 100644
--- a/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
+++ b/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/router/priority/LeaderPriorityBalancerTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -115,4 +116,52 @@ public class LeaderPriorityBalancerTest {
Assert.assertEquals(dataNodeLocations.get(3), result2.getDataNodeLocations().get(1));
Assert.assertEquals(dataNodeLocations.get(5), result2.getDataNodeLocations().get(2));
}
+
+ @Test
+ public void testLeaderUnavailable() {
+ // Build TDataNodeLocations
+ List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ dataNodeLocations.add(
+ new TDataNodeLocation(
+ i,
+ new TEndPoint("0.0.0.0", 6667 + i),
+ new TEndPoint("0.0.0.0", 9003 + i),
+ new TEndPoint("0.0.0.0", 8777 + i),
+ new TEndPoint("0.0.0.0", 40010 + i),
+ new TEndPoint("0.0.0.0", 50010 + i)));
+ }
+
+ // Build TRegionReplicaSet
+ TConsensusGroupId groupId1 = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 1);
+ TRegionReplicaSet regionReplicaSet1 =
+ new TRegionReplicaSet(
+ groupId1,
+ Arrays.asList(
+ dataNodeLocations.get(2), dataNodeLocations.get(1), dataNodeLocations.get(0)));
+
+ // Build leaderMap
+ Map<TConsensusGroupId, Integer> leaderMap = new HashMap<>();
+ leaderMap.put(groupId1, 1);
+
+ // Build loadScoreMap
+ Map<Integer, Long> loadScoreMap = new ConcurrentHashMap<>();
+ loadScoreMap.put(0, 10L);
+ loadScoreMap.put(2, 20L);
+ // The leader is DataNode-1, but it's unavailable
+ loadScoreMap.put(1, Long.MAX_VALUE);
+
+ // Check result
+ Map<TConsensusGroupId, TRegionReplicaSet> result =
+ new LeaderPriorityBalancer()
+ .generateOptimalRoutePriority(
+ Collections.singletonList(regionReplicaSet1), leaderMap, loadScoreMap);
+ // Only sorted by loadScore since the leader is unavailable
+ Assert.assertEquals(
+ dataNodeLocations.get(0), result.get(groupId1).getDataNodeLocations().get(0));
+ Assert.assertEquals(
+ dataNodeLocations.get(2), result.get(groupId1).getDataNodeLocations().get(1));
+ Assert.assertEquals(
+ dataNodeLocations.get(1), result.get(groupId1).getDataNodeLocations().get(2));
+ }
}