This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 05cac19  RATIS-1482. fix shouldNotifyToInstallSnapshot to return the 
first avail term index (#574)
05cac19 is described below

commit 05cac191b8482168804a0e7a29144e744eb2aa8e
Author: Nibiru <[email protected]>
AuthorDate: Mon Jan 10 15:05:02 2022 +0800

    RATIS-1482. fix shouldNotifyToInstallSnapshot to return the first avail 
term index (#574)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 25 +++++------------
 .../ratis/InstallSnapshotNotificationTests.java    | 31 +++++++++++++++-------
 2 files changed, 28 insertions(+), 28 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 70b23a7..990d06f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -591,23 +591,15 @@ public class GrpcLogAppender extends LogAppenderBase {
     final FollowerInfo follower = getFollower();
     final long leaderNextIndex = getRaftLog().getNextIndex();
     final boolean isFollowerBootstrapping = 
getLeaderState().isFollowerBootstrapping(follower);
-
+    final long leaderStartIndex = getRaftLog().getStartIndex();
+    final TermIndex firstAvailable = 
Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex))
+        .orElseGet(() -> 
TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex));
     if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
       // If the follower is bootstrapping and has not yet installed any 
snapshot from leader, then the follower should
       // be notified to install a snapshot. Every follower should try to 
install at least one snapshot during
       // bootstrapping, if available.
-      TermIndex lastEntry = getRaftLog().getLastEntryTermIndex();
-      if (lastEntry == null) {
-        // lastEntry may need to be derived from snapshot
-        SnapshotInfo snapshot = 
getServer().getStateMachine().getLatestSnapshot();
-        if (snapshot != null) {
-          lastEntry = snapshot.getTermIndex();
-        }
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{}: Notify follower to install snapshot as it is 
bootstrapping to TermIndex {}.", this, lastEntry);
-      }
-      return lastEntry;
+      LOG.debug("{}: follower is bootstrapping, notify to install snapshot to 
{}.", this, firstAvailable);
+      return firstAvailable;
     }
 
     final long followerNextIndex = follower.getNextIndex();
@@ -615,18 +607,15 @@ public class GrpcLogAppender extends LogAppenderBase {
       return null;
     }
 
-    final long leaderStartIndex = getRaftLog().getStartIndex();
-
     if (followerNextIndex < leaderStartIndex) {
       // The Leader does not have the logs from the Follower's last log
       // index onwards. And install snapshot is disabled. So the Follower
       // should be notified to install the latest snapshot through its
       // State Machine.
-      return getRaftLog().getTermIndex(leaderStartIndex);
+      return firstAvailable;
     } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
       // Leader has no logs to check from, hence return next index.
-      return TermIndex.valueOf(getServer().getInfo().getCurrentTerm(),
-          leaderNextIndex);
+      return firstAvailable;
     }
 
     return null;
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index b307e81..e15ffb9 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 public abstract class InstallSnapshotNotificationTests<CLUSTER extends 
MiniRaftCluster>
     extends BaseTest
@@ -98,16 +99,26 @@ public abstract class 
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
         return super.notifyInstallSnapshotFromLeader(roleInfoProto, termIndex);
       }
 
-      try {
-        Path leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath();
-        File followerSnapshotFilePath = new File(getSMdir(),
-            leaderSnapshotFile.getFileName().toString());
-        Files.copy(leaderSnapshotFile, followerSnapshotFilePath.toPath());
-      } catch (IOException e) {
-        LOG.error("Failed notifyInstallSnapshotFromLeader", e);
-        return JavaUtils.completeExceptionally(e);
-      }
-      return 
CompletableFuture.completedFuture(leaderSnapshotInfo.getTermIndex());
+      Supplier<TermIndex> supplier = () -> {
+        try {
+          Path leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath();
+          File followerSnapshotFilePath = new File(getSMdir(),
+              leaderSnapshotFile.getFileName().toString());
+          // simulate the real situation such as snapshot transmission delay
+          Thread.sleep(1000);
+          if (followerSnapshotFilePath.exists()) {
+            LOG.warn(followerSnapshotFilePath + " exists");
+          } else {
+            Files.copy(leaderSnapshotFile, followerSnapshotFilePath.toPath());
+          }
+        } catch (IOException | InterruptedException e) {
+          LOG.error("Failed notifyInstallSnapshotFromLeader", e);
+          return null;
+        }
+        return leaderSnapshotInfo.getTermIndex();
+      };
+
+      return CompletableFuture.supplyAsync(supplier);
     }
   }
 

Reply via email to