This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 05cac19 RATIS-1482. fix shouldNotifyToInstallSnapshot to return the
first avail term index (#574)
05cac19 is described below
commit 05cac191b8482168804a0e7a29144e744eb2aa8e
Author: Nibiru <[email protected]>
AuthorDate: Mon Jan 10 15:05:02 2022 +0800
RATIS-1482. fix shouldNotifyToInstallSnapshot to return the first avail
term index (#574)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 25 +++++------------
.../ratis/InstallSnapshotNotificationTests.java | 31 +++++++++++++++-------
2 files changed, 28 insertions(+), 28 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 70b23a7..990d06f 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -591,23 +591,15 @@ public class GrpcLogAppender extends LogAppenderBase {
final FollowerInfo follower = getFollower();
final long leaderNextIndex = getRaftLog().getNextIndex();
final boolean isFollowerBootstrapping =
getLeaderState().isFollowerBootstrapping(follower);
-
+ final long leaderStartIndex = getRaftLog().getStartIndex();
+ final TermIndex firstAvailable = Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex))
+ .orElseGet(() -> TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex));
if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
// If the follower is bootstrapping and has not yet installed any snapshot from leader, then the follower should
// be notified to install a snapshot. Every follower should try to install at least one snapshot during
// bootstrapping, if available.
- TermIndex lastEntry = getRaftLog().getLastEntryTermIndex();
- if (lastEntry == null) {
- // lastEntry may need to be derived from snapshot
- SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
- if (snapshot != null) {
- lastEntry = snapshot.getTermIndex();
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Notify follower to install snapshot as it is bootstrapping to TermIndex {}.", this, lastEntry);
- }
- return lastEntry;
+ LOG.debug("{}: follower is bootstrapping, notify to install snapshot to {}.", this, firstAvailable);
+ return firstAvailable;
}
final long followerNextIndex = follower.getNextIndex();
@@ -615,18 +607,15 @@ public class GrpcLogAppender extends LogAppenderBase {
return null;
}
- final long leaderStartIndex = getRaftLog().getStartIndex();
-
if (followerNextIndex < leaderStartIndex) {
// The Leader does not have the logs from the Follower's last log
// index onwards. And install snapshot is disabled. So the Follower
// should be notified to install the latest snapshot through its
// State Machine.
- return getRaftLog().getTermIndex(leaderStartIndex);
+ return firstAvailable;
} else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) {
// Leader has no logs to check from, hence return next index.
- return TermIndex.valueOf(getServer().getInfo().getCurrentTerm(),
- leaderNextIndex);
+ return firstAvailable;
}
return null;
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index b307e81..e15ffb9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -52,6 +52,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
@@ -98,16 +99,26 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
return super.notifyInstallSnapshotFromLeader(roleInfoProto, termIndex);
}
- try {
- Path leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath();
- File followerSnapshotFilePath = new File(getSMdir(),
- leaderSnapshotFile.getFileName().toString());
- Files.copy(leaderSnapshotFile, followerSnapshotFilePath.toPath());
- } catch (IOException e) {
- LOG.error("Failed notifyInstallSnapshotFromLeader", e);
- return JavaUtils.completeExceptionally(e);
- }
- return CompletableFuture.completedFuture(leaderSnapshotInfo.getTermIndex());
+ Supplier<TermIndex> supplier = () -> {
+ try {
+ Path leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath();
+ File followerSnapshotFilePath = new File(getSMdir(),
+ leaderSnapshotFile.getFileName().toString());
+ // simulate the real situation such as snapshot transmission delay
+ Thread.sleep(1000);
+ if (followerSnapshotFilePath.exists()) {
+ LOG.warn(followerSnapshotFilePath + " exists");
+ } else {
+ Files.copy(leaderSnapshotFile, followerSnapshotFilePath.toPath());
+ }
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Failed notifyInstallSnapshotFromLeader", e);
+ return null;
+ }
+ return leaderSnapshotInfo.getTermIndex();
+ };
+
+ return CompletableFuture.supplyAsync(supplier);
}
}