This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 65fd44453 RATIS-1995. Prevent data loss when a storage is accidentally re-formatted. (#1261)
65fd44453 is described below

commit 65fd4445335d0500fd372f37c8b7cb3c39259e87
Author: Tsz-Wo Nicholas Sze <szets...@apache.org>
AuthorDate: Fri May 16 04:43:24 2025 -0700

    RATIS-1995. Prevent data loss when a storage is accidentally re-formatted. (#1261)
---
 ratis-proto/src/main/proto/Raft.proto              |   1 +
 .../apache/ratis/server/protocol/TermIndex.java    |  15 ++
 .../apache/ratis/server/impl/LeaderElection.java   | 191 +++++++++++++++++---
 .../apache/ratis/server/impl/RaftServerImpl.java   |   2 +-
 .../apache/ratis/server/impl/ServerProtoUtils.java |   4 +-
 .../ratis/server/util/ServerStringUtils.java       |   3 +-
 .../impl/TestLeaderElectionServerInterface.java    | 193 +++++++++++++++++++++
 7 files changed, 382 insertions(+), 27 deletions(-)

diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 7cf2fd87c..6dbfdb15a 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -169,6 +169,7 @@ message RequestVoteReplyProto {
   RaftRpcReplyProto serverReply = 1;
   uint64 term = 2;
   bool shouldShutdown = 3;
+  TermIndexProto lastEntry = 4; // to determine if the voter log is empty.
 }
 
 message CommitInfoProto {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
index 6115bccad..369aefc85 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -21,9 +21,11 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.TermIndexProto;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.util.BiWeakValueCache;
+import org.apache.ratis.util.MemoizedSupplier;
 
 import java.util.Comparator;
 import java.util.Optional;
+import java.util.function.Supplier;
 
 /** The term and the log index defined in the Raft consensus algorithm. */
 public interface TermIndex extends Comparable<TermIndex> {
@@ -37,6 +39,7 @@ public interface TermIndex extends Comparable<TermIndex> {
    * are respectively 1 and 0 (= {@link RaftLog#LEAST_VALID_LOG_INDEX}).
    */
   TermIndex INITIAL_VALUE = valueOf(0, RaftLog.INVALID_LOG_INDEX);
+  TermIndex PROTO_DEFAULT = valueOf(TermIndexProto.getDefaultInstance());
 
   /** An empty {@link TermIndex} array. */
   TermIndex[] EMPTY_ARRAY = {};
@@ -93,6 +96,8 @@ public interface TermIndex extends Comparable<TermIndex> {
 
     private static TermIndex newTermIndex(long term, long index) {
       return new TermIndex() {
+        private final Supplier<TermIndexProto> protoSupplier = 
MemoizedSupplier.valueOf(TermIndex.super::toProto);
+
         @Override
         public long getTerm() {
           return term;
@@ -121,12 +126,22 @@ public interface TermIndex extends Comparable<TermIndex> {
           return Long.hashCode(term) ^ Long.hashCode(index);
         }
 
+        @Override
+        public TermIndexProto toProto() {
+          return protoSupplier.get();
+        }
+
         private String longToString(long n) {
           return n >= 0L ? String.valueOf(n) : "~";
         }
 
         @Override
         public String toString() {
+          if (this.equals(INITIAL_VALUE)) {
+            return "<INITIAL_VALUE>";
+          } else if (this.equals(PROTO_DEFAULT)) {
+            return "<PROTO_DEFAULT>";
+          }
           return String.format("(t:%s, i:%s)", longToString(term), 
longToString(index));
         }
       };
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 439405871..9953e12af 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -20,12 +20,14 @@ package org.apache.ratis.server.impl;
 import org.apache.ratis.metrics.Timekeeper;
 import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.TermIndexProto;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.util.ServerStringUtils;
 import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.util.Daemon;
@@ -78,6 +80,121 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
 class LeaderElection implements Runnable {
   public static final Logger LOG = 
LoggerFactory.getLogger(LeaderElection.class);
 
+  interface ServerInterface {
+    default RaftPeerId getId() {
+      return getMemberId().getPeerId();
+    }
+
+    RaftGroupMemberId getMemberId();
+    boolean isAlive();
+    boolean isCandidate();
+
+    long getCurrentTerm();
+    long getLastCommittedIndex();
+    TermIndex getLastEntry();
+
+    boolean isPreVoteEnabled();
+    ConfAndTerm initElection(Phase phase) throws IOException;
+    RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws 
IOException;
+
+    void changeToLeader();
+    void rejected(long term, ResultAndTerm result) throws IOException;
+    void shutdown();
+
+    Timekeeper getLeaderElectionTimer();
+    void onNewLeaderElectionCompletion();
+
+    TimeDuration getRandomElectionTimeout();
+    ThreadGroup getThreadGroup();
+
+    static ServerInterface get(RaftServerImpl server) {
+      final boolean preVote = 
RaftServerConfigKeys.LeaderElection.preVote(server.getRaftServer().getProperties());
+
+      return new ServerInterface() {
+        @Override
+        public RaftGroupMemberId getMemberId() {
+          return server.getMemberId();
+        }
+
+        @Override
+        public boolean isAlive() {
+          return server.getInfo().isAlive();
+        }
+
+        @Override
+        public boolean isCandidate() {
+          return server.getInfo().isCandidate();
+        }
+
+        @Override
+        public long getCurrentTerm() {
+          return server.getState().getCurrentTerm();
+        }
+
+        @Override
+        public long getLastCommittedIndex() {
+          return server.getRaftLog().getLastCommittedIndex();
+        }
+
+        @Override
+        public TermIndex getLastEntry() {
+          return server.getState().getLastEntry();
+        }
+
+        @Override
+        public boolean isPreVoteEnabled() {
+          return preVote;
+        }
+
+        @Override
+        public ConfAndTerm initElection(Phase phase) throws IOException {
+          return server.getState().initElection(phase);
+        }
+
+        @Override
+        public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) 
throws IOException {
+          return server.getServerRpc().requestVote(r);
+        }
+
+        @Override
+        public void changeToLeader() {
+          server.changeToLeader();
+        }
+
+        @Override
+        public void rejected(long term, ResultAndTerm result) throws 
IOException {
+          server.changeToFollowerAndPersistMetadata(term, false, result);
+        }
+
+        @Override
+        public void shutdown() {
+          server.close();
+          
server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto(),
 false);
+        }
+
+        @Override
+        public Timekeeper getLeaderElectionTimer() {
+          return server.getLeaderElectionMetrics().getLeaderElectionTimer();
+        }
+
+        @Override
+        public void onNewLeaderElectionCompletion() {
+          server.getLeaderElectionMetrics().onNewLeaderElectionCompletion();
+        }
+
+        @Override
+        public TimeDuration getRandomElectionTimeout() {
+          return server.getRandomElectionTimeout();
+        }
+
+        @Override
+        public ThreadGroup getThreadGroup() {
+          return server.getThreadGroup();
+        }
+      };
+    }
+  }
+
   private ResultAndTerm logAndReturn(Phase phase, Result result, 
Map<RaftPeerId, RequestVoteReplyProto> responses,
       List<Exception> exceptions) {
     return logAndReturn(phase, result, responses, exceptions, null);
@@ -106,7 +223,7 @@ class LeaderElection implements Runnable {
 
   enum Result {PASSED, SINGLE_MODE_PASSED, REJECTED, TIMEOUT, 
DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF}
 
-  private static class ResultAndTerm {
+  static class ResultAndTerm {
     private final Result result;
     private final Long term;
 
@@ -185,22 +302,24 @@ class LeaderElection implements Runnable {
   private final Daemon daemon;
   private final CompletableFuture<Void> stopped = new CompletableFuture<>();
 
-  private final RaftServerImpl server;
+  private final ServerInterface server;
   private final boolean skipPreVote;
   private final ConfAndTerm round0;
 
   LeaderElection(RaftServerImpl server, boolean force) {
+    this(ServerInterface.get(server), force);
+  }
+
+  LeaderElection(ServerInterface server, boolean force) {
     this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), 
getClass()) + COUNT.incrementAndGet();
     this.lifeCycle = new LifeCycle(this);
     this.daemon = Daemon.newBuilder().setName(name).setRunnable(this)
         .setThreadGroup(server.getThreadGroup()).build();
     this.server = server;
-    this.skipPreVote = force ||
-        !RaftServerConfigKeys.LeaderElection.preVote(
-            server.getRaftServer().getProperties());
+    this.skipPreVote = force || !server.isPreVoteEnabled();
     try {
       // increase term of the candidate in advance if it's forced to election
-      this.round0 = force ? server.getState().initElection(Phase.ELECTION) : 
null;
+      this.round0 = force ? server.initElection(Phase.ELECTION) : null;
     } catch (IOException e) {
       throw new IllegalStateException(name + ": Failed to initialize 
election", e);
     }
@@ -250,7 +369,7 @@ class LeaderElection implements Runnable {
       return;
     }
 
-    try (AutoCloseable ignored = 
Timekeeper.start(server.getLeaderElectionMetrics().getLeaderElectionTimer())) {
+    try (AutoCloseable ignored = 
Timekeeper.start(server.getLeaderElectionTimer())) {
       for (int round = 0; shouldRun(); round++) {
         if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
           if (askForVotes(Phase.ELECTION, round)) {
@@ -264,10 +383,10 @@ class LeaderElection implements Runnable {
       }
       final LifeCycle.State state = lifeCycle.getCurrentState();
       if (state.isClosingOrClosed()) {
-        LOG.info(this + ": since this is already " + state + ", safely ignore 
" + e);
+        LOG.info("{}: since this is already {}, safely ignore {}", this, 
state, e.toString());
       } else {
-        if (!server.getInfo().isAlive()) {
-          LOG.info(this + ": since the server is not alive, safely ignore " + 
e);
+        if (!server.isAlive()) {
+          LOG.info("{}: since the server is not alive, safely ignore {}", 
this, e.toString());
         } else {
           LOG.error("{}: Failed, state={}", this, state, e);
         }
@@ -275,18 +394,17 @@ class LeaderElection implements Runnable {
       }
     } finally {
       // Update leader election completion metric(s).
-      server.getLeaderElectionMetrics().onNewLeaderElectionCompletion();
+      server.onNewLeaderElectionCompletion();
       lifeCycle.checkStateAndClose(() -> {});
     }
   }
 
   private boolean shouldRun() {
-    final DivisionInfo info = server.getInfo();
-    return lifeCycle.getCurrentState().isRunning() && info.isCandidate() && 
info.isAlive();
+    return lifeCycle.getCurrentState().isRunning() && server.isCandidate() && 
server.isAlive();
   }
 
   private boolean shouldRun(long electionTerm) {
-    return shouldRun() && server.getState().getCurrentTerm() == electionTerm;
+    return shouldRun() && server.getCurrentTerm() == electionTerm;
   }
 
   private ResultAndTerm submitRequestAndWaitResult(Phase phase, 
RaftConfigurationImpl conf, long electionTerm)
@@ -299,7 +417,7 @@ class LeaderElection implements Runnable {
     if (others.isEmpty()) {
       r = new ResultAndTerm(Result.PASSED, electionTerm);
     } else {
-      final TermIndex lastEntry = server.getState().getLastEntry();
+      final TermIndex lastEntry = server.getLastEntry();
       final Executor voteExecutor = new Executor(this, others.size());
       try {
         final int submitted = submitRequests(phase, electionTerm, lastEntry, 
others, voteExecutor);
@@ -322,8 +440,7 @@ class LeaderElection implements Runnable {
       }
       // If round0 is non-null, we have already called initElection in the 
constructor,
       // reuse round0 to avoid initElection again for the first round
-      final ConfAndTerm confAndTerm = (round == 0 && round0 != null) ?
-          round0 : server.getState().initElection(phase);
+      final ConfAndTerm confAndTerm = (round == 0 && round0 != null) ? round0 
: server.initElection(phase);
       electionTerm = confAndTerm.getTerm();
       conf = confAndTerm.getConf();
     }
@@ -343,15 +460,14 @@ class LeaderElection implements Runnable {
           return true;
         case NOT_IN_CONF:
         case SHUTDOWN:
-          server.close();
-          
server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto(),
 false);
+          server.shutdown();
           return false;
         case TIMEOUT:
           return false; // should retry
         case REJECTED:
         case DISCOVERED_A_NEW_TERM:
-          final long term = r.maxTerm(server.getState().getCurrentTerm());
-          server.changeToFollowerAndPersistMetadata(term, false, r);
+          final long term = r.maxTerm(server.getCurrentTerm());
+          server.rejected(term, r);
           return false;
         default: throw new IllegalArgumentException("Unable to process result 
" + r.result);
       }
@@ -364,7 +480,7 @@ class LeaderElection implements Runnable {
     for (final RaftPeer peer : others) {
       final RequestVoteRequestProto r = 
ServerProtoUtils.toRequestVoteRequestProto(
           server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase 
== Phase.PRE_VOTE);
-      voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
+      voteExecutor.submit(() -> server.requestVote(r));
       submitted++;
     }
     return submitted;
@@ -390,6 +506,9 @@ class LeaderElection implements Runnable {
     Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
     final boolean singleMode = conf.isSingleMode(server.getId());
 
+    // true iff this server does not have any commits
+    final boolean emptyCommit = server.getLastCommittedIndex() < 
RaftLog.LEAST_VALID_LOG_INDEX;
+
     while (waitForNum > 0 && shouldRun(electionTerm)) {
       final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
       if (waitTime.isNonPositive()) {
@@ -439,7 +558,10 @@ class LeaderElection implements Runnable {
         // all higher priority peers have replied
         higherPriorityPeers.remove(replierId);
 
-        if (r.getServerReply().getSuccess()) {
+        final boolean acceptVote = r.getServerReply().getSuccess()
+            // When the commits are non-empty, do not accept votes from empty 
log voters.
+            && (emptyCommit || nonEmptyLog(r));
+        if (acceptVote) {
           votedPeers.add(replierId);
           // If majority and all peers with higher priority have voted, 
candidate pass vote
           if (higherPriorityPeers.isEmpty() && conf.hasMajority(votedPeers, 
server.getId())) {
@@ -448,6 +570,7 @@ class LeaderElection implements Runnable {
         } else {
           rejectedPeers.add(replierId);
           if (conf.majorityRejectVotes(rejectedPeers)) {
+            LOG.info("rejectedPeers: {}, emptyCommit? {}", rejectedPeers, 
emptyCommit);
             return logAndReturn(phase, Result.REJECTED, responses, exceptions);
           }
         }
@@ -467,6 +590,26 @@ class LeaderElection implements Runnable {
     }
   }
 
+  /**
+   * @return true if the given reply indicates that the voter has a non-empty 
raft log.
+   *         Note that a voter running with an old version may not include the 
lastEntry in the reply.
+   *         For compatibility, this method returns true for such case.
+   */
+  static boolean nonEmptyLog(RequestVoteReplyProto reply) {
+    final TermIndexProto lastEntry = reply.getLastEntry();
+    // valid term >= 1 and valid index >= 0; therefore, (0, 0) can only be the 
proto default
+    if (lastEntry.equals(TermIndexProto.getDefaultInstance())) { // default: 
(0,0)
+      LOG.info("Reply missing lastEntry: {} ", 
ServerStringUtils.toRequestVoteReplyString(reply));
+      return true; // accept voters with an older version
+    }
+    if (lastEntry.getTerm() > 0) { // when log is empty, lastEntry is (0,-1).
+      return true; // accept voters with a non-empty log
+    }
+
+    LOG.info("Replier log is empty: {} ", 
ServerStringUtils.toRequestVoteReplyString(reply));
+    return false; // reject voters with an empty log
+  }
+
   @Override
   public String toString() {
     return name;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c7e29c539..043ba1ee7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1504,7 +1504,7 @@ class RaftServerImpl implements RaftServer.Division,
         shouldShutdown = true;
       }
       reply = toRequestVoteReplyProto(candidateId, getMemberId(),
-          voteGranted, state.getCurrentTerm(), shouldShutdown);
+          voteGranted, state.getCurrentTerm(), shouldShutdown, 
state.getLastEntry());
       if (LOG.isInfoEnabled()) {
         LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
             getMemberId(), phase, toRequestVoteReplyString(reply), state);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 29a42f65a..e6a29189a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -43,11 +43,13 @@ final class ServerProtoUtils {
   }
 
   static RequestVoteReplyProto toRequestVoteReplyProto(
-      RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long 
term, boolean shouldShutdown) {
+      RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long 
term, boolean shouldShutdown,
+      TermIndex lastEntry) {
     return RequestVoteReplyProto.newBuilder()
         .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId, 
success))
         .setTerm(term)
         .setShouldShutdown(shouldShutdown)
+        .setLastEntry((lastEntry != null? lastEntry : 
TermIndex.INITIAL_VALUE).toProto())
         .build();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
index 6601eddce..3a5db6285 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
@@ -118,7 +118,8 @@ public final class ServerStringUtils {
     if (proto == null) {
       return null;
     }
-    return ProtoUtils.toString(proto.getServerReply()) + "-t" + 
proto.getTerm();
+    return ProtoUtils.toString(proto.getServerReply()) + "-t" + proto.getTerm()
+        + "-last:" + TermIndex.valueOf(proto.getLastEntry());
   }
 
   /**
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
new file mode 100644
index 000000000..876633db1
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.metrics.Timekeeper;
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestLeaderElectionServerInterface extends BaseTest {
+  private final List<RaftPeer> peers = IntStream.range(0, 3).boxed()
+      .map(i -> RaftPeer.newBuilder().setId("s" + i).build())
+      .collect(Collectors.toList());
+  private final RaftGroup group = RaftGroup.valueOf(RaftGroupId.randomId(), 
peers);
+  private final RaftConfigurationImpl conf = 
RaftConfigurationImpl.newBuilder().setLogEntryIndex(0).setConf(peers).build();
+  private final ThreadGroup threadGroup = new ThreadGroup("ServerInterface");
+
+  private final RaftGroupMemberId candidate = 
RaftGroupMemberId.valueOf(peers.get(0).getId(), group.getGroupId());
+
+  LeaderElection.ServerInterface newServerInterface(boolean expectToPass,
+      Map<RaftPeerId, TermIndex> lastEntries) {
+    return new LeaderElection.ServerInterface() {
+      private volatile boolean isAlive = true;
+
+      @Override
+      public RaftGroupMemberId getMemberId() {
+        return candidate;
+      }
+
+      @Override
+      public boolean isAlive() {
+        return isAlive;
+      }
+
+      @Override
+      public boolean isCandidate() {
+        return true;
+      }
+
+      @Override
+      public long getCurrentTerm() {
+        final TermIndex lastEntry = getLastEntry();
+        return lastEntry != null? lastEntry.getTerm() : 
TermIndex.INITIAL_VALUE.getTerm();
+      }
+
+      @Override
+      public long getLastCommittedIndex() {
+        final TermIndex lastEntry = getLastEntry();
+        return lastEntry != null? lastEntry.getIndex() : 
TermIndex.INITIAL_VALUE.getIndex();
+      }
+
+      @Override
+      public TermIndex getLastEntry() {
+        return lastEntries.get(getId());
+      }
+
+      @Override
+      public boolean isPreVoteEnabled() {
+        return false;
+      }
+
+      @Override
+      public LeaderElection.ConfAndTerm initElection(LeaderElection.Phase 
phase) {
+        return new LeaderElection.ConfAndTerm(conf, getCurrentTerm());
+      }
+
+      @Override
+      public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) {
+        final RaftPeerId voterPeerId = 
RaftPeerId.valueOf(r.getServerRequest().getReplyId());
+        final RaftGroupMemberId voter = RaftGroupMemberId.valueOf(voterPeerId, 
group.getGroupId());
+        final TermIndex lastEntry = lastEntries.get(voterPeerId);
+        final long term = (lastEntry != null? lastEntry : 
TermIndex.INITIAL_VALUE).getTerm();
+
+        // voter replies to candidate
+        return ServerProtoUtils.toRequestVoteReplyProto(getId(), voter, true, 
term, false, lastEntry);
+      }
+
+      @Override
+      public void changeToLeader() {
+        assertTrue(expectToPass);
+        isAlive = false;
+      }
+
+      @Override
+      public void rejected(long term, LeaderElection.ResultAndTerm result) {
+        assertFalse(expectToPass);
+        isAlive = false;
+      }
+
+      @Override
+      public void shutdown() {
+        fail();
+      }
+
+      @Override
+      public Timekeeper getLeaderElectionTimer() {
+        final long start = System.nanoTime();
+        final Timekeeper.Context context = () -> System.nanoTime() - start;
+        return () -> context;
+      }
+
+      @Override
+      public void onNewLeaderElectionCompletion() {
+        // no op
+      }
+
+      @Override
+      public TimeDuration getRandomElectionTimeout() {
+        final int millis = 100 + ThreadLocalRandom.current().nextInt(100);
+        return TimeDuration.valueOf(millis, TimeUnit.MILLISECONDS);
+      }
+
+      @Override
+      public ThreadGroup getThreadGroup() {
+        return threadGroup;
+      }
+    };
+  }
+
+  @Test
+  public void testVoterWithEmptyLog() {
+    // all the candidate and the voters have an empty log
+    // expect to pass: empty-log-candidate will accept votes from 
empty-log-voters
+    runTestVoterWithEmptyLog(true);
+
+    // candidate: non-empty commit
+    // voter 1  : empty log
+    // voter 2  : empty log
+    // expect to fail: non-empty-commit-candidate will NOT accept votes from 
empty-log-voters
+    final TermIndex candidateLastEntry = TermIndex.valueOf(2, 9);
+    runTestVoterWithEmptyLog(false, candidateLastEntry);
+
+    // candidate: non-empty commit
+    // voter 1  : non-empty log
+    // voter 2  : empty log
+    // expect to pass: non-empty-commit-candidate will accept votes from 
non-empty-log-voters
+    final TermIndex voterLastEntry = TermIndex.valueOf(2, 7);
+    runTestVoterWithEmptyLog(true, candidateLastEntry, voterLastEntry);
+
+    // candidate: non-empty log
+    // voter 1  : older version
+    // voter 2  : empty log
+    // expect to pass: non-empty-commit-candidate will accept votes from 
older-version-voters
+    runTestVoterWithEmptyLog(true, candidateLastEntry, 
TermIndex.PROTO_DEFAULT);
+  }
+
+  void runTestVoterWithEmptyLog(boolean expectToPass, TermIndex... 
lastEntries) {
+    LOG.info("expectToPass? {}, lastEntries={}",
+        expectToPass, lastEntries);
+    final Map<RaftPeerId, TermIndex> map = new HashMap<>();
+    for(int i = 0; i < lastEntries.length; i++) {
+      map.put(peers.get(i).getId(), lastEntries[i]);
+    }
+    final LeaderElection election = new 
LeaderElection(newServerInterface(expectToPass, map), false);
+    election.startInForeground();
+  }
+
+}
\ No newline at end of file

Reply via email to