This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch snapshot-3
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/snapshot-3 by this push:
new 5cd8971b6 RATIS-2174. Move future.join outside the lock (#1168)
5cd8971b6 is described below
commit 5cd8971b626d9defe559f2c20117dadd26435223
Author: 133tosakarin <[email protected]>
AuthorDate: Tue Oct 22 02:26:53 2024 +0800
RATIS-2174. Move future.join outside the lock (#1168)
---
.../apache/ratis/server/impl/LeaderElection.java | 1 -
.../apache/ratis/server/impl/LeaderStateImpl.java | 2 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 34 +++++++++---------
.../server/impl/SnapshotInstallationHandler.java | 41 +++++++++++++---------
4 files changed, 44 insertions(+), 34 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 4badd09cd..39b401dda 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -227,7 +227,6 @@ class LeaderElection implements Runnable {
CompletableFuture<Void> shutdown() {
lifeCycle.checkStateAndClose();
- stopped.complete(null);
return stopped;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 3d8bc2219..0dfbf263d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -705,7 +705,7 @@ class LeaderStateImpl implements LeaderState {
private void stepDown(long term, StepDownReason reason) {
try {
lease.getAndSetEnabled(false);
- server.changeToFollowerAndPersistMetadata(term, false, reason);
+ server.changeToFollowerAndPersistMetadata(term, false, reason).join();
pendingStepDown.complete(server::newSuccessReply);
} catch(IOException e) {
final String s = this + ": Failed to persist metadata for term " + term;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 1ac62fd98..ba7d8668c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -132,7 +132,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -570,14 +569,8 @@ class RaftServerImpl implements RaftServer.Division,
* @param force Force to start a new {@link FollowerState} even if this
server is already a follower.
* @return if the term/votedFor should be updated to the new term
*/
- private boolean changeToFollower(long newTerm, boolean force, boolean
allowListener, Object reason) {
- final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>();
- changeToFollowerAsync(newTerm, force, allowListener, reason,
metadataUpdated).join();
- return metadataUpdated.get();
- }
-
- private synchronized CompletableFuture<Void> changeToFollowerAsync(
- long newTerm, boolean force, boolean allowListener, Object reason,
AtomicReference<Boolean> metadataUpdated) {
+ private synchronized CompletableFuture<Void> changeToFollower(
+ long newTerm, boolean force, boolean allowListener, Object reason,
AtomicBoolean metadataUpdated) {
final RaftPeerRole old = role.getCurrentRole();
if (old == RaftPeerRole.LISTENER && !allowListener) {
throw new IllegalStateException("Unexpected role " + old);
@@ -612,13 +605,16 @@ class RaftServerImpl implements RaftServer.Division,
return future;
}
- synchronized void changeToFollowerAndPersistMetadata(
+ synchronized CompletableFuture<Void> changeToFollowerAndPersistMetadata(
long newTerm,
boolean allowListener,
Object reason) throws IOException {
- if (changeToFollower(newTerm, false, allowListener, reason)) {
+ final AtomicBoolean metadataUpdated = new AtomicBoolean();
+ final CompletableFuture<Void> future = changeToFollower(newTerm, false,
allowListener, reason, metadataUpdated);
+ if (metadataUpdated.get()) {
state.persistMetadata();
}
+ return future;
}
synchronized void changeToLeader() {
@@ -1406,6 +1402,7 @@ class RaftServerImpl implements RaftServer.Division,
boolean shouldShutdown = false;
final RequestVoteReplyProto reply;
+ CompletableFuture<Void> future = null;
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.RUNNING);
@@ -1415,12 +1412,12 @@ class RaftServerImpl implements RaftServer.Division,
final boolean voteGranted = context.decideVote(candidate,
candidateLastEntry);
if (candidate != null && phase == Phase.ELECTION) {
// change server state in the ELECTION phase
- final boolean termUpdated =
- changeToFollower(candidateTerm, true, false, "candidate:" +
candidateId);
+ final AtomicBoolean termUpdated = new AtomicBoolean();
+ future = changeToFollower(candidateTerm, true, false, "candidate:" +
candidateId, termUpdated);
if (voteGranted) {
state.grantVote(candidate.getId());
}
- if (termUpdated || voteGranted) {
+ if (termUpdated.get() || voteGranted) {
state.persistMetadata(); // sync metafile
}
}
@@ -1436,6 +1433,9 @@ class RaftServerImpl implements RaftServer.Division,
getMemberId(), phase, toRequestVoteReplyString(reply), state);
}
}
+ if (future != null) {
+ future.join();
+ }
return reply;
}
@@ -1533,6 +1533,7 @@ class RaftServerImpl implements RaftServer.Division,
final long followerCommit = state.getLog().getLastCommittedIndex();
final Optional<FollowerState> followerState;
final Timekeeper.Context timer =
raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
+ final CompletableFuture<Void> future;
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
@@ -1544,7 +1545,7 @@ class RaftServerImpl implements RaftServer.Division,
AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX,
isHeartbeat));
}
try {
- changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
+ future = changeToFollowerAndPersistMetadata(leaderTerm, true,
"appendEntries");
} catch (IOException e) {
return JavaUtils.completeExceptionally(e);
}
@@ -1569,11 +1570,12 @@ class RaftServerImpl implements RaftServer.Division,
AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX,
isHeartbeat);
LOG.info("{}: appendEntries* reply {}", getMemberId(),
toAppendEntriesReplyString(reply));
followerState.ifPresent(fs ->
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
- return CompletableFuture.completedFuture(reply);
+ return future.thenApply(dummy -> reply);
}
state.updateConfiguration(entries);
}
+ future.join();
final List<CompletableFuture<Long>> futures = entries.isEmpty() ?
Collections.emptyList()
: state.getLog().append(entries);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 537b384c6..8de9a3756 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -122,12 +123,12 @@ class SnapshotInstallationHandler {
if (installSnapshotEnabled) {
// Leader has sent InstallSnapshot request with SnapshotInfo. Install
the snapshot.
if (request.hasSnapshotChunk()) {
- reply = checkAndInstallSnapshot(request, leaderId);
+ reply = checkAndInstallSnapshot(request, leaderId).join();
}
} else {
// Leader has only sent a notification to install snapshot. Inform State
Machine to install snapshot.
if (request.hasNotification()) {
- reply = notifyStateMachineToInstallSnapshot(request, leaderId);
+ reply = notifyStateMachineToInstallSnapshot(request, leaderId).join();
}
}
@@ -156,21 +157,22 @@ class SnapshotInstallationHandler {
return failedReply;
}
- private InstallSnapshotReplyProto
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
+ private CompletableFuture<InstallSnapshotReplyProto>
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest
= request.getSnapshotChunk();
final TermIndex lastIncluded =
TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastIncluded.getIndex();
+ final CompletableFuture<Void> future;
synchronized (server) {
final boolean recognized =
state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId,
leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
- currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.NOT_LEADER);
+ return
CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId,
getMemberId(),
+ currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.NOT_LEADER));
}
- server.changeToFollowerAndPersistMetadata(leaderTerm, true,
"installSnapshot");
+ future = server.changeToFollowerAndPersistMetadata(leaderTerm, true,
"installSnapshot");
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
@@ -186,8 +188,9 @@ class SnapshotInstallationHandler {
// have a lot of requests
if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1);
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ final InstallSnapshotReplyProto reply =
toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.ALREADY_INSTALLED);
+ return future.thenApply(dummy -> reply);
}
//TODO: We should only update State with installed snapshot once the
request is done.
@@ -210,25 +213,27 @@ class SnapshotInstallationHandler {
if (snapshotChunkRequest.getDone()) {
LOG.info("{}: successfully install the entire snapshot-{}",
getMemberId(), lastIncludedIndex);
}
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ final InstallSnapshotReplyProto reply =
toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(),
InstallSnapshotResult.SUCCESS);
+ return future.thenApply(dummy -> reply);
}
- private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
+ private CompletableFuture<InstallSnapshotReplyProto>
notifyStateMachineToInstallSnapshot(
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws
IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
+ final CompletableFuture<Void> future;
synchronized (server) {
final boolean recognized =
state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
- currentTerm, InstallSnapshotResult.NOT_LEADER);
+ return
CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId,
getMemberId(),
+ currentTerm, InstallSnapshotResult.NOT_LEADER));
}
- server.changeToFollowerAndPersistMetadata(leaderTerm, true,
"installSnapshot");
+ future = server.changeToFollowerAndPersistMetadata(leaderTerm, true,
"installSnapshot");
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
@@ -245,8 +250,9 @@ class SnapshotInstallationHandler {
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex,
INVALID_LOG_INDEX);
LOG.info("{}: InstallSnapshot notification result: {}, current
snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm,
+ final InstallSnapshotReplyProto reply =
toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
+ return future.thenApply(dummy -> reply);
}
final RaftPeerProto leaderProto;
@@ -323,8 +329,9 @@ class SnapshotInstallationHandler {
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX,
server.getPeer());
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ final InstallSnapshotReplyProto reply =
toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
+ return future.thenApply(dummy -> reply);
}
// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the
installed snapshot index and reset
@@ -341,8 +348,9 @@ class SnapshotInstallationHandler {
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex,
server.getPeer());
installedIndex.set(latestInstalledIndex);
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ final InstallSnapshotReplyProto reply =
toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED,
latestInstalledSnapshotTermIndex.getIndex());
+ return future.thenApply(dummy -> reply);
}
// Otherwise, Snapshot installation is in progress.
@@ -350,8 +358,9 @@ class SnapshotInstallationHandler {
LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.IN_PROGRESS);
}
- return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+ final InstallSnapshotReplyProto reply =
toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS);
+ return future.thenApply(dummy -> reply);
}
}