This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 0d723e4ee0b179aa16763b92117dc1dbc9696b4b Author: Nibiru <[email protected]> AuthorDate: Fri Jul 22 03:19:33 2022 +0800 RATIS-1631. Add Raftpeer Info for NotifySnapshotInstalled (#689) (cherry picked from commit f66530cde1449d5a2cc3d20633263e5f1109cf94) --- .../java/org/apache/ratis/grpc/server/GrpcLogAppender.java | 10 ++++++---- .../java/org/apache/ratis/statemachine/StateMachine.java | 6 +++++- .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 3 ++- .../ratis/server/impl/SnapshotInstallationHandler.java | 4 ++-- .../org/apache/ratis/InstallSnapshotNotificationTests.java | 13 +++++++------ 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 4b5751fd..69a3795c 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -22,6 +22,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.grpc.metrics.GrpcServerMetrics; import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; @@ -435,12 +436,12 @@ public class GrpcLogAppender extends LogAppenderBase { if (followerNextIndex >= leaderStartIndex) { LOG.info("{}: Follower can catch up leader after install the snapshot, as leader's start index is {}", this, followerNextIndex); - notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, followerSnapshotIndex); + notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, followerSnapshotIndex, getFollower().getPeer()); } } - void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIndex) { - getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex); + void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIndex, RaftPeer peer) { + getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, peer); } boolean isDone() { @@ -512,7 +513,8 @@ public class GrpcLogAppender extends LogAppenderBase { case SNAPSHOT_UNAVAILABLE: LOG.info("{}: Follower could not install snapshot as it is not available.", this); getFollower().setAttemptedToInstallSnapshot(); - notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, RaftLog.INVALID_LOG_INDEX); + notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, RaftLog.INVALID_LOG_INDEX, + getFollower().getPeer()); removePending(reply); break; case UNRECOGNIZED: diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 6754d50b..79f8818d 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -190,8 +190,12 @@ public interface StateMachine extends Closeable { /** * Notify the {@link StateMachine} that the progress of install snapshot is * completely done. Could trigger the cleanup of snapshots. + * + * @param result {@link InstallSnapshotResult} + * @param snapshotIndex the index of installed snapshot + * @param peer the peer who installed the snapshot */ - default void notifySnapshotInstalled(InstallSnapshotResult result, long snapshotIndex) {} + default void notifySnapshotInstalled(InstallSnapshotResult result, long snapshotIndex, RaftPeer peer) {} /** * Notify the {@link StateMachine} that a raft server has step down. diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index afab42cf..fe9f87ce 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1409,7 +1409,8 @@ class RaftServerImpl implements RaftServer.Division, final long installedIndex = snapshotInstallationHandler.getInstalledIndex(); if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) { LOG.info("{}: Follower has completed install the snapshot {}.", this, installedIndex); - stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex); + stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex, + getRaftServer().getPeer()); } } return JavaUtils.allOf(futures).whenCompleteAsync( diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 18f3b542..2969ca31 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -305,7 +305,7 @@ class SnapshotInstallationHandler { InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); server.getStateMachine().event().notifySnapshotInstalled( - InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX); + InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getRaftServer().getPeer()); return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1); } @@ -323,7 +323,7 @@ class SnapshotInstallationHandler { inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex(); server.getStateMachine().event().notifySnapshotInstalled( - InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex); + InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getRaftServer().getPeer()); installedIndex.set(latestInstalledIndex); return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()); diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java index 592a7ed1..4476f3ec 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java @@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; @@ -123,7 +124,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC } @Override - public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult result, long installIndex) { + public void notifySnapshotInstalled(RaftProtos.InstallSnapshotResult result, long installIndex, RaftPeer peer) { if (result != RaftProtos.InstallSnapshotResult.SUCCESS && result != RaftProtos.InstallSnapshotResult.SNAPSHOT_UNAVAILABLE) { return; @@ -134,14 +135,14 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC synchronized (this) { try { if (getServer().get().getDivision(this.getGroupId()).getInfo().isLeader()) { - LOG.info("Receive the notification to clean up snapshot as leader for {}", result); + LOG.info("Receive the notification to clean up snapshot as leader for {}, result: {}", peer, result); if (leaderSnapshotFile.exists()) { // For test purpose, we do not delete the leader's snapshot actually, which could be // sent to more than one peer during the test LOG.info("leader snapshot {} existed", leaderSnapshotFile); } } else { - LOG.info("Receive the notification to clean up snapshot as follower for {}", result); + LOG.info("Receive the notification to clean up snapshot as follower for {}, result: {}", peer, result); File followerSnapshotFile = new File(getSMdir(), leaderSnapshotFile.getName()); if (followerSnapshotFile.exists()) { FileUtils.deleteFile(followerSnapshotFile); @@ -398,11 +399,11 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC } @Test - public void testInstallSnapshotFinishedEvent() throws Exception{ - runWithNewCluster(1, this::testInstallSnapshotFinishedEvent); + public void testInstallSnapshotInstalledEvent() throws Exception{ + runWithNewCluster(1, this::testInstallSnapshotInstalledEvent); } - private void testInstallSnapshotFinishedEvent(CLUSTER cluster) throws Exception{ + private void testInstallSnapshotInstalledEvent(CLUSTER cluster) throws Exception{ leaderSnapshotInfoRef.set(null); numNotifyInstallSnapshotFinished.set(0); final List<LogSegmentPath> logs;
