This is an automated email from the ASF dual-hosted git repository.
hanishakoneru pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new b735bb5 RATIS-1402. Do not send extra RPC calls to the follower while the
follower is still installing a snapshot (#504)
b735bb5 is described below
commit b735bb520c1e11cf3d19328f3fc31765c4aea5e3
Author: Jackson Yao <[email protected]>
AuthorDate: Wed Oct 6 00:47:45 2021 +0800
RATIS-1402. Do not send extra RPC calls to the follower while the follower is
still installing a snapshot (#504)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 33 ++++++++++++----------
.../ratis/InstallSnapshotNotificationTests.java | 8 +-----
2 files changed, 19 insertions(+), 22 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 5db2690..62a0676 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.impl;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
@@ -74,7 +75,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -166,7 +166,7 @@ class RaftServerImpl implements RaftServer.Division,
private final LeaderElectionMetrics leaderElectionMetrics;
private final RaftServerMetricsImpl raftServerMetrics;
- private final AtomicReference<TermIndex> inProgressInstallSnapshotRequest;
+ private final AtomicLong inProgressInstallSnapshotRequest;
private final AtomicLong installedSnapshotIndex;
private final AtomicBoolean isSnapshotNull;
@@ -196,7 +196,7 @@ class RaftServerImpl implements RaftServer.Division,
this.state = new ServerState(id, group, properties, this, stateMachine);
this.retryCache = new RetryCacheImpl(properties);
- this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
+ this.inProgressInstallSnapshotRequest = new AtomicLong();
this.installedSnapshotIndex = new AtomicLong();
this.isSnapshotNull = new AtomicBoolean(false);
this.dataStreamMap = new DataStreamMapImpl(id);
@@ -1291,8 +1291,8 @@ class RaftServerImpl implements RaftServer.Division,
private long checkInconsistentAppendEntries(TermIndex previous,
LogEntryProto... entries) {
// Check if a snapshot installation through state machine is in progress.
- final TermIndex installSnapshot = inProgressInstallSnapshotRequest.get();
- if (installSnapshot != null) {
+ final long installSnapshot = inProgressInstallSnapshotRequest.get();
+ if (installSnapshot != 0) {
LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in
progress", getMemberId(), installSnapshot);
return state.getNextIndex();
}
@@ -1547,7 +1547,7 @@ class RaftServerImpl implements RaftServer.Division,
long snapshotIndex = state.getSnapshotIndex();
updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
- if (inProgressInstallSnapshotRequest.compareAndSet(null,
firstAvailableLogTermIndex)) {
+ if (inProgressInstallSnapshotRequest.compareAndSet(0,
firstAvailableLogIndex)) {
LOG.info("{}: Received notification to install snapshot at index {}",
getMemberId(), firstAvailableLogIndex);
// Check if snapshot index is already at par or ahead of the first
// available log index of the Leader.
@@ -1555,7 +1555,7 @@ class RaftServerImpl implements RaftServer.Division,
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.
-
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex,
null);
+
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
LOG.info("{}: InstallSnapshot notification result: {}, current
snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(), currentTerm,
@@ -1587,7 +1587,7 @@ class RaftServerImpl implements RaftServer.Division,
if (exception != null) {
LOG.warn("{}: Failed to notify StateMachine to
InstallSnapshot. Exception: {}",
getMemberId(), exception.getMessage());
-
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex,
null);
+
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
return;
}
@@ -1604,11 +1604,14 @@ class RaftServerImpl implements RaftServer.Division,
LOG.debug("{}: StateMachine could not install snapshot as
it is not available", this);
}
}
- });
- } catch (Throwable t) {
-
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex,
null);
- throw t;
- }
+ // wait for 10 seconds for statemachine to install snapshot
+ }).get(1, TimeUnit.SECONDS);
+ } catch (ExecutionException t) {
+
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
+ LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot.
ExecutionException: {}",
+ getMemberId(), t.getMessage());
+ throw new IOException("Failed to install snapshot");
+ } catch (InterruptedException | TimeoutException t) {}
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine is processing Snapshot Installation
Request.", getMemberId());
@@ -1623,7 +1626,7 @@ class RaftServerImpl implements RaftServer.Division,
if (isSnapshotNull.compareAndSet(true, false)) {
LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
-
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex,
null);
+ inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex,
0);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
}
@@ -1634,7 +1637,7 @@ class RaftServerImpl implements RaftServer.Division,
if (latestInstalledSnapshotIndex > 0) {
LOG.info("{}: InstallSnapshot notification result: {}, at index: {}",
getMemberId(),
InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotIndex);
-
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogTermIndex,
null);
+ inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex,
0);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId,
getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotIndex);
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 5f7f4a7..b307e81 100644
---
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -350,13 +350,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
cluster.shutdown();
}
}
-
- /**
- * Basic test for install snapshot notification: start a one node cluster
- * (disable install snapshot option) and let it generate a snapshot. Then
- * delete the log and restart the node, and add more nodes as followers.
- * The new follower nodes should get a install snapshot notification.
- */
+
/**
* Test for install snapshot during a peer bootstrap: start a one node
cluster
* (disable install snapshot option) and let it generate a snapshot. Add