This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f66530cd RATIS-1631. Add Raftpeer Info for NotifySnapshotInstalled
(#689)
f66530cd is described below
commit f66530cde1449d5a2cc3d20633263e5f1109cf94
Author: Nibiru <[email protected]>
AuthorDate: Fri Jul 22 03:19:33 2022 +0800
RATIS-1631. Add Raftpeer Info for NotifySnapshotInstalled (#689)
---
.../java/org/apache/ratis/grpc/server/GrpcLogAppender.java | 10 ++++++----
.../java/org/apache/ratis/statemachine/StateMachine.java | 6 +++++-
.../java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 ++-
.../ratis/server/impl/SnapshotInstallationHandler.java | 4 ++--
.../org/apache/ratis/InstallSnapshotNotificationTests.java | 13 +++++++------
5 files changed, 22 insertions(+), 14 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 4b5751fd..69a3795c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -22,6 +22,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -435,12 +436,12 @@ public class GrpcLogAppender extends LogAppenderBase {
if (followerNextIndex >= leaderStartIndex) {
LOG.info("{}: Follower can catch up leader after install the snapshot,
as leader's start index is {}",
this, followerNextIndex);
- notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS,
followerSnapshotIndex);
+ notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS,
followerSnapshotIndex, getFollower().getPeer());
}
}
- void notifyInstallSnapshotFinished(InstallSnapshotResult result, long
snapshotIndex) {
- getServer().getStateMachine().event().notifySnapshotInstalled(result,
snapshotIndex);
+ void notifyInstallSnapshotFinished(InstallSnapshotResult result, long
snapshotIndex, RaftPeer peer) {
+ getServer().getStateMachine().event().notifySnapshotInstalled(result,
snapshotIndex, peer);
}
boolean isDone() {
@@ -512,7 +513,8 @@ public class GrpcLogAppender extends LogAppenderBase {
case SNAPSHOT_UNAVAILABLE:
LOG.info("{}: Follower could not install snapshot as it is not
available.", this);
getFollower().setAttemptedToInstallSnapshot();
-
notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE,
RaftLog.INVALID_LOG_INDEX);
+
notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE,
RaftLog.INVALID_LOG_INDEX,
+ getFollower().getPeer());
removePending(reply);
break;
case UNRECOGNIZED:
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 6754d50b..79f8818d 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -190,8 +190,12 @@ public interface StateMachine extends Closeable {
/**
* Notify the {@link StateMachine} that the progress of install snapshot is
* completely done. Could trigger the cleanup of snapshots.
+ *
+ * @param result {@link InstallSnapshotResult}
+ * @param snapshotIndex the index of installed snapshot
+ * @param peer the peer who installed the snapshot
*/
- default void notifySnapshotInstalled(InstallSnapshotResult result, long
snapshotIndex) {}
+ default void notifySnapshotInstalled(InstallSnapshotResult result, long
snapshotIndex, RaftPeer peer) {}
/**
* Notify the {@link StateMachine} that a raft server has step down.
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index afab42cf..fe9f87ce 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1409,7 +1409,8 @@ class RaftServerImpl implements RaftServer.Division,
final long installedIndex =
snapshotInstallationHandler.getInstalledIndex();
if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) {
LOG.info("{}: Follower has completed install the snapshot {}.", this,
installedIndex);
-
stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS,
installedIndex);
+
stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS,
installedIndex,
+ getRaftServer().getPeer());
}
}
return JavaUtils.allOf(futures).whenCompleteAsync(
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 18f3b542..2969ca31 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -305,7 +305,7 @@ class SnapshotInstallationHandler {
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
server.getStateMachine().event().notifySnapshotInstalled(
- InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX);
+ InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX,
server.getRaftServer().getPeer());
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
}
@@ -323,7 +323,7 @@ class SnapshotInstallationHandler {
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
final long latestInstalledIndex =
latestInstalledSnapshotTermIndex.getIndex();
server.getStateMachine().event().notifySnapshotInstalled(
- InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex);
+ InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex,
server.getRaftServer().getPeer());
installedIndex.set(latestInstalledIndex);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotTermIndex.getIndex());
diff --git
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 592a7ed1..4476f3ec 100644
---
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -123,7 +124,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
}
@Override
- public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult
result, long installIndex) {
+ public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult
result, long installIndex, RaftPeer peer) {
if (result != RaftProtos.InstallSnapshotResult.SUCCESS &&
result != RaftProtos.InstallSnapshotResult.SNAPSHOT_UNAVAILABLE) {
return;
@@ -134,14 +135,14 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
synchronized (this) {
try {
if
(getServer().get().getDivision(this.getGroupId()).getInfo().isLeader()) {
- LOG.info("Receive the notification to clean up snapshot as leader
for {}", result);
+ LOG.info("Receive the notification to clean up snapshot as leader
for {}, result: {}", peer, result);
if (leaderSnapshotFile.exists()) {
// For test purpose, we do not delete the leader's snapshot
actually, which could be
// sent to more than one peer during the test
LOG.info("leader snapshot {} existed", leaderSnapshotFile);
}
} else {
- LOG.info("Receive the notification to clean up snapshot as
follower for {}", result);
+ LOG.info("Receive the notification to clean up snapshot as
follower for {}, result: {}", peer, result);
File followerSnapshotFile = new File(getSMdir(),
leaderSnapshotFile.getName());
if (followerSnapshotFile.exists()) {
FileUtils.deleteFile(followerSnapshotFile);
@@ -398,11 +399,11 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
}
@Test
- public void testInstallSnapshotFinishedEvent() throws Exception{
- runWithNewCluster(1, this::testInstallSnapshotFinishedEvent);
+ public void testInstallSnapshotInstalledEvent() throws Exception{
+ runWithNewCluster(1, this::testInstallSnapshotInstalledEvent);
}
- private void testInstallSnapshotFinishedEvent(CLUSTER cluster) throws
Exception{
+ private void testInstallSnapshotInstalledEvent(CLUSTER cluster) throws
Exception{
leaderSnapshotInfoRef.set(null);
numNotifyInstallSnapshotFinished.set(0);
final List<LogSegmentPath> logs;