Repository: incubator-ratis Updated Branches: refs/heads/master 704072c5c -> ee262d7e3
RATIS-109. Improve the log messages in RaftServerImpl and the related code. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ee262d7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ee262d7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ee262d7e Branch: refs/heads/master Commit: ee262d7e3daefe323900146beb460b20b1a72869 Parents: 704072c Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Wed Aug 23 11:51:42 2017 -0700 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Wed Aug 23 11:51:42 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/ratis/protocol/RaftId.java | 2 +- .../ratis/protocol/ServerNotReadyException.java | 27 +++++ .../arithmetic/ArithmeticStateMachine.java | 14 ++- .../ratis/examples/RaftExamplesTestUtil.java | 5 +- .../examples/arithmetic/TestArithmetic.java | 29 +++-- .../TestRaftStateMachineException.java | 2 +- .../apache/ratis/server/impl/FollowerState.java | 3 +- .../ratis/server/impl/LeaderElection.java | 2 +- .../apache/ratis/server/impl/LeaderState.java | 8 +- .../ratis/server/impl/PendingRequest.java | 2 +- .../ratis/server/impl/RaftConfiguration.java | 6 +- .../ratis/server/impl/RaftServerImpl.java | 110 ++++++++++--------- .../ratis/server/impl/ServerImplUtils.java | 6 +- .../ratis/server/impl/ServerProtoUtils.java | 17 ++- .../apache/ratis/server/impl/ServerState.java | 15 +-- .../ratis/server/impl/StateMachineUpdater.java | 11 +- .../apache/ratis/server/protocol/TermIndex.java | 10 ++ .../ratis/server/storage/RaftLogWorker.java | 2 +- .../statemachine/SimpleStateMachineStorage.java | 1 + .../java/org/apache/ratis/MiniRaftCluster.java | 11 +- .../impl/RaftReconfigurationBaseTest.java | 7 +- 21 files changed, 185 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java index ebf9f75..0846856 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java @@ -31,7 +31,7 @@ public abstract class RaftId { private static void checkLength(int length, String name) { Preconditions.assertTrue(length == BYTE_LENGTH, - " = %s != BYTE_LENGTH = %s", name, length, BYTE_LENGTH); + "%s = %s != BYTE_LENGTH = %s", name, length, BYTE_LENGTH); } private static UUID toUuid(ByteString bytes) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-common/src/main/java/org/apache/ratis/protocol/ServerNotReadyException.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerNotReadyException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerNotReadyException.java new file mode 100644 index 0000000..80307e6 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerNotReadyException.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.protocol; + +/** + * The server is not ready yet. + */ +public class ServerNotReadyException extends RaftException { + public ServerNotReadyException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java index a4199b6..cf61df1 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java @@ -88,8 +88,8 @@ public class ArithmeticStateMachine extends BaseStateMachine { last = latestTermIndex.get(); } - File snapshotFile = new File(SimpleStateMachineStorage.getSnapshotFileName( - last.getTerm(), last.getIndex())); + final File snapshotFile = storage.getSnapshotFile(last.getTerm(), last.getIndex()); + LOG.info("Taking a snapshot to file {}", snapshotFile); try(final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream(new FileOutputStream(snapshotFile)))) { @@ -107,12 +107,16 @@ public class ArithmeticStateMachine extends BaseStateMachine { } private long load(SingleFileSnapshotInfo snapshot, boolean reload) throws IOException { - if (snapshot == null || !snapshot.getFile().getPath().toFile().exists()) { - LOG.warn("The snapshot file {} does not exist", snapshot); + if (snapshot == null) { + LOG.warn("The snapshot info is null."); + return RaftServerConstants.INVALID_LOG_INDEX; + } + final File snapshotFile = snapshot.getFile().getPath().toFile(); + if (!snapshotFile.exists()) { + LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotFile, snapshot); return RaftServerConstants.INVALID_LOG_INDEX; } - File snapshotFile =snapshot.getFile().getPath().toFile(); final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile); try(final AutoCloseableLock writeLock = writeLock(); final ObjectInputStream in = new ObjectInputStream( http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java index 7804353..786ac48 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java @@ -80,10 +80,11 @@ public class RaftExamplesTestUtil { } public static <S extends StateMachine> Collection<Object[]> getMiniRaftClusters( - Class<S> stateMachineClass, Class<?>... clusterClasses) throws IOException { + Class<S> stateMachineClass, int clusterSize, Class<?>... clusterClasses) + throws IOException { final RaftProperties prop = new RaftProperties(); prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, stateMachineClass, StateMachine.class); - return getMiniRaftClusters(prop, 3, clusterClasses); + return getMiniRaftClusters(prop, clusterSize, clusterClasses); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java index 6a16316..feff88f 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.examples.arithmetic; - import org.apache.log4j.Level; import org.apache.ratis.BaseTest; import org.apache.ratis.MiniRaftCluster; @@ -26,8 +25,8 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.examples.RaftExamplesTestUtil; import org.apache.ratis.examples.arithmetic.expression.*; import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.Preconditions; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -38,13 +37,13 @@ import java.util.Collection; @RunWith(Parameterized.class) public class TestArithmetic extends BaseTest { - static { + { LogUtils.setLogLevel(ArithmeticStateMachine.LOG, Level.ALL); } @Parameterized.Parameters public static Collection<Object[]> data() throws IOException { - return RaftExamplesTestUtil.getMiniRaftClusters(ArithmeticStateMachine.class); + return RaftExamplesTestUtil.getMiniRaftClusters(ArithmeticStateMachine.class, 3); } @Parameterized.Parameter @@ -53,9 +52,20 @@ public class TestArithmetic extends BaseTest { @Test public void testPythagorean() throws Exception { cluster.start(); - RaftTestUtil.waitForLeader(cluster); - final RaftPeerId leaderId = cluster.getLeader().getId(); - final RaftClient client = cluster.createClient(leaderId); + try { + RaftTestUtil.waitForLeader(cluster); + try (final RaftClient client = cluster.createClient()) { + runTestPythagorean(client, 3, 100); + } + } finally { + cluster.shutdown(); + } + } + + public static void runTestPythagorean( + RaftClient client, int start, int count) throws IOException { + Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0"); + Preconditions.assertTrue(start >= 2, () -> "start = " + start + " < 2"); final Variable a = new Variable("a"); final Variable b = new Variable("b"); @@ -70,7 +80,8 @@ public class TestArithmetic extends BaseTest { final AssignmentMessage nullB = new AssignmentMessage(b, NullValue.getInstance()); final AssignmentMessage nullC = new AssignmentMessage(c, NullValue.getInstance()); - for(int n = 3; n < 100; n += 2) { + final int end = start + 2*count; + for(int n = (start & 1) == 0? start + 1: start; n < end; n += 2) { int n2 = n*n; int half_n2 = n2/2; @@ -93,8 +104,6 @@ public class TestArithmetic extends BaseTest { r = client.send(nullC); assertRaftClientReply(r, null); } - client.close(); - cluster.shutdown(); } static void assertRaftClientReply(RaftClientReply reply, Double expected) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java index f893cd3..d84a369 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java +++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java @@ -73,7 +73,7 @@ public class TestRaftStateMachineException extends BaseTest { @Parameterized.Parameters public static Collection<Object[]> data() throws IOException { return RaftExamplesTestUtil.getMiniRaftClusters( - StateMachineWithException.class); + StateMachineWithException.class, 3); } @Parameterized.Parameter http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index 1e57fa2..0a44e2f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -20,12 +20,13 @@ package org.apache.ratis.server.impl; import org.apache.ratis.util.Daemon; import org.apache.ratis.util.Timestamp; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Used when the peer is a follower. Used to track the election timeout. */ class FollowerState extends Daemon { - static final Logger LOG = RaftServerImpl.LOG; + static final Logger LOG = LoggerFactory.getLogger(FollowerState.class); private final RaftServerImpl server; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 8b8e4ff..8c49005 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -227,7 +227,7 @@ class LeaderElection extends Daemon { } } } catch(ExecutionException e) { - LOG.info("Got exception when requesting votes: " + e); + LOG.info("{} got exception when requesting votes: {}", server.getId(), e); LOG.trace("TRACE", e); exceptions.add(e); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java index 313a3bb..e711774 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java @@ -182,7 +182,7 @@ public class LeaderState { try { pendingRequests.sendNotLeaderResponses(); } catch (IOException e) { - LOG.warn("Caught exception in sendNotLeaderResponses", e); + LOG.warn(server.getId() + ": Caught exception in sendNotLeaderResponses", e); } } @@ -549,12 +549,6 @@ public class LeaderState { return lists; } - PendingRequest returnNoConfChange(SetConfigurationRequest r) { - PendingRequest pending = new PendingRequest(r); - pending.setSuccessReply(null); - return pending; - } - void replyPendingRequest(long logIndex, RaftClientReply reply) { pendingRequests.replyPendingRequest(logIndex, reply); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java index 59f870e..f1909d4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java @@ -27,7 +27,7 @@ import org.apache.ratis.util.Preconditions; import java.util.concurrent.CompletableFuture; public class PendingRequest implements Comparable<PendingRequest> { - private final Long index; + private final long index; private final RaftClientRequest request; private final TransactionContext entry; private final CompletableFuture<RaftClientReply> future; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java index 6ce7ecd..0034f4e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java @@ -156,6 +156,10 @@ public class RaftConfiguration { return oldConf != null && oldConf.contains(peerId); } + /** + * @return true iff the given peer is contained in conf and, + * if old conf exists, is contained in old conf. + */ boolean contains(RaftPeerId peerId) { return containsInConf(peerId) && (oldConf == null || containsInOldConf(peerId)); @@ -211,7 +215,7 @@ public class RaftConfiguration { @Override public String toString() { - return conf + (oldConf != null ? "old:" + oldConf : ""); + return conf + ", old=" + oldConf; } boolean hasNoChange(RaftPeer[] newMembers) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 9d7788e..9e9215b 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -91,6 +91,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerImpl(RaftPeerId id, RaftGroup group, RaftServerProxy proxy, RaftProperties properties) throws IOException { + LOG.debug("new RaftServerImpl {}, {}", id , group); this.groupId = group.getGroupId(); this.lifeCycle = new LifeCycle(id); minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS); @@ -150,15 +151,20 @@ public class RaftServerImpl implements RaftServerProtocol, return proxy.getServerRpc(); } + private void setRole(Role newRole) { + LOG.debug("{} changes role from {} to {}", getId(), this.role, newRole); + this.role = newRole; + } + void start() { lifeCycle.transition(STARTING); state.start(); RaftConfiguration conf = getRaftConf(); if (conf != null && conf.contains(getId())) { - LOG.debug("{} starts as a follower", getId()); + LOG.debug("{} starts as a follower, conf={}", getId(), conf); startAsFollower(); } else { - LOG.debug("{} starts with initializing state", getId()); + LOG.debug("{} starts with initializing state, conf={}", getId(), conf); startInitializing(); } registerMBean(); @@ -181,10 +187,8 @@ public class RaftServerImpl implements RaftServerProtocol, * The peer belongs to the current configuration, should start as a follower */ private void startAsFollower() { - role = Role.FOLLOWER; - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); - + setRole(Role.FOLLOWER); + startHeartbeatMonitor(); lifeCycle.transition(RUNNING); } @@ -194,7 +198,7 @@ public class RaftServerImpl implements RaftServerProtocol, * start election. */ private void startInitializing() { - role = Role.FOLLOWER; + setRole(Role.FOLLOWER); // do not start heartbeatMonitoring } @@ -223,7 +227,7 @@ public class RaftServerImpl implements RaftServerProtocol, LOG.warn("Failed to shutdown election daemon for " + getId(), ignored); } try{ - shutdownLeaderState(); + shutdownLeaderState(true); } catch (Exception ignored) { LOG.warn("Failed to shutdown leader state monitor for " + getId(), ignored); } @@ -262,25 +266,16 @@ public class RaftServerImpl implements RaftServerProtocol, synchronized boolean changeToFollower(long newTerm, boolean sync) throws IOException { final Role old = role; - role = Role.FOLLOWER; - - boolean metadataUpdated = false; - if (newTerm > state.getCurrentTerm()) { - state.setCurrentTerm(newTerm); - state.resetLeaderAndVotedFor(); - metadataUpdated = true; - } - - if (old == Role.LEADER) { - assert leaderState != null; - shutdownLeaderState(); - } else if (old == Role.CANDIDATE) { - shutdownElectionDaemon(); - } + final boolean metadataUpdated = state.updateCurrentTerm(newTerm); if (old != Role.FOLLOWER) { - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); + setRole(Role.FOLLOWER); + if (old == Role.LEADER) { + shutdownLeaderState(false); + } else if (old == Role.CANDIDATE) { + shutdownElectionDaemon(); + } + startHeartbeatMonitor(); } if (metadataUpdated && sync) { @@ -289,12 +284,15 @@ public class RaftServerImpl implements RaftServerProtocol, return metadataUpdated; } - private synchronized void shutdownLeaderState() { - final LeaderState leader = leaderState; - if (leader != null) { - leader.stop(); + private synchronized void shutdownLeaderState(boolean allowNull) { + if (leaderState == null) { + if (!allowNull) { + throw new NullPointerException("leaderState == null"); + } + } else { + leaderState.stop(); + leaderState = null; } - leaderState = null; // TODO: make sure that StateMachineUpdater has applied all transactions that have context } @@ -310,13 +308,20 @@ public class RaftServerImpl implements RaftServerProtocol, synchronized void changeToLeader() { Preconditions.assertTrue(isCandidate()); shutdownElectionDaemon(); - role = Role.LEADER; + setRole(Role.LEADER); state.becomeLeader(); // start sending AppendEntries RPC to followers leaderState = new LeaderState(this, getProxy().getProperties()); leaderState.start(); } + private void startHeartbeatMonitor() { + Preconditions.assertTrue(heartbeatMonitor == null, "heartbeatMonitor != null"); + LOG.debug("{} starts heartbeatMonitor", getId()); + heartbeatMonitor = new FollowerState(this); + heartbeatMonitor.start(); + } + private void shutdownHeartbeatMonitor() { final FollowerState hm = heartbeatMonitor; if (hm != null) { @@ -329,7 +334,7 @@ public class RaftServerImpl implements RaftServerProtocol, synchronized void changeToCandidate() { Preconditions.assertTrue(isFollower()); shutdownHeartbeatMonitor(); - role = Role.CANDIDATE; + setRole(Role.CANDIDATE); // start election electionDaemon = new LeaderElection(this); electionDaemon.start(); @@ -387,9 +392,9 @@ public class RaftServerImpl implements RaftServerProtocol, peers.toArray(new RaftPeer[peers.size()])); } - private void assertLifeCycleState(LifeCycle.State... expected) throws IOException { - lifeCycle.assertCurrentState((n, c) -> new IOException("Server " + n - + " is not " + Arrays.asList(expected) + ": current state is " + c), + private void assertLifeCycleState(LifeCycle.State... expected) throws ServerNotReadyException { + lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException("Server " + n + + " is not " + Arrays.toString(expected) + ": current state is " + c), expected); } @@ -534,9 +539,10 @@ public class RaftServerImpl implements RaftServerProtocol, "Reconfiguration is already in progress: " + current); } - // return true if the new configuration is the same with the current one + // return success with a null message if the new conf is the same as the current if (current.hasNoChange(peersInNewConf)) { - pending = leaderState.returnNoConfChange(request); + pending = new PendingRequest(request); + pending.setSuccessReply(null); return pending.getFuture(); } @@ -574,18 +580,20 @@ public class RaftServerImpl implements RaftServerProtocol, @Override public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws IOException { - final RaftPeerId candidateId = - RaftPeerId.valueOf(r.getServerRequest().getRequestorId()); - return requestVote(candidateId, r.getCandidateTerm(), + final RaftRpcRequestProto request = r.getServerRequest(); + return requestVote(RaftPeerId.valueOf(request.getRequestorId()), + ProtoUtils.toRaftGroupId(request.getRaftGroupId()), + r.getCandidateTerm(), ServerProtoUtils.toTermIndex(r.getCandidateLastEntry())); } - private RequestVoteReplyProto requestVote(RaftPeerId candidateId, + private RequestVoteReplyProto requestVote( + RaftPeerId candidateId, RaftGroupId candidateGroupId, long candidateTerm, TermIndex candidateLastEntry) throws IOException { CodeInjectionForTesting.execute(REQUEST_VOTE, getId(), candidateId, candidateTerm, candidateLastEntry); - LOG.debug("{}: receive requestVote({}, {}, {})", - getId(), candidateId, candidateTerm, candidateLastEntry); + LOG.debug("{}: receive requestVote({}, {}, {}, {})", + getId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry); assertLifeCycleState(RUNNING); boolean voteGranted = false; @@ -655,11 +663,13 @@ public class RaftServerImpl implements RaftServerProtocol, public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r) throws IOException { // TODO avoid converting list to array + final RaftRpcRequestProto request = r.getServerRequest(); final LogEntryProto[] entries = r.getEntriesList() .toArray(new LogEntryProto[r.getEntriesCount()]); final TermIndex previous = r.hasPreviousLog() ? ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null; - return appendEntries(RaftPeerId.valueOf(r.getServerRequest().getRequestorId()), + return appendEntries(RaftPeerId.valueOf(request.getRequestorId()), + ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(), entries); } @@ -676,14 +686,15 @@ public class RaftServerImpl implements RaftServerProtocol, } } - private AppendEntriesReplyProto appendEntries(RaftPeerId leaderId, long leaderTerm, + private AppendEntriesReplyProto appendEntries( + RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm, TermIndex previous, long leaderCommit, boolean initializing, LogEntryProto... entries) throws IOException { CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(), leaderId, leaderTerm, previous, leaderCommit, initializing, entries); final boolean isHeartbeat = entries.length == 0; logAppendEntries(isHeartbeat, - () -> getId() + ": receive appendEntries(" + leaderId + ", " + () -> getId() + ": receive appendEntries(" + leaderId + ", " + leaderGroupId + ", " + leaderTerm + ", " + previous + ", " + leaderCommit + ", " + initializing + ServerProtoUtils.toString(entries)); @@ -713,8 +724,7 @@ public class RaftServerImpl implements RaftServerProtocol, state.setLeader(leaderId); if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) { - heartbeatMonitor = new FollowerState(this); - heartbeatMonitor.start(); + startHeartbeatMonitor(); } if (lifeCycle.getCurrentState() == RUNNING) { heartbeatMonitor.updateLastRpcTime(true); @@ -780,8 +790,8 @@ public class RaftServerImpl implements RaftServerProtocol, @Override public InstallSnapshotReplyProto installSnapshot( InstallSnapshotRequestProto request) throws IOException { - final RaftPeerId leaderId = RaftPeerId.valueOf( - request.getServerRequest().getRequestorId()); + final RaftRpcRequestProto r = request.getServerRequest(); + final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId()); CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, request); LOG.debug("{}: receive installSnapshot({})", getId(), request); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java index 3c617f1..bcbab9a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java @@ -81,13 +81,9 @@ public class ServerImplUtils { return Long.hashCode(term) ^ Long.hashCode(index); } - private static String toString(long n) { - return n < 0 ? "~" : "" + n; - } - @Override public String toString() { - return "(t:" + toString(term) + ", i:" + toString(index) + ")"; + return TermIndex.toString(term, index); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 845a6ca..193ac83 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -24,9 +24,11 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.*; import org.apache.ratis.util.ProtoUtils; @@ -50,11 +52,22 @@ public class ServerProtoUtils { TermIndex.newTermIndex(entry.getTerm(), entry.getIndex()); } + public static String toTermIndexString(LogEntryProto entry) { + return TermIndex.toString(entry.getTerm(), entry.getIndex()); + } + + private static String toLogEntryString(LogEntryProto entry) { + final ByteString clientId = entry.getClientId(); + return toTermIndexString(entry) + entry.getLogEntryBodyCase() + + ", " + (clientId.isEmpty()? "<empty clientId>": new ClientId(clientId)) + + ", callId=" + entry.getCallId(); + } + public static String toString(LogEntryProto... entries) { return entries == null? "null" : entries.length == 0 ? "[]" - : entries.length == 1? "" + toTermIndex(entries[0]) - : "" + Arrays.stream(entries).map(ServerProtoUtils::toTermIndex) + : entries.length == 1? toLogEntryString(entries[0]) + : "" + Arrays.stream(entries).map(ServerProtoUtils::toLogEntryString) .collect(Collectors.toList()); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 5db3509..e83a931 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -165,8 +165,14 @@ public class ServerState implements Closeable { return currentTerm; } - void setCurrentTerm(long term) { - currentTerm = term; + boolean updateCurrentTerm(long newTerm) { + if (newTerm > currentTerm) { + currentTerm = newTerm; + votedFor = null; + leaderId = null; + return true; + } + return false; } RaftPeerId getLeaderId() { @@ -190,11 +196,6 @@ public class ServerState implements Closeable { this.log.writeMetadata(currentTerm, votedFor); } - void resetLeaderAndVotedFor() { - votedFor = null; - leaderId = null; - } - /** * Vote for a candidate and update the local state. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java index b9ed6a5..7ed6731 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java @@ -137,13 +137,18 @@ class StateMachineUpdater implements Runnable { } while (lastAppliedIndex < committedIndex) { - final LogEntryProto next = raftLog.get(lastAppliedIndex + 1); + final long nextIndex = lastAppliedIndex + 1; + final LogEntryProto next = raftLog.get(nextIndex); if (next != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("{}: applying nextIndex={}, nextLog={}", + this, nextIndex, ServerProtoUtils.toString(next)); + } server.applyLogToStateMachine(next); - lastAppliedIndex++; + lastAppliedIndex = nextIndex; } else { LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}", - this, lastAppliedIndex + 1, state); + this, nextIndex, state); break; } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java index a16110f..fed14a3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java @@ -19,6 +19,8 @@ package org.apache.ratis.server.protocol; import org.apache.ratis.server.impl.ServerImplUtils; +import java.util.function.LongFunction; + /** The term and the log index defined in the Raft consensus algorithm. */ public interface TermIndex extends Comparable<TermIndex> { TermIndex[] EMPTY_TERMINDEX_ARRAY = {}; @@ -33,6 +35,14 @@ public interface TermIndex extends Comparable<TermIndex> { static TermIndex newTermIndex(long term, long index) { return ServerImplUtils.newTermIndex(term, index); } + + LongFunction<String> LONG_TO_STRING = n -> n >= 0L? String.valueOf(n): "~"; + + /** @return a string representing the given term and index. */ + static String toString(long term, long index) { + return String.format("(t:%s, i:%s)", + LONG_TO_STRING.apply(term), LONG_TO_STRING.apply(index)); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index 1dc8ae1..0a90d91 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory; * raft peer. */ class RaftLogWorker implements Runnable { - static final Logger LOG = RaftServerImpl.LOG; + static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class); /** * The task queue accessed by rpc handler threads and the io worker thread. */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java index 5a34271..05c5337 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java @@ -52,6 +52,7 @@ public class SimpleStateMachineStorage implements StateMachineStorage { private volatile SingleFileSnapshotInfo currentSnapshot = null; + @Override public void init(RaftStorage raftStorage) throws IOException { this.raftStorage = raftStorage; this.smDir = raftStorage.getStorageDir().getStateMachineDir(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java index f003127..31d16fe 100644 --- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java @@ -28,7 +28,8 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.*; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; import org.apache.ratis.server.storage.MemoryRaftLog; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.statemachine.BaseStateMachine; @@ -49,10 +50,6 @@ import java.util.stream.StreamSupport; public abstract class MiniRaftCluster { public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); - public static final DelayLocalExecutionInjection logSyncDelay = - new DelayLocalExecutionInjection(RaftLog.LOG_SYNC); - public static final DelayLocalExecutionInjection leaderPlaceHolderDelay = - new DelayLocalExecutionInjection(LeaderState.APPEND_PLACEHOLDER); public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName(); public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class"; @@ -421,6 +418,10 @@ public abstract class MiniRaftCluster { return group; } + public RaftClient createClient() { + return createClient(null, group); + } + public RaftClient createClient(RaftPeerId leaderId) { return createClient(leaderId, group); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index f1d640a..34ade52 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -45,8 +45,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static java.util.Arrays.asList; -import static org.apache.ratis.MiniRaftCluster.leaderPlaceHolderDelay; -import static org.apache.ratis.MiniRaftCluster.logSyncDelay; import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID; import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf; import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY; @@ -57,6 +55,11 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest { LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); } + private static final DelayLocalExecutionInjection logSyncDelay = + new DelayLocalExecutionInjection(RaftLog.LOG_SYNC); + private static final DelayLocalExecutionInjection leaderPlaceHolderDelay = + new DelayLocalExecutionInjection(LeaderState.APPEND_PLACEHOLDER); + protected static final RaftProperties prop = new RaftProperties(); private static final ClientId clientId = ClientId.createId();
