Repository: incubator-ratis
Updated Branches:
  refs/heads/master 7f57773b8 -> 00f80b446
RATIS-178. New servers may not be able to join an existing raft group. Contributed by Tsz Wo Nicholas Sze. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/00f80b44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/00f80b44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/00f80b44 Branch: refs/heads/master Commit: 00f80b446faa61310f58d9986aa9bdcea56ac970 Parents: 7f57773 Author: Chen Liang <[email protected]> Authored: Wed Jan 3 13:23:09 2018 -0800 Committer: Chen Liang <[email protected]> Committed: Wed Jan 3 13:23:09 2018 -0800 ---------------------------------------------------------------------- .../org/apache/ratis/grpc/RaftGrpcUtil.java | 10 ++++- .../ratis/grpc/server/GRpcLogAppender.java | 25 ++++++------ .../grpc/server/RaftServerProtocolService.java | 7 +++- .../ratis/server/impl/RaftServerImpl.java | 20 ++++++---- .../ratis/server/impl/LeaderElectionTests.java | 40 ++++++++++++++++++++ 5 files changed, 78 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00f80b44/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java index 5499878..185abbf 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java @@ -40,8 +40,14 @@ public interface RaftGrpcUtil { Metadata trailers = new Metadata(); trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName()); return new StatusRuntimeException( - Status.INTERNAL.withDescription(StringUtils.stringifyException(t)), - trailers); + 
Status.INTERNAL.withCause(t).withDescription(t.getMessage()), trailers); + } + + static Throwable unwrapThrowable(Throwable t) { + if (t instanceof StatusRuntimeException) { + return unwrapException((StatusRuntimeException)t); + } + return t; } static IOException unwrapException(StatusRuntimeException se) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00f80b44/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java index 2d507be..cb13561 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -17,6 +17,13 @@ */ package org.apache.ratis.grpc.server; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.RaftGRpcService; +import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.server.impl.FollowerInfo; +import org.apache.ratis.server.impl.LeaderState; +import org.apache.ratis.server.impl.LogAppender; +import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.storage.RaftLogIOException; import org.apache.ratis.shaded.io.grpc.Status; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; @@ -24,18 +31,10 @@ import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto; import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto; -import org.apache.ratis.grpc.RaftGRpcService; -import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.server.impl.FollowerInfo; -import org.apache.ratis.server.impl.LeaderState; -import 
org.apache.ratis.server.impl.LogAppender; -import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.Preconditions; -import static org.apache.ratis.grpc.RaftGRpcService.GRPC_SEND_SERVER_REQUEST; - import java.util.LinkedList; import java.util.Objects; import java.util.Queue; @@ -151,8 +150,8 @@ public class GRpcLogAppender extends LogAppender { private void sendRequest(AppendEntriesRequestProto request, StreamObserver<AppendEntriesRequestProto> s) { - CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, server.getId(), - null, request); + CodeInjectionForTesting.execute(RaftGRpcService.GRPC_SEND_SERVER_REQUEST, + server.getId(), null, request); s.onNext(request); follower.updateLastRpcSendTime(); @@ -223,8 +222,10 @@ public class GRpcLogAppender extends LogAppender { LOG.info("{} is stopped", GRpcLogAppender.this); return; } - LOG.warn("{} got error when appending entries to {}, exception: {}.", - server.getId(), follower.getPeer().getId(), t); + if (LOG.isWarnEnabled()) { + LOG.warn("{} got error when appending entries to {}, exception: {}.", + server.getId(), follower.getPeer().getId(), RaftGrpcUtil.unwrapThrowable(t)); + } synchronized (this) { final Status cause = Status.fromThrowable(t); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00f80b44/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java index 8c2f31f..a95926a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java @@ -23,6 +23,7 @@ import 
org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase; +import org.apache.ratis.util.ProtoUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,8 +68,10 @@ public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase final AppendEntriesReplyProto reply = server.appendEntries(request); responseObserver.onNext(reply); } catch (Throwable e) { - LOG.info("{} got exception when handling appendEntries {}: {}", - getId(), request.getServerRequest(), e); + if (LOG.isDebugEnabled()) { + LOG.debug("{} got exception when appendEntries {}: {}", + getId(), ProtoUtils.toString(request.getServerRequest()), e); + } responseObserver.onError(RaftGrpcUtil.wrapException(e)); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00f80b44/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 773b0b0..89ea019 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -571,9 +571,14 @@ public class RaftServerImpl implements RaftServerProtocol, return pending.getFuture(); } - private boolean shouldWithholdVotes() { - return isLeader() || (isFollower() && state.hasLeader() - && heartbeatMonitor.shouldWithholdVotes()); + private boolean shouldWithholdVotes(long candidateTerm) { + if (state.getCurrentTerm() < candidateTerm) { + return false; + } else if (isLeader()) { + return true; + } else { + return isFollower() && state.hasLeader() && 
heartbeatMonitor.shouldWithholdVotes(); + } } /** @@ -618,11 +623,10 @@ public class RaftServerImpl implements RaftServerProtocol, boolean shouldShutdown = false; final RequestVoteReplyProto reply; synchronized (this) { - if (shouldWithholdVotes()) { - LOG.info("{} Withhold vote from server {} with term {}. " + - "This server:{}, last rpc time from leader {} is {}", getId(), - candidateId, candidateTerm, this, this.getState().getLeaderId(), - (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1)); + if (shouldWithholdVotes(candidateTerm)) { + LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State: leader={}, term={}, lastRpcElapsed={}", + getId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(), + isFollower()? heartbeatMonitor.getLastRpcTime().elapsedTimeMs() + "ms": null); } else if (state.recognizeCandidate(candidateId, candidateTerm)) { boolean termUpdated = changeToFollower(candidateTerm, false); // see Section 5.4.1 Election restriction http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/00f80b44/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index 736c5e7..4886bcc 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -25,10 +25,15 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.storage.RaftStorageTestUtils; import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.TimeDuration; +import org.junit.Assert; import org.junit.Test; +import 
java.util.Iterator; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import static org.apache.ratis.RaftTestUtil.waitAndKillLeader; import static org.apache.ratis.RaftTestUtil.waitForLeader; @@ -81,4 +86,39 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster> waitForLeader(cluster, leader); cluster.shutdown(); } + + @Test + public void testLateServerStart() throws Exception { + final int numServer = 3; + LOG.info("Running testLateServerStart"); + final MiniRaftCluster cluster = newCluster(numServer); + cluster.initServers(); + + // start all except one servers + final Iterator<RaftServerProxy> i = cluster.getServers().iterator(); + for(int j = 1; j < numServer; j++) { + i.next().start(); + } + + final RaftServerImpl leader = waitForLeader(cluster); + final TimeDuration sleepTime = TimeDuration.valueOf(5, TimeUnit.SECONDS); + LOG.info("sleep " + sleepTime); + sleepTime.sleep(); + + // start the last server + final RaftServerProxy lastServer = i.next(); + lastServer.start(); + final RaftPeerId lastServerLeaderId = JavaUtils.attempt( + () -> getLeader(lastServer.getImpl().getState()), + 10, 1000, "getLeaderId", LOG); + Assert.assertEquals(leader.getId(), lastServerLeaderId); + } + + static RaftPeerId getLeader(ServerState state) { + final RaftPeerId leader = state.getLeaderId(); + if (leader == null) { + throw new IllegalStateException("No leader yet"); + } + return leader; + } }
