This is an automated email from the ASF dual-hosted git repository.
msingh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 300d9c5 RATIS-564. Add raft group Id to install snapshot notification. Contributed by Siddharth Wagle.
300d9c5 is described below
commit 300d9c53c4b77e16ddd5d68dcf6c792d52a94a02
Author: Mukul Kumar Singh <[email protected]>
AuthorDate: Wed Jun 5 12:57:16 2019 +0530
RATIS-564. Add raft group Id to install snapshot notification. Contributed by Siddharth Wagle.
---
.../main/java/org/apache/ratis/server/impl/LogAppender.java | 2 +-
.../java/org/apache/ratis/server/impl/RaftServerImpl.java | 4 ++--
.../java/org/apache/ratis/statemachine/StateMachine.java | 12 ++++++------
.../ratis/statemachine/SimpleStateMachine4Testing.java | 11 +++++++----
.../org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java | 6 +++++-
5 files changed, 21 insertions(+), 14 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index d5a6ec0..bbc97a6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -525,7 +525,7 @@ public class LogAppender {
protected void checkSlowness() {
if (follower.isSlow()) {
- server.getStateMachine().notifySlowness(server.getGroup(), server.getRoleInfoProto());
+ server.getStateMachine().notifySlowness(server.getRoleInfoProto());
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 200ac63..6bfd7a1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -405,7 +405,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
role.shutdownFollowerState();
setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
if (state.checkForExtendedNoLeader()) {
- stateMachine.notifyExtendedNoLeader(getGroup(), getRoleInfoProto());
+ stateMachine.notifyExtendedNoLeader(getRoleInfoProto());
}
// start election
role.startLeaderElection(this);
@@ -1138,7 +1138,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
"index is {} but the leader's first available log index is {}.",
getId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
- stateMachine.notifyInstallSnapshotFromLeader(firstAvailableLogTermIndex)
+ stateMachine.notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
.whenComplete((reply, exception) -> {
if (exception != null) {
LOG.error(getId() + ": State Machine failed to install snapshot", exception);
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index ab7897c..6d2ff5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -19,7 +19,6 @@ package org.apache.ratis.statemachine;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -215,10 +214,9 @@ public interface StateMachine extends Closeable {
* Notify the Leader's state machine that one of the followers is slow
* this notification is based on "raft.server.rpc.slowness.timeout"
*
- * @param group raft group information
* @param roleInfoProto information about the current node role and rpc delay information
*/
- default void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
+ default void notifySlowness(RoleInfoProto roleInfoProto) {
}
@@ -226,10 +224,9 @@ public interface StateMachine extends Closeable {
* Notify the Leader's state machine that a leader has not been elected for a long time
* this notification is based on "raft.server.leader.election.timeout"
*
- * @param group raft group information
* @param roleInfoProto information about the current node role and rpc delay information
*/
- default void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
+ default void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
}
@@ -260,10 +257,13 @@ public interface StateMachine extends Closeable {
* to install the latest snapshot.
* @param firstTermIndexInLog TermIndex of the first append entry available
* in the Leader's log.
+ * @param roleInfoProto information about the current node role and
+ * rpc delay information
* @return After the snapshot installation is complete, return the last
* included term index in the snapshot.
*/
- default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(TermIndex firstTermIndexInLog) {
+ default CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
+ RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
return CompletableFuture.completedFuture(null);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 0b6491e..3594429 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -159,6 +159,8 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
private volatile RoleInfoProto slownessInfo = null;
private volatile RoleInfoProto leaderElectionTimeoutInfo = null;
+ private RaftGroupId groupId;
+
public SimpleStateMachine4Testing() {
checkpointer = new Daemon(() -> {
while (running) {
@@ -200,6 +202,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
public synchronized void initialize(RaftServer server, RaftGroupId groupId,
RaftStorage raftStorage) throws IOException {
LOG.info("Initializing " + this);
+ this.groupId = groupId;
lifeCycle.startAndTransition(() -> {
super.initialize(server, groupId, raftStorage);
storage.init(raftStorage);
@@ -402,14 +405,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
}
@Override
- public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
- LOG.info("{}: notifySlowness {}, {}", this, group, roleInfoProto);
+ public void notifySlowness(RoleInfoProto roleInfoProto) {
+ LOG.info("{}: notifySlowness {}, {}", this, groupId, roleInfoProto);
slownessInfo = roleInfoProto;
}
@Override
- public void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) {
- LOG.info("{}: notifyExtendedNoLeader {}, {}", this, group, roleInfoProto);
+ public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
+ LOG.info("{}: notifyExtendedNoLeader {}, {}", this, groupId, roleInfoProto);
leaderElectionTimeoutInfo = roleInfoProto;
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
index 8dc94d9..6e469b6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestInstallSnapshotWithGrpc.java
@@ -22,7 +22,9 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
@@ -93,7 +95,9 @@ public class TestInstallSnapshotWithGrpc {
private static class StateMachineForGRpcTest extends SimpleStateMachine4Testing {
@Override
- public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(TermIndex termIndex) {
+ public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
+ RaftProtos.RoleInfoProto roleInfoProto,
+ TermIndex termIndex) {
try {
Path leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath();
File followerSnapshotFilePath = new File(getSMdir(),