This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f66530cd RATIS-1631. Add Raftpeer Info for NotifySnapshotInstalled 
(#689)
f66530cd is described below

commit f66530cde1449d5a2cc3d20633263e5f1109cf94
Author: Nibiru <[email protected]>
AuthorDate: Fri Jul 22 03:19:33 2022 +0800

    RATIS-1631. Add Raftpeer Info for NotifySnapshotInstalled (#689)
---
 .../java/org/apache/ratis/grpc/server/GrpcLogAppender.java  | 10 ++++++----
 .../java/org/apache/ratis/statemachine/StateMachine.java    |  6 +++++-
 .../java/org/apache/ratis/server/impl/RaftServerImpl.java   |  3 ++-
 .../ratis/server/impl/SnapshotInstallationHandler.java      |  4 ++--
 .../org/apache/ratis/InstallSnapshotNotificationTests.java  | 13 +++++++------
 5 files changed, 22 insertions(+), 14 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 4b5751fd..69a3795c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -22,6 +22,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -435,12 +436,12 @@ public class GrpcLogAppender extends LogAppenderBase {
       if (followerNextIndex >= leaderStartIndex) {
         LOG.info("{}: Follower can catch up leader after install the snapshot, 
as leader's start index is {}",
             this, followerNextIndex);
-        notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, 
followerSnapshotIndex);
+        notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, 
followerSnapshotIndex, getFollower().getPeer());
       }
     }
 
-    void notifyInstallSnapshotFinished(InstallSnapshotResult result, long 
snapshotIndex) {
-      getServer().getStateMachine().event().notifySnapshotInstalled(result, 
snapshotIndex);
+    void notifyInstallSnapshotFinished(InstallSnapshotResult result, long 
snapshotIndex, RaftPeer peer) {
+      getServer().getStateMachine().event().notifySnapshotInstalled(result, 
snapshotIndex, peer);
     }
 
     boolean isDone() {
@@ -512,7 +513,8 @@ public class GrpcLogAppender extends LogAppenderBase {
         case SNAPSHOT_UNAVAILABLE:
           LOG.info("{}: Follower could not install snapshot as it is not 
available.", this);
           getFollower().setAttemptedToInstallSnapshot();
-          
notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, 
RaftLog.INVALID_LOG_INDEX);
+          
notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, 
RaftLog.INVALID_LOG_INDEX,
+              getFollower().getPeer());
           removePending(reply);
           break;
         case UNRECOGNIZED:
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 6754d50b..79f8818d 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -190,8 +190,12 @@ public interface StateMachine extends Closeable {
     /**
      * Notify the {@link StateMachine} that the progress of install snapshot is
      * completely done. Could trigger the cleanup of snapshots.
+     *
+     * @param result {@link InstallSnapshotResult}
+     * @param snapshotIndex the index of installed snapshot
+     * @param peer the peer who installed the snapshot
      */
-    default void notifySnapshotInstalled(InstallSnapshotResult result, long 
snapshotIndex) {}
+    default void notifySnapshotInstalled(InstallSnapshotResult result, long 
snapshotIndex,  RaftPeer peer) {}
 
     /**
      * Notify the {@link StateMachine} that a raft server has step down.
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index afab42cf..fe9f87ce 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1409,7 +1409,8 @@ class RaftServerImpl implements RaftServer.Division,
       final long installedIndex = 
snapshotInstallationHandler.getInstalledIndex();
       if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) {
         LOG.info("{}: Follower has completed install the snapshot {}.", this, 
installedIndex);
-        
stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, 
installedIndex);
+        
stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, 
installedIndex,
+            getRaftServer().getPeer());
       }
     }
     return JavaUtils.allOf(futures).whenCompleteAsync(
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 18f3b542..2969ca31 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -305,7 +305,7 @@ class SnapshotInstallationHandler {
             InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
         inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
         server.getStateMachine().event().notifySnapshotInstalled(
-            InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX);
+            InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, 
server.getRaftServer().getPeer());
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
       }
@@ -323,7 +323,7 @@ class SnapshotInstallationHandler {
         inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
         final long latestInstalledIndex = 
latestInstalledSnapshotTermIndex.getIndex();
         server.getStateMachine().event().notifySnapshotInstalled(
-            InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex);
+            InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, 
server.getRaftServer().getPeer());
         installedIndex.set(latestInstalledIndex);
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotTermIndex.getIndex());
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 592a7ed1..4476f3ec 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -123,7 +124,7 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
     }
 
     @Override
-    public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult 
result, long installIndex) {
+    public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult 
result, long installIndex, RaftPeer peer) {
       if (result != RaftProtos.InstallSnapshotResult.SUCCESS &&
           result != RaftProtos.InstallSnapshotResult.SNAPSHOT_UNAVAILABLE) {
         return;
@@ -134,14 +135,14 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
       synchronized (this) {
         try {
           if 
(getServer().get().getDivision(this.getGroupId()).getInfo().isLeader()) {
-            LOG.info("Receive the notification to clean up snapshot as leader 
for {}", result);
+            LOG.info("Receive the notification to clean up snapshot as leader 
for {}, result: {}", peer, result);
             if (leaderSnapshotFile.exists()) {
               // For test purpose, we do not delete the leader's snapshot 
actually, which could be
               // sent to more than one peer during the test
               LOG.info("leader snapshot {} existed", leaderSnapshotFile);
             }
           } else {
-            LOG.info("Receive the notification to clean up snapshot as 
follower for {}", result);
+            LOG.info("Receive the notification to clean up snapshot as 
follower for {}, result: {}", peer, result);
             File followerSnapshotFile = new File(getSMdir(), 
leaderSnapshotFile.getName());
             if (followerSnapshotFile.exists()) {
               FileUtils.deleteFile(followerSnapshotFile);
@@ -398,11 +399,11 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
   }
 
   @Test
-  public void testInstallSnapshotFinishedEvent() throws Exception{
-    runWithNewCluster(1, this::testInstallSnapshotFinishedEvent);
+  public void testInstallSnapshotInstalledEvent() throws Exception{
+    runWithNewCluster(1, this::testInstallSnapshotInstalledEvent);
   }
 
-  private void testInstallSnapshotFinishedEvent(CLUSTER cluster) throws 
Exception{
+  private void testInstallSnapshotInstalledEvent(CLUSTER cluster) throws 
Exception{
     leaderSnapshotInfoRef.set(null);
     numNotifyInstallSnapshotFinished.set(0);
     final List<LogSegmentPath> logs;

Reply via email to