This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push: new 65fd44453 RATIS-1995. Prevent data loss when a storage is accidentally re-formatted. (#1261) 65fd44453 is described below commit 65fd4445335d0500fd372f37c8b7cb3c39259e87 Author: Tsz-Wo Nicholas Sze <szets...@apache.org> AuthorDate: Fri May 16 04:43:24 2025 -0700 RATIS-1995. Prevent data loss when a storage is accidentally re-formatted. (#1261) --- ratis-proto/src/main/proto/Raft.proto | 1 + .../apache/ratis/server/protocol/TermIndex.java | 15 ++ .../apache/ratis/server/impl/LeaderElection.java | 191 +++++++++++++++++--- .../apache/ratis/server/impl/RaftServerImpl.java | 2 +- .../apache/ratis/server/impl/ServerProtoUtils.java | 4 +- .../ratis/server/util/ServerStringUtils.java | 3 +- .../impl/TestLeaderElectionServerInterface.java | 193 +++++++++++++++++++++ 7 files changed, 382 insertions(+), 27 deletions(-) diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index 7cf2fd87c..6dbfdb15a 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -169,6 +169,7 @@ message RequestVoteReplyProto { RaftRpcReplyProto serverReply = 1; uint64 term = 2; bool shouldShutdown = 3; + TermIndexProto lastEntry = 4; // to determine if the voter log is empty. } message CommitInfoProto { diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java index 6115bccad..369aefc85 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java @@ -21,9 +21,11 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.proto.RaftProtos.TermIndexProto; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.util.BiWeakValueCache; +import org.apache.ratis.util.MemoizedSupplier; import java.util.Comparator; import java.util.Optional; +import java.util.function.Supplier; /** The term and the log index defined in the Raft consensus algorithm. */ public interface TermIndex extends Comparable<TermIndex> { @@ -37,6 +39,7 @@ public interface TermIndex extends Comparable<TermIndex> { * are respectively 1 and 0 (= {@link RaftLog#LEAST_VALID_LOG_INDEX}). */ TermIndex INITIAL_VALUE = valueOf(0, RaftLog.INVALID_LOG_INDEX); + TermIndex PROTO_DEFAULT = valueOf(TermIndexProto.getDefaultInstance()); /** An empty {@link TermIndex} array. */ TermIndex[] EMPTY_ARRAY = {}; @@ -93,6 +96,8 @@ public interface TermIndex extends Comparable<TermIndex> { private static TermIndex newTermIndex(long term, long index) { return new TermIndex() { + private final Supplier<TermIndexProto> protoSupplier = MemoizedSupplier.valueOf(TermIndex.super::toProto); + @Override public long getTerm() { return term; @@ -121,12 +126,22 @@ public interface TermIndex extends Comparable<TermIndex> { return Long.hashCode(term) ^ Long.hashCode(index); } + @Override + public TermIndexProto toProto() { + return protoSupplier.get(); + } + private String longToString(long n) { return n >= 0L ? String.valueOf(n) : "~"; } @Override public String toString() { + if (this.equals(INITIAL_VALUE)) { + return "<INITIAL_VALUE>"; + } else if (this.equals(PROTO_DEFAULT)) { + return "<PROTO_DEFAULT>"; + } return String.format("(t:%s, i:%s)", longToString(term), longToString(index)); } }; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 439405871..9953e12af 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -20,12 +20,14 @@ package org.apache.ratis.server.impl; import org.apache.ratis.metrics.Timekeeper; import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto; import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.proto.RaftProtos.TermIndexProto; +import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.server.DivisionInfo; import org.apache.ratis.server.RaftConfiguration; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.util.ServerStringUtils; import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.util.Daemon; @@ -78,6 +80,121 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING; class LeaderElection implements Runnable { public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class); + interface ServerInterface { + default RaftPeerId getId() { + return getMemberId().getPeerId(); + } + + RaftGroupMemberId getMemberId(); + boolean isAlive(); + boolean isCandidate(); + + long getCurrentTerm(); + long getLastCommittedIndex(); + TermIndex getLastEntry(); + + boolean isPreVoteEnabled(); + ConfAndTerm initElection(Phase phase) throws IOException; + RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws IOException; + + void changeToLeader(); + void rejected(long term, ResultAndTerm result) throws IOException; + void shutdown(); + + Timekeeper getLeaderElectionTimer(); + void onNewLeaderElectionCompletion(); + + TimeDuration getRandomElectionTimeout(); + ThreadGroup getThreadGroup(); + + static ServerInterface get(RaftServerImpl server) { + final boolean preVote = RaftServerConfigKeys.LeaderElection.preVote(server.getRaftServer().getProperties()); + + return new ServerInterface() { + @Override + public RaftGroupMemberId getMemberId() { + return server.getMemberId(); + } + + @Override + public boolean isAlive() { + return server.getInfo().isAlive(); + } + + @Override + public boolean isCandidate() { + return server.getInfo().isCandidate(); + } + + @Override + public long getCurrentTerm() { + return server.getState().getCurrentTerm(); + } + + @Override + public long getLastCommittedIndex() { + return server.getRaftLog().getLastCommittedIndex(); + } + + @Override + public TermIndex getLastEntry() { + return server.getState().getLastEntry(); + } + + @Override + public boolean isPreVoteEnabled() { + return preVote; + } + + @Override + public ConfAndTerm initElection(Phase phase) throws IOException { + return server.getState().initElection(phase); + } + + @Override + public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws IOException { + return server.getServerRpc().requestVote(r); + } + + @Override + public void changeToLeader() { + server.changeToLeader(); + } + + @Override + public void rejected(long term, ResultAndTerm result) throws IOException { + server.changeToFollowerAndPersistMetadata(term, false, result); + } + + @Override + public void shutdown() { + server.close(); + server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto(), false); + } + + @Override + public Timekeeper getLeaderElectionTimer() { + return server.getLeaderElectionMetrics().getLeaderElectionTimer(); + } + + @Override + public void onNewLeaderElectionCompletion() { + server.getLeaderElectionMetrics().onNewLeaderElectionCompletion(); + } + + @Override + public TimeDuration getRandomElectionTimeout() { + return server.getRandomElectionTimeout(); + } + + @Override + public ThreadGroup getThreadGroup() { + return server.getThreadGroup(); + } + }; + } + } + private ResultAndTerm logAndReturn(Phase phase, Result result, Map<RaftPeerId, RequestVoteReplyProto> responses, List<Exception> exceptions) { return logAndReturn(phase, result, responses, exceptions, null); @@ -106,7 +223,7 @@ class LeaderElection implements Runnable { enum Result {PASSED, SINGLE_MODE_PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF} - private static class ResultAndTerm { + static class ResultAndTerm { private final Result result; private final Long term; @@ -185,22 +302,24 @@ class LeaderElection implements Runnable { private final Daemon daemon; private final CompletableFuture<Void> stopped = new CompletableFuture<>(); - private final RaftServerImpl server; + private final ServerInterface server; private final boolean skipPreVote; private final ConfAndTerm round0; LeaderElection(RaftServerImpl server, boolean force) { + this(ServerInterface.get(server), force); + } + + LeaderElection(ServerInterface server, boolean force) { this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass()) + COUNT.incrementAndGet(); this.lifeCycle = new LifeCycle(this); this.daemon = Daemon.newBuilder().setName(name).setRunnable(this) .setThreadGroup(server.getThreadGroup()).build(); this.server = server; - this.skipPreVote = force || - !RaftServerConfigKeys.LeaderElection.preVote( - server.getRaftServer().getProperties()); + this.skipPreVote = force || !server.isPreVoteEnabled(); try { // increase term of the candidate in advance if it's forced to election - this.round0 = force ? server.getState().initElection(Phase.ELECTION) : null; + this.round0 = force ? server.initElection(Phase.ELECTION) : null; } catch (IOException e) { throw new IllegalStateException(name + ": Failed to initialize election", e); } @@ -250,7 +369,7 @@ class LeaderElection implements Runnable { return; } - try (AutoCloseable ignored = Timekeeper.start(server.getLeaderElectionMetrics().getLeaderElectionTimer())) { + try (AutoCloseable ignored = Timekeeper.start(server.getLeaderElectionTimer())) { for (int round = 0; shouldRun(); round++) { if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) { if (askForVotes(Phase.ELECTION, round)) { @@ -264,10 +383,10 @@ class LeaderElection implements Runnable { } final LifeCycle.State state = lifeCycle.getCurrentState(); if (state.isClosingOrClosed()) { - LOG.info(this + ": since this is already " + state + ", safely ignore " + e); + LOG.info("{}: since this is already {}, safely ignore {}", this, state, e.toString()); } else { - if (!server.getInfo().isAlive()) { - LOG.info(this + ": since the server is not alive, safely ignore " + e); + if (!server.isAlive()) { + LOG.info("{}: since the server is not alive, safely ignore {}", this, e.toString()); } else { LOG.error("{}: Failed, state={}", this, state, e); } @@ -275,18 +394,17 @@ class LeaderElection implements Runnable { } } finally { // Update leader election completion metric(s). - server.getLeaderElectionMetrics().onNewLeaderElectionCompletion(); + server.onNewLeaderElectionCompletion(); lifeCycle.checkStateAndClose(() -> {}); } } private boolean shouldRun() { - final DivisionInfo info = server.getInfo(); - return lifeCycle.getCurrentState().isRunning() && info.isCandidate() && info.isAlive(); + return lifeCycle.getCurrentState().isRunning() && server.isCandidate() && server.isAlive(); } private boolean shouldRun(long electionTerm) { - return shouldRun() && server.getState().getCurrentTerm() == electionTerm; + return shouldRun() && server.getCurrentTerm() == electionTerm; } private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationImpl conf, long electionTerm) @@ -299,7 +417,7 @@ class LeaderElection implements Runnable { if (others.isEmpty()) { r = new ResultAndTerm(Result.PASSED, electionTerm); } else { - final TermIndex lastEntry = server.getState().getLastEntry(); + final TermIndex lastEntry = server.getLastEntry(); final Executor voteExecutor = new Executor(this, others.size()); try { final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor); @@ -322,8 +440,7 @@ class LeaderElection implements Runnable { } // If round0 is non-null, we have already called initElection in the constructor, // reuse round0 to avoid initElection again for the first round - final ConfAndTerm confAndTerm = (round == 0 && round0 != null) ? - round0 : server.getState().initElection(phase); + final ConfAndTerm confAndTerm = (round == 0 && round0 != null) ? round0 : server.initElection(phase); electionTerm = confAndTerm.getTerm(); conf = confAndTerm.getConf(); } @@ -343,15 +460,14 @@ class LeaderElection implements Runnable { return true; case NOT_IN_CONF: case SHUTDOWN: - server.close(); - server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto(), false); + server.shutdown(); return false; case TIMEOUT: return false; // should retry case REJECTED: case DISCOVERED_A_NEW_TERM: - final long term = r.maxTerm(server.getState().getCurrentTerm()); - server.changeToFollowerAndPersistMetadata(term, false, r); + final long term = r.maxTerm(server.getCurrentTerm()); + server.rejected(term, r); return false; default: throw new IllegalArgumentException("Unable to process result " + r.result); } @@ -364,7 +480,7 @@ class LeaderElection implements Runnable { for (final RaftPeer peer : others) { final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto( server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE); - voteExecutor.submit(() -> server.getServerRpc().requestVote(r)); + voteExecutor.submit(() -> server.requestVote(r)); submitted++; } return submitted; @@ -390,6 +506,9 @@ class LeaderElection implements Runnable { Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf); final boolean singleMode = conf.isSingleMode(server.getId()); + // true iff this server does not have any commits + final boolean emptyCommit = server.getLastCommittedIndex() < RaftLog.LEAST_VALID_LOG_INDEX; + while (waitForNum > 0 && shouldRun(electionTerm)) { final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n); if (waitTime.isNonPositive()) { @@ -439,7 +558,10 @@ class LeaderElection implements Runnable { // all higher priority peers have replied higherPriorityPeers.remove(replierId); - if (r.getServerReply().getSuccess()) { + final boolean acceptVote = r.getServerReply().getSuccess() + // When the commits are non-empty, do not accept votes from empty log voters. + && (emptyCommit || nonEmptyLog(r)); + if (acceptVote) { votedPeers.add(replierId); // If majority and all peers with higher priority have voted, candidate pass vote if (higherPriorityPeers.isEmpty() && conf.hasMajority(votedPeers, server.getId())) { @@ -448,6 +570,7 @@ class LeaderElection implements Runnable { } else { rejectedPeers.add(replierId); if (conf.majorityRejectVotes(rejectedPeers)) { + LOG.info("rejectedPeers: {}, emptyCommit? {}", rejectedPeers, emptyCommit); return logAndReturn(phase, Result.REJECTED, responses, exceptions); } } @@ -467,6 +590,26 @@ class LeaderElection implements Runnable { } } + /** + * @return true if the given reply indicates that the voter has a non-empty raft log. + * Note that a voter running with an old version may not include the lastEntry in the reply. + * For compatibility, this method returns true for such case. + */ + static boolean nonEmptyLog(RequestVoteReplyProto reply) { + final TermIndexProto lastEntry = reply.getLastEntry(); + // valid term >= 1 and valid index >= 0; therefore, (0, 0) can only be the proto default + if (lastEntry.equals(TermIndexProto.getDefaultInstance())) { // default: (0,0) + LOG.info("Reply missing lastEntry: {} ", ServerStringUtils.toRequestVoteReplyString(reply)); + return true; // accept voters with an older version + } + if (lastEntry.getTerm() > 0) { // when log is empty, lastEntry is (0,-1). + return true; // accept voters with a non-empty log + } + + LOG.info("Replier log is empty: {} ", ServerStringUtils.toRequestVoteReplyString(reply)); + return false; // reject voters with an empty log + } + @Override public String toString() { return name; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c7e29c539..043ba1ee7 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1504,7 +1504,7 @@ class RaftServerImpl implements RaftServer.Division, shouldShutdown = true; } reply = toRequestVoteReplyProto(candidateId, getMemberId(), - voteGranted, state.getCurrentTerm(), shouldShutdown); + voteGranted, state.getCurrentTerm(), shouldShutdown, state.getLastEntry()); if (LOG.isInfoEnabled()) { LOG.info("{} replies to {} vote request: {}. Peer's state: {}", getMemberId(), phase, toRequestVoteReplyString(reply), state); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 29a42f65a..e6a29189a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -43,11 +43,13 @@ final class ServerProtoUtils { } static RequestVoteReplyProto toRequestVoteReplyProto( - RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long term, boolean shouldShutdown) { + RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long term, boolean shouldShutdown, + TermIndex lastEntry) { return RequestVoteReplyProto.newBuilder() .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, success)) .setTerm(term) .setShouldShutdown(shouldShutdown) + .setLastEntry((lastEntry != null? lastEntry : TermIndex.INITIAL_VALUE).toProto()) .build(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java index 6601eddce..3a5db6285 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java @@ -118,7 +118,8 @@ public final class ServerStringUtils { if (proto == null) { return null; } - return ProtoUtils.toString(proto.getServerReply()) + "-t" + proto.getTerm(); + return ProtoUtils.toString(proto.getServerReply()) + "-t" + proto.getTerm() + + "-last:" + TermIndex.valueOf(proto.getLastEntry()); } /** diff --git a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java new file mode 100644 index 000000000..876633db1 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.metrics.Timekeeper; +import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto; +import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftGroupMemberId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.util.TimeDuration; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class TestLeaderElectionServerInterface extends BaseTest { + private final List<RaftPeer> peers = IntStream.range(0, 3).boxed() + .map(i -> RaftPeer.newBuilder().setId("s" + i).build()) + .collect(Collectors.toList()); + private final RaftGroup group = RaftGroup.valueOf(RaftGroupId.randomId(), peers); + private final RaftConfigurationImpl conf = RaftConfigurationImpl.newBuilder().setLogEntryIndex(0).setConf(peers).build(); + private final ThreadGroup threadGroup = new ThreadGroup("ServerInterface"); + + private final RaftGroupMemberId candidate = RaftGroupMemberId.valueOf(peers.get(0).getId(), group.getGroupId()); + + LeaderElection.ServerInterface newServerInterface(boolean expectToPass, + Map<RaftPeerId, TermIndex> lastEntries) { + return new LeaderElection.ServerInterface() { + private volatile boolean isAlive = true; + + @Override + public RaftGroupMemberId getMemberId() { + return candidate; + } + + @Override + public boolean isAlive() { + return isAlive; + } + + @Override + public boolean isCandidate() { + return true; + } + + @Override + public long getCurrentTerm() { + final TermIndex lastEntry = getLastEntry(); + return lastEntry != null? lastEntry.getTerm() : TermIndex.INITIAL_VALUE.getTerm(); + } + + @Override + public long getLastCommittedIndex() { + final TermIndex lastEntry = getLastEntry(); + return lastEntry != null? lastEntry.getIndex() : TermIndex.INITIAL_VALUE.getIndex(); + } + + @Override + public TermIndex getLastEntry() { + return lastEntries.get(getId()); + } + + @Override + public boolean isPreVoteEnabled() { + return false; + } + + @Override + public LeaderElection.ConfAndTerm initElection(LeaderElection.Phase phase) { + return new LeaderElection.ConfAndTerm(conf, getCurrentTerm()); + } + + @Override + public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) { + final RaftPeerId voterPeerId = RaftPeerId.valueOf(r.getServerRequest().getReplyId()); + final RaftGroupMemberId voter = RaftGroupMemberId.valueOf(voterPeerId, group.getGroupId()); + final TermIndex lastEntry = lastEntries.get(voterPeerId); + final long term = (lastEntry != null? lastEntry : TermIndex.INITIAL_VALUE).getTerm(); + + // voter replies to candidate + return ServerProtoUtils.toRequestVoteReplyProto(getId(), voter, true, term, false, lastEntry); + } + + @Override + public void changeToLeader() { + assertTrue(expectToPass); + isAlive = false; + } + + @Override + public void rejected(long term, LeaderElection.ResultAndTerm result) { + assertFalse(expectToPass); + isAlive = false; + } + + @Override + public void shutdown() { + fail(); + } + + @Override + public Timekeeper getLeaderElectionTimer() { + final long start = System.nanoTime(); + final Timekeeper.Context context = () -> System.nanoTime() - start; + return () -> context; + } + + @Override + public void onNewLeaderElectionCompletion() { + // no op + } + + @Override + public TimeDuration getRandomElectionTimeout() { + final int millis = 100 + ThreadLocalRandom.current().nextInt(100); + return TimeDuration.valueOf(millis, TimeUnit.MILLISECONDS); + } + + @Override + public ThreadGroup getThreadGroup() { + return threadGroup; + } + }; + } + + @Test + public void testVoterWithEmptyLog() { + // all the candidate and the voters have an empty log + // expect to pass: empty-log-candidate will accept votes from empty-log-voters + runTestVoterWithEmptyLog(true); + + // candidate: non-empty commit + // voter 1 : empty log + // voter 2 : empty log + // expect to fail: non-empty-commit-candidate will NOT accept votes from empty-log-voters + final TermIndex candidateLastEntry = TermIndex.valueOf(2, 9); + runTestVoterWithEmptyLog(false, candidateLastEntry); + + // candidate: non-empty commit + // voter 1 : non-empty log + // voter 2 : empty log + // expect to pass: non-empty-commit-candidate will accept votes from non-empty-log-voters + final TermIndex voterLastEntry = TermIndex.valueOf(2, 7); + runTestVoterWithEmptyLog(true, candidateLastEntry, voterLastEntry); + + // candidate: non-empty log + // voter 1 : older version + // voter 2 : empty log + // expect to pass: non-empty-commit-candidate will accept votes from older-version-voters + runTestVoterWithEmptyLog(true, candidateLastEntry, TermIndex.PROTO_DEFAULT); + } + + void runTestVoterWithEmptyLog(boolean expectToPass, TermIndex... lastEntries) { + LOG.info("expectToPass? {}, lastEntries={}", + expectToPass, lastEntries); + final Map<RaftPeerId, TermIndex> map = new HashMap<>(); + for(int i = 0; i < lastEntries.length; i++) { + map.put(peers.get(i).getId(), lastEntries[i]); + } + final LeaderElection election = new LeaderElection(newServerInterface(expectToPass, map), false); + election.startInForeground(); + } + +} \ No newline at end of file