This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_readIndex in repository https://gitbox.apache.org/repos/asf/ratis.git
commit f0d9d566e96a18776eb6e959d194dd7a228c3c93 Author: Kaijie Chen <[email protected]> AuthorDate: Sun Mar 19 14:34:31 2023 +0800 RATIS-1770. Yield leader to higher priority peer by TransferLeadership (#845) (cherry picked from commit ee642b0b537629108227864f3458daf20950b9bc) --- .../apache/ratis/server/impl/LeaderStateImpl.java | 68 ++---- .../apache/ratis/server/impl/RaftServerImpl.java | 11 +- .../org/apache/ratis/server/impl/ServerState.java | 1 - .../ratis/server/impl/TransferLeadership.java | 235 ++++++++++++++++++--- .../ratis/server/impl/LeaderElectionTests.java | 4 +- .../shell/cli/sh/election/TransferCommand.java | 8 +- 6 files changed, 234 insertions(+), 93 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 694183bcf..6627d8e7b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -26,8 +26,6 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; -import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto; -import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; @@ -62,6 +60,7 @@ import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.Timestamp; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -424,6 +423,10 @@ class LeaderStateImpl implements LeaderState { return currentTerm; } + TermIndex getLastEntry() { + return server.getState().getLastEntry(); + } + @Override public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) { if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) { @@ -658,46 +661,14 @@ class LeaderStateImpl implements LeaderState { return pendingStepDown.submitAsync(request); } - private synchronized void sendStartLeaderElection(RaftPeerId follower, TermIndex lastEntry) { - ServerState state = server.getState(); - TermIndex currLastEntry = state.getLastEntry(); - if (ServerState.compareLog(currLastEntry, lastEntry) != 0) { - LOG.warn("{} can not send StartLeaderElectionRequest to follower:{} because currLastEntry:{} " + - "did not match lastEntry:{}", this, follower, currLastEntry, lastEntry); - return; - } - LOG.info("{}: send StartLeaderElectionRequest to follower {} on term {}, lastEntry={}", - this, follower, currentTerm, lastEntry); - - final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto( - server.getMemberId(), follower, lastEntry); - CompletableFuture.supplyAsync(() -> { - server.getLeaderElectionMetrics().onTransferLeadership(); - try { - StartLeaderElectionReplyProto replyProto = server.getServerRpc().startLeaderElection(r); - LOG.info("{} received {} reply of StartLeaderElectionRequest from follower:{}", - this, replyProto.getServerReply().getSuccess() ? "success" : "fail", follower); - } catch (IOException e) { - LOG.warn("{} send StartLeaderElectionRequest throw exception", this, e); + private static LogAppender chooseUpToDateFollower(List<LogAppender> followers, TermIndex leaderLastEntry) { + for(LogAppender f : followers) { + if (TransferLeadership.isFollowerUpToDate(f.getFollower(), leaderLastEntry) + == TransferLeadership.Result.SUCCESS) { + return f; } - return null; - }); - } - - boolean sendStartLeaderElection(FollowerInfo followerInfo) { - final RaftPeerId followerId = followerInfo.getId(); - final TermIndex leaderLastEntry = server.getState().getLastEntry(); - if (leaderLastEntry == null) { - sendStartLeaderElection(followerId, null); - return true; } - - final long followerMatchIndex = followerInfo.getMatchIndex(); - if (followerMatchIndex >= leaderLastEntry.getIndex()) { - sendStartLeaderElection(followerId, leaderLastEntry); - return true; - } - return false; + return null; } private void prepare() { @@ -778,7 +749,7 @@ class LeaderStateImpl implements LeaderState { } else { eventQueue.submit(checkStagingEvent); } - server.getTransferLeadership().onFollowerAppendEntriesReply(this, follower); + server.getTransferLeadership().onFollowerAppendEntriesReply(follower); } @Override @@ -1044,7 +1015,7 @@ class LeaderStateImpl implements LeaderState { } final int leaderPriority = leader.getPriority(); - FollowerInfo highestPriorityInfo = null; + final List<LogAppender> highestPriorityInfos = new ArrayList<>(); int highestPriority = Integer.MIN_VALUE; for (LogAppender logAppender : senders) { final RaftPeer follower = conf.getPeer(logAppender.getFollowerId()); @@ -1053,12 +1024,17 @@ class LeaderStateImpl implements LeaderState { } final int followerPriority = follower.getPriority(); if (followerPriority > leaderPriority && followerPriority >= highestPriority) { - highestPriority = followerPriority; - highestPriorityInfo = logAppender.getFollower(); + if (followerPriority > highestPriority) { + highestPriority = followerPriority; + highestPriorityInfos.clear(); + } + highestPriorityInfos.add(logAppender); } } - if (highestPriorityInfo != null) { - sendStartLeaderElection(highestPriorityInfo); + final TermIndex leaderLastEntry = getLastEntry(); + final LogAppender appender = chooseUpToDateFollower(highestPriorityInfos, leaderLastEntry); + if (appender != null) { + server.getTransferLeadership().start(appender); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index c64aaebba..5ecbc36d4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -225,7 +225,7 @@ class RaftServerImpl implements RaftServer.Division, .setProperties(getRaftServer().getProperties()) .build()); - this.transferLeadership = new TransferLeadership(this); + this.transferLeadership = new TransferLeadership(this, properties); this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this); this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties); @@ -1074,10 +1074,6 @@ class RaftServerImpl implements RaftServer.Division, return transferLeadership.isSteppingDown(); } - void finishTransferLeadership() { - transferLeadership.finish(state.getLeaderId(), false); - } - CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request) throws IOException { if (request.getNewLeader() == null) { @@ -1463,6 +1459,10 @@ class RaftServerImpl implements RaftServer.Division, return commitInfoCache.update(getPeer(), state.getLog().getLastCommittedIndex()); } + ExecutorService getServerExecutor() { + return serverExecutor; + } + @SuppressWarnings("checkstyle:parameternumber") private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync( RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing, @@ -1852,5 +1852,6 @@ class RaftServerImpl implements RaftServer.Division, void onGroupLeaderElected() { this.firstElectionSinceStartup.set(false); + transferLeadership.complete(TransferLeadership.Result.SUCCESS); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 3ced7b9c7..fa685325e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -317,7 +317,6 @@ class ServerState implements Closeable { LOG.info("{}: change Leader from {} to {} at term {} for {}{}", getMemberId(), oldLeaderId, newLeaderId, getCurrentTerm(), op, suffix); if (newLeaderId != null) { - server.finishTransferLeadership(); server.onGroupLeaderElected(); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java index c5c1a46cb..beab02b67 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java @@ -17,27 +17,110 @@ */ package org.apache.ratis.server.impl; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.TransferLeadershipRequest; import org.apache.ratis.protocol.exceptions.TransferLeadershipException; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.leader.FollowerInfo; import org.apache.ratis.server.leader.LogAppender; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; +import org.apache.ratis.util.StringUtils; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; public class TransferLeadership { public static final Logger LOG = LoggerFactory.getLogger(TransferLeadership.class); + private static class Context { + private final TransferLeadershipRequest request; + private final Supplier<LogAppender> transferee; + + Context(TransferLeadershipRequest request, Supplier<LogAppender> transferee) { + this.request = request; + this.transferee = transferee; + } + + TransferLeadershipRequest getRequest() { + return request; + } + + RaftPeerId getTransfereeId() { + return request.getNewLeader(); + } + + LogAppender getTransfereeLogAppender() { + return transferee.get(); + } + } + + static class Result { + enum Type { + SUCCESS, + DIFFERENT_LEADER, + NULL_FOLLOWER, + NULL_LOG_APPENDER, + NOT_UP_TO_DATE, + TIMED_OUT, + FAILED_TO_START, + COMPLETED_EXCEPTIONALLY, + } + + static final Result SUCCESS = new Result(Type.SUCCESS); + static final Result DIFFERENT_LEADER = new Result(Type.DIFFERENT_LEADER); + static final Result NULL_FOLLOWER = new Result(Type.NULL_FOLLOWER); + static final Result NULL_LOG_APPENDER = new Result(Type.NULL_LOG_APPENDER); + + private final Type type; + private final String errorMessage; + private final Throwable exception; + + private Result(Type type) { + this(type, null); + } + + private Result(Type type, String errorMessage, Throwable exception) { + this.type = type; + this.errorMessage = errorMessage; + this.exception = exception; + } + + Result(Type type, String errorMessage) { + this(type, errorMessage, null); + } + + Result(Throwable t) { + this(Type.COMPLETED_EXCEPTIONALLY, null, t); + } + + Type getType() { + return type; + } + + @Override + public String toString() { + if (exception == null) { + return type + (errorMessage == null ? "" : "(" + errorMessage + ")"); + } + return type + ": " + StringUtils.stringifyException(exception); + } + } + class PendingRequest { private final TransferLeadershipRequest request; private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>(); @@ -54,17 +137,20 @@ public class TransferLeadership { return replyFuture; } - void complete(RaftPeerId currentLeader, boolean timeout) { + void complete(Result result) { if (replyFuture.isDone()) { return; } - + final RaftPeerId currentLeader = server.getState().getLeaderId(); if (currentLeader != null && currentLeader.equals(request.getNewLeader())) { replyFuture.complete(server.newSuccessReply(request)); - } else if (timeout) { + } else { + if (result.getType() == Result.Type.SUCCESS) { + result = Result.DIFFERENT_LEADER; + } final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId() + ": Failed to transfer leadership to " + request.getNewLeader() - + " (timed out " + request.getTimeoutMs() + "ms): current leader is " + currentLeader); + + " (the current leader is " + currentLeader + "): " + result); replyFuture.complete(server.newExceptionReply(request, tle)); } } @@ -76,11 +162,14 @@ public class TransferLeadership { } private final RaftServerImpl server; + private final TimeDuration requestTimeout; private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); + private final AtomicReference<PendingRequest> pending = new AtomicReference<>(); - TransferLeadership(RaftServerImpl server) { + TransferLeadership(RaftServerImpl server, RaftProperties properties) { this.server = server; + this.requestTimeout = RaftServerConfigKeys.Rpc.requestTimeout(properties); } private Optional<RaftPeerId> getTransferee() { @@ -92,50 +181,130 @@ public class TransferLeadership { return pending.get() != null; } - void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo follower) { - final Optional<RaftPeerId> transferee = getTransferee(); - // If the transferee has just append some entries and becomes up-to-date, - // send StartLeaderElection to it - if (transferee.filter(t -> t.equals(follower.getId())).isPresent() - && leaderState.sendStartLeaderElection(follower)) { + static Result isFollowerUpToDate(FollowerInfo follower, TermIndex leaderLastEntry) { + if (follower == null) { + return Result.NULL_FOLLOWER; + } else if (leaderLastEntry != null) { + final long followerMatchIndex = follower.getMatchIndex(); + if (followerMatchIndex < leaderLastEntry.getIndex()) { + return new Result(Result.Type.NOT_UP_TO_DATE, "followerMatchIndex = " + followerMatchIndex + + " < leaderLastEntry.getIndex() = " + leaderLastEntry.getIndex()); + } + } + return Result.SUCCESS; + } + + private Result sendStartLeaderElection(FollowerInfo follower) { + final TermIndex lastEntry = server.getState().getLastEntry(); + + final Result result = isFollowerUpToDate(follower, lastEntry); + if (result != Result.SUCCESS) { + return result; + } + + final RaftPeerId transferee = follower.getId(); + LOG.info("{}: sendStartLeaderElection to follower {}, lastEntry={}", + server.getMemberId(), transferee, lastEntry); + + final RaftProtos.StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto( + server.getMemberId(), transferee, lastEntry); + final CompletableFuture<RaftProtos.StartLeaderElectionReplyProto> f = CompletableFuture.supplyAsync(() -> { + server.getLeaderElectionMetrics().onTransferLeadership(); + try { + return server.getServerRpc().startLeaderElection(r); + } catch (IOException e) { + throw new CompletionException("Failed to sendStartLeaderElection to follower " + transferee, e); + } + }, server.getServerExecutor()).whenComplete((reply, exception) -> { + if (reply != null) { + LOG.info("{}: Received startLeaderElection reply from {}: success? {}", + server.getMemberId(), transferee, reply.getServerReply().getSuccess()); + } else if (exception != null) { + LOG.warn(server.getMemberId() + ": Failed to startLeaderElection for " + transferee, exception); + } + }); + + if (f.isCompletedExceptionally()) { // already failed + try { + f.join(); + } catch (Throwable t) { + return new Result(t); + } + } + return Result.SUCCESS; + } + + /** + * If the transferee has just append some entries and becomes up-to-date, + * send StartLeaderElection to it + */ + void onFollowerAppendEntriesReply(FollowerInfo follower) { + if (!getTransferee().filter(t -> t.equals(follower.getId())).isPresent()) { + return; + } + final Result result = sendStartLeaderElection(follower); + if (result == Result.SUCCESS) { LOG.info("{}: sent StartLeaderElection to transferee {} after received AppendEntriesResponse", - server.getMemberId(), transferee.get()); + server.getMemberId(), follower.getId()); } } - private void tryTransferLeadership(LeaderStateImpl leaderState, RaftPeerId transferee) { + private Result tryTransferLeadership(Context context) { + final RaftPeerId transferee = context.getTransfereeId(); LOG.info("{}: start transferring leadership to {}", server.getMemberId(), transferee); - final LogAppender appender = leaderState.getLogAppender(transferee).orElse(null); - + final LogAppender appender = context.getTransfereeLogAppender(); if (appender == null) { - LOG.error("{}: cannot find LogAppender for transferee {}", server.getMemberId(), transferee); - return; + return Result.NULL_LOG_APPENDER; } final FollowerInfo follower = appender.getFollower(); - if (leaderState.sendStartLeaderElection(follower)) { - LOG.info("{}: sent StartLeaderElection to transferee {} immediately as it already has up-to-date log", - server.getMemberId(), transferee); - } else { - LOG.info("{}: notifying LogAppender to send AppendEntries as transferee {} is not up-to-date", - server.getMemberId(), transferee); + final Result result = sendStartLeaderElection(follower); + if (result.getType() == Result.Type.SUCCESS) { + LOG.info("{}: {} sent StartLeaderElection to transferee {} immediately as it already has up-to-date log", + server.getMemberId(), result, transferee); + } else if (result.getType() == Result.Type.NOT_UP_TO_DATE) { + LOG.info("{}: {} notifying LogAppender to send AppendEntries to transferee {}", + server.getMemberId(), result, transferee); appender.notifyLogAppender(); } + return result; + } + + void start(LogAppender transferee) { + // TransferLeadership will block client request, so we don't want wait too long. + // If everything goes well, transferee should be elected within the min rpc timeout. + final long timeout = server.properties().minRpcTimeoutMs(); + final TransferLeadershipRequest request = new TransferLeadershipRequest(ClientId.emptyClientId(), + server.getId(), server.getMemberId().getGroupId(), 0, transferee.getFollowerId(), timeout); + start(new Context(request, () -> transferee)); } CompletableFuture<RaftClientReply> start(LeaderStateImpl leaderState, TransferLeadershipRequest request) { + final Context context = new Context(request, + JavaUtils.memoize(() -> leaderState.getLogAppender(request.getNewLeader()).orElse(null))); + return start(context); + } + + private CompletableFuture<RaftClientReply> start(Context context) { + final TransferLeadershipRequest request = context.getRequest(); final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> new PendingRequest(request)); final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: supplier.get()); if (previous != null) { return createReplyFutureFromPreviousRequest(request, previous); } - tryTransferLeadership(leaderState, request.getNewLeader()); - - // if timeout is not specified in request, default to random election timeout - final TimeDuration timeout = request.getTimeoutMs() == 0 ? server.getRandomElectionTimeout() - : TimeDuration.valueOf(request.getTimeoutMs(), TimeUnit.MILLISECONDS); - scheduler.onTimeout(timeout, () -> finish(server.getState().getLeaderId(), true), - LOG, () -> "Failed to transfer leadership to " + request.getNewLeader() + ": timeout after " + timeout); - return supplier.get().getReplyFuture(); + final PendingRequest pendingRequest = supplier.get(); + final Result result = tryTransferLeadership(context); + final Result.Type type = result.getType(); + if (type != Result.Type.SUCCESS && type != Result.Type.NOT_UP_TO_DATE) { + pendingRequest.complete(result); + } else { + // if timeout is not specified in request, use default request timeout + final TimeDuration timeout = request.getTimeoutMs() == 0 ? requestTimeout + : TimeDuration.valueOf(request.getTimeoutMs(), TimeUnit.MILLISECONDS); + scheduler.onTimeout(timeout, () -> complete(new Result(Result.Type.TIMED_OUT, + timeout.toString(TimeUnit.SECONDS, 3))), + LOG, () -> "Failed to handle timeout"); + } + return pendingRequest.getReplyFuture(); } private CompletableFuture<RaftClientReply> createReplyFutureFromPreviousRequest( @@ -158,8 +327,8 @@ public class TransferLeadership { } } - void finish(RaftPeerId currentLeader, boolean timeout) { + void complete(Result result) { Optional.ofNullable(pending.getAndSet(null)) - .ifPresent(r -> r.complete(currentLeader, timeout)); + .ifPresent(r -> r.complete(result)); } } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index f77d96f5e..76761249b 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -259,7 +259,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> long cost = System.currentTimeMillis() - start; Assert.assertTrue(cost > timeoutMs); Assert.assertTrue(e.getMessage().contains("Failed to transfer leadership to")); - Assert.assertTrue(e.getMessage().contains("timed out")); + Assert.assertTrue(e.getMessage().contains(TransferLeadership.Result.Type.TIMED_OUT.toString())); } return true; @@ -282,7 +282,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> // after transfer timeout, leader should accept request RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message")); - Assert.assertTrue(reply.getReplierId().equals(leader.getId().toString())); + Assert.assertEquals(leader.getId().toString(), reply.getReplierId()); Assert.assertTrue(reply.isSuccess()); deIsolate(cluster, newLeader.getId()); diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java index b5180548c..7dba5ae9e 100644 --- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java +++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java @@ -58,12 +58,8 @@ public class TransferCommand extends AbstractRatisCommand { super.run(cl); String strAddr = cl.getOptionValue(ADDRESS_OPTION_NAME); - // TODO: Default timeout should be set to 0, which means let server decide (based on election timeout). - // However, occasionally the request could timeout too fast while the transfer is in progress. - // i.e. request timeout doesn't mean transfer leadership has failed. - // Currently, Ratis shell returns merely based on the result of the request. - // So we set a larger default timeout here (3s). - final TimeDuration timeoutDefault = TimeDuration.valueOf(3, TimeUnit.SECONDS); + // Default timeout is 0, which means let server decide (will use default request timeout). + final TimeDuration timeoutDefault = TimeDuration.ZERO; // Default timeout for legacy mode matches with the legacy command (version 2.4.x and older). final TimeDuration timeoutLegacy = TimeDuration.valueOf(60, TimeUnit.SECONDS); final Optional<TimeDuration> timeout = !cl.hasOption(TIMEOUT_OPTION_NAME) ? Optional.empty() :
