This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new de0e022 RATIS-1033. Change leader election to add priority consideration (#182)
de0e022 is described below
commit de0e0223a8cf1344126b6edc5bd6286b1cff72c7
Author: runzhiwang <[email protected]>
AuthorDate: Thu Aug 27 13:59:49 2020 +0800
RATIS-1033. Change leader election to add priority consideration (#182)
---
.../apache/ratis/server/impl/LeaderElection.java | 42 ++++++++++++-
.../apache/ratis/server/impl/RaftServerImpl.java | 16 ++++-
.../org/apache/ratis/server/impl/ServerState.java | 19 ++++--
.../ratis/server/impl/GroupManagementBaseTest.java | 69 ++++++++++++++++++++++
4 files changed, 136 insertions(+), 10 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index e0ab955..a85c1a5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -46,6 +46,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashSet;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.ratis.util.LifeCycle.State.NEW;
@@ -269,6 +271,21 @@ class LeaderElection implements Runnable {
return submitted;
}
+ private Set<RaftPeerId> getHigherPriorityPeers(RaftConfiguration conf) {
+ Set<RaftPeerId> higherPriorityPeers = new HashSet<>();
+
+ int currPriority = conf.getPeer(server.getId()).getPriority();
+ Collection<RaftPeer> peers = conf.getPeers();
+
+ for (RaftPeer peer : peers) {
+ if (peer.getPriority() > currPriority) {
+ higherPriorityPeers.add(peer.getId());
+ }
+ }
+
+ return higherPriorityPeers;
+ }
+
private ResultAndTerm waitForResults(final long electionTerm, final int
submitted,
RaftConfiguration conf, Executor voteExecutor) throws
InterruptedException {
final Timestamp timeout =
Timestamp.currentTime().addTimeMs(server.getRandomTimeoutMs());
@@ -276,10 +293,18 @@ class LeaderElection implements Runnable {
final List<Exception> exceptions = new ArrayList<>();
int waitForNum = submitted;
Collection<RaftPeerId> votedPeers = new ArrayList<>();
+    Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
+
while (waitForNum > 0 && shouldRun(electionTerm)) {
final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
if (waitTime.isNonPositive()) {
- return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
+        if (conf.hasMajority(votedPeers, server.getId())) {
+          // On timeout, pass the vote if a majority has voted for this candidate,
+          // even though some higher priority peers have not replied.
+          return logAndReturn(Result.PASSED, responses, exceptions, -1);
+        } else {
+          return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
+        }
}
try {
@@ -305,9 +330,22 @@ class LeaderElection implements Runnable {
return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
exceptions, r.getTerm());
}
+
+        // If any peer with higher priority rejects the vote, the candidate cannot pass the vote.
+        if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId)) {
+          return logAndReturn(Result.REJECTED, responses, exceptions, -1);
+        }
+
+        // Remove the replier so that an empty set means all higher priority peers have replied
+        // (a rejection from any of them has already returned REJECTED above).
+        // NOTE(review): a higher priority peer whose request fails is never removed, so a
+        // majority may then pass only via the timeout branch; confirm the post-loop handling.
+        higherPriorityPeers.remove(replierId);
+
if (r.getServerReply().getSuccess()) {
votedPeers.add(replierId);
- if (conf.hasMajority(votedPeers, server.getId())) {
+          // If a majority and all peers with higher priority have voted, the candidate passes.
+          if (higherPriorityPeers.isEmpty() && conf.hasMajority(votedPeers, server.getId())) {
return logAndReturn(Result.PASSED, responses, exceptions, -1);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 1af39c2..f72dde1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -878,9 +878,19 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
} else if (state.recognizeCandidate(candidateId, candidateTerm)) {
final boolean termUpdated = changeToFollower(candidateTerm, true,
"recognizeCandidate:" + candidateId);
// see Section 5.4.1 Election restriction
- if (state.isLogUpToDate(candidateLastEntry) && fs != null) {
- state.grantVote(candidateId);
- voteGranted = true;
+ RaftPeer candidate = getRaftConf().getPeer(candidateId);
+ if (fs != null && candidate != null) {
+ int compare = state.compareLog(candidateLastEntry);
+ int priority = getRaftConf().getPeer(getId()).getPriority();
+ LOG.info("{} priority:{} candidate:{} candidatePriority:{}
compare:{}",
+ this, priority, candidate, candidate.getPriority(), compare);
+ // vote for candidate if:
+ // 1. log lags behind candidate
+ // 2. log equals candidate's, and priority less or equal candidate's
+ if (compare < 0 || (compare == 0 && priority <=
candidate.getPriority())) {
+ state.grantVote(candidateId);
+ voteGranted = true;
+ }
}
if (termUpdated || voteGranted) {
state.persistMetadata(); // sync metafile
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 380bcf2..1213d14 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -318,19 +318,28 @@ public class ServerState implements Closeable {
return false;
}
- boolean isLogUpToDate(TermIndex candidateLastEntry) {
+ // Compares our last entry (from the log, or the latest snapshot if it is newer) with the candidate's:
+ // negative if ours is behind, 0 if they are equal (including both empty), positive if ours is ahead.
+ int compareLog(TermIndex candidateLastEntry) {
TermIndex local = log.getLastEntryTermIndex();
// need to take into account snapshot
SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
- if (local == null && snapshot == null) {
- return true;
+ if (local == null && snapshot == null) {
+ // A missing candidate last entry is sent as an empty TermIndexProto, i.e. term 0 and index 0.
+ // candidateLastEntry currently always comes from a proto and is never null; the null check
+ // guards against future callers that do not go through the proto.
+ if (candidateLastEntry == null ||
+ (candidateLastEntry.getTerm() == 0 && candidateLastEntry.getIndex()
== 0)) {
+ return 0;
+ }
+ return -1;
} else if (candidateLastEntry == null) {
- return false;
+ return 1;
}
if (local == null || (snapshot != null && snapshot.getIndex() >
local.getIndex())) {
local = snapshot.getTermIndex();
}
- return local.compareTo(candidateLastEntry) <= 0;
+ return local.compareTo(candidateLastEntry);
}
@Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 3e5f3a3..903f903 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -43,12 +43,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.Random;
import java.util.stream.Collectors;
public abstract class GroupManagementBaseTest extends BaseTest {
@@ -75,6 +77,73 @@ public abstract class GroupManagementBaseTest extends BaseTest {
}
@Test
+  public void testGroupWithPriority() throws Exception {
+    final MiniRaftCluster cluster = getCluster(0);
+    LOG.info("Start testGroupWithPriority" + cluster.printServers());
+
+    // Start server with null group
+    final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0))
+        .map(RaftPeerId::valueOf).collect(Collectors.toList());
+    ids.forEach(id -> cluster.putNewServer(id, null, true));
+    LOG.info("putNewServer: " + cluster.printServers());
+
+    cluster.start();
+    try {
+      // Make sure that there are no leaders.
+      TimeUnit.SECONDS.sleep(1);
+      LOG.info("start: " + cluster.printServers());
+      Assert.assertNull(cluster.getLeader());
+
+      // Add groups: the suggested leader (chosen with a fixed seed) gets priority 2, others 1.
+      final List<RaftPeer> peers = cluster.getPeers();
+      final int suggestedLeaderIndex = new Random(1).nextInt(peers.size());
+      final RaftPeerId suggestedLeaderId = peers.get(suggestedLeaderIndex).getId();
+
+      final List<RaftPeer> peersWithPriority = new ArrayList<>();
+      for (int i = 0; i < peers.size(); i++) {
+        final RaftPeer peer = peers.get(i);
+        final int priority = i == suggestedLeaderIndex ? 2 : 1;
+        peersWithPriority.add(new RaftPeer(peer.getId(), peer.getAddress(), priority));
+      }
+
+      final RaftGroup newGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peersWithPriority);
+      LOG.info("add new group: " + newGroup);
+      try (final RaftClient client = cluster.createClient(newGroup)) {
+        for (RaftPeer p : newGroup.getPeers()) {
+          client.groupAdd(newGroup, p.getId());
+        }
+      }
+
+      // The peer with the highest priority is expected to become the leader.
+      JavaUtils.attempt(() -> {
+        final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster, newGroup.getGroupId());
+        Assert.assertEquals(suggestedLeaderId, leader.getId());
+      }, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testGroupWithPriority", LOG);
+
+      // Isolate the leader so that a follower triggers a leader election. The isolated peer
+      // has the higher priority but cannot vote, so the candidate waits for its election
+      // timeout and then passes the vote if it has a majority.
+      final String suggestedLeader = suggestedLeaderId.toString();
+      BlockRequestHandlingInjection.getInstance().blockRequestor(suggestedLeader);
+      BlockRequestHandlingInjection.getInstance().blockReplier(suggestedLeader);
+      cluster.setBlockRequestsFrom(suggestedLeader, true);
+      try {
+        JavaUtils.attempt(() -> {
+          final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster, newGroup.getGroupId());
+          Assert.assertNotEquals(suggestedLeaderId, leader.getId());
+        }, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testGroupWithPriority", LOG);
+      } finally {
+        // Always unblock, so that a failed attempt does not leave the peer blocked.
+        BlockRequestHandlingInjection.getInstance().unblockRequestor(suggestedLeader);
+        BlockRequestHandlingInjection.getInstance().unblockReplier(suggestedLeader);
+        cluster.setBlockRequestsFrom(suggestedLeader, false);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+ @Test
public void testSingleGroupRestart() throws Exception {
final MiniRaftCluster cluster = getCluster(0);
LOG.info("Start testMultiGroup" + cluster.printServers());