This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_readIndex in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 3ed7c481c068c0a1a1d67d3a32cdf28bc27e1ce8 Author: Kaijie Chen <[email protected]> AuthorDate: Fri Mar 10 00:42:47 2023 +0800 RATIS-1808. Rerun PreVote when Vote is timed out (#846) (cherry picked from commit 601d01deb517e7d7093fcafd91589aa14c2dd72d) --- .../apache/ratis/server/impl/LeaderElection.java | 81 +++++++++++----------- 1 file changed, 39 insertions(+), 42 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index ced72604a..5f79940cb 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -234,11 +234,12 @@ class LeaderElection implements Runnable { return; } - final Timer.Context electionContext = server.getLeaderElectionMetrics().getLeaderElectionTimer().time(); - try { - if (skipPreVote || askForVotes(Phase.PRE_VOTE)) { - if (askForVotes(Phase.ELECTION)) { - server.changeToLeader(); + try (Timer.Context ignored = server.getLeaderElectionMetrics().getLeaderElectionTimer().time()) { + for (int round = 0; shouldRun(); round++) { + if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) { + if (askForVotes(Phase.ELECTION, round)) { + server.changeToLeader(); + } } } } catch(Exception e) { @@ -257,7 +258,6 @@ class LeaderElection implements Runnable { } } finally { // Update leader election completion metric(s). - electionContext.stop(); server.getLeaderElectionMetrics().onNewLeaderElectionCompletion(); lifeCycle.checkStateAndClose(() -> {}); } @@ -296,48 +296,45 @@ class LeaderElection implements Runnable { } /** Send requestVote rpc to all other peers for the given phase. */ - private boolean askForVotes(Phase phase) throws InterruptedException, IOException { - for(int round = 0; shouldRun(); round++) { - final long electionTerm; - final RaftConfigurationImpl conf; - synchronized (server) { - if (!shouldRun()) { - return false; - } - final ConfAndTerm confAndTerm = server.getState().initElection(phase); - electionTerm = confAndTerm.getTerm(); - conf = confAndTerm.getConf(); + private boolean askForVotes(Phase phase, int round) throws InterruptedException, IOException { + final long electionTerm; + final RaftConfigurationImpl conf; + synchronized (server) { + if (!shouldRun()) { + return false; } + final ConfAndTerm confAndTerm = server.getState().initElection(phase); + electionTerm = confAndTerm.getTerm(); + conf = confAndTerm.getConf(); + } - LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf); - final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm); - LOG.info("{} {} round {}: result {}", this, phase, round, r); + LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf); + final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm); + LOG.info("{} {} round {}: result {}", this, phase, round, r); - synchronized (server) { - if (!shouldRun(electionTerm)) { - return false; // term already passed or this should not run anymore. - } + synchronized (server) { + if (!shouldRun(electionTerm)) { + return false; // term already passed or this should not run anymore. + } - switch (r.getResult()) { - case PASSED: - return true; - case NOT_IN_CONF: - case SHUTDOWN: - server.getRaftServer().close(); - server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto()); - return false; - case TIMEOUT: - continue; // should retry - case REJECTED: - case DISCOVERED_A_NEW_TERM: - final long term = r.maxTerm(server.getState().getCurrentTerm()); - server.changeToFollowerAndPersistMetadata(term, false, r); - return false; - default: throw new IllegalArgumentException("Unable to process result " + r.result); - } + switch (r.getResult()) { + case PASSED: + return true; + case NOT_IN_CONF: + case SHUTDOWN: + server.getRaftServer().close(); + server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto()); + return false; + case TIMEOUT: + return false; // should retry + case REJECTED: + case DISCOVERED_A_NEW_TERM: + final long term = r.maxTerm(server.getState().getCurrentTerm()); + server.changeToFollowerAndPersistMetadata(term, false, r); + return false; + default: throw new IllegalArgumentException("Unable to process result " + r.result); } } - return false; } private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
