This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch release-3.1.2_review in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 7f10888a4150fac9426f3862c2b25e6301039465 Author: 133tosakarin <[email protected]> AuthorDate: Tue Oct 22 02:26:53 2024 +0800 RATIS-2174. Move future.join outside the lock (#1168) --- .../apache/ratis/server/impl/LeaderElection.java | 1 - .../apache/ratis/server/impl/LeaderStateImpl.java | 2 +- .../apache/ratis/server/impl/RaftServerImpl.java | 34 +++++++++--------- .../server/impl/SnapshotInstallationHandler.java | 41 +++++++++++++--------- 4 files changed, 44 insertions(+), 34 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index 4badd09cd..39b401dda 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -227,7 +227,6 @@ class LeaderElection implements Runnable { CompletableFuture<Void> shutdown() { lifeCycle.checkStateAndClose(); - stopped.complete(null); return stopped; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 3d8bc2219..0dfbf263d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -705,7 +705,7 @@ class LeaderStateImpl implements LeaderState { private void stepDown(long term, StepDownReason reason) { try { lease.getAndSetEnabled(false); - server.changeToFollowerAndPersistMetadata(term, false, reason); + server.changeToFollowerAndPersistMetadata(term, false, reason).join(); pendingStepDown.complete(server::newSuccessReply); } catch(IOException e) { final String s = this + ": Failed to persist metadata for term " + term; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 1ac62fd98..ba7d8668c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -132,7 +132,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -570,14 +569,8 @@ class RaftServerImpl implements RaftServer.Division, * @param force Force to start a new {@link FollowerState} even if this server is already a follower. * @return if the term/votedFor should be updated to the new term */ - private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) { - final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>(); - changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join(); - return metadataUpdated.get(); - } - - private synchronized CompletableFuture<Void> changeToFollowerAsync( - long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference<Boolean> metadataUpdated) { + private synchronized CompletableFuture<Void> changeToFollower( + long newTerm, boolean force, boolean allowListener, Object reason, AtomicBoolean metadataUpdated) { final RaftPeerRole old = role.getCurrentRole(); if (old == RaftPeerRole.LISTENER && !allowListener) { throw new IllegalStateException("Unexpected role " + old); @@ -612,13 +605,16 @@ class RaftServerImpl implements RaftServer.Division, return future; } - synchronized void changeToFollowerAndPersistMetadata( + synchronized CompletableFuture<Void> changeToFollowerAndPersistMetadata( long newTerm, boolean allowListener, Object reason) throws IOException { - if (changeToFollower(newTerm, false, allowListener, reason)) { + final AtomicBoolean metadataUpdated = new AtomicBoolean(); + final CompletableFuture<Void> future = changeToFollower(newTerm, false, allowListener, reason, metadataUpdated); + if (metadataUpdated.get()) { state.persistMetadata(); } + return future; } synchronized void changeToLeader() { @@ -1406,6 +1402,7 @@ class RaftServerImpl implements RaftServer.Division, boolean shouldShutdown = false; final RequestVoteReplyProto reply; + CompletableFuture<Void> future = null; synchronized (this) { // Check life cycle state again to avoid the PAUSING/PAUSED state. assertLifeCycleState(LifeCycle.States.RUNNING); @@ -1415,12 +1412,12 @@ class RaftServerImpl implements RaftServer.Division, final boolean voteGranted = context.decideVote(candidate, candidateLastEntry); if (candidate != null && phase == Phase.ELECTION) { // change server state in the ELECTION phase - final boolean termUpdated = - changeToFollower(candidateTerm, true, false, "candidate:" + candidateId); + final AtomicBoolean termUpdated = new AtomicBoolean(); + future = changeToFollower(candidateTerm, true, false, "candidate:" + candidateId, termUpdated); if (voteGranted) { state.grantVote(candidate.getId()); } - if (termUpdated || voteGranted) { + if (termUpdated.get() || voteGranted) { state.persistMetadata(); // sync metafile } } @@ -1436,6 +1433,9 @@ class RaftServerImpl implements RaftServer.Division, getMemberId(), phase, toRequestVoteReplyString(reply), state); } } + if (future != null) { + future.join(); + } return reply; } @@ -1533,6 +1533,7 @@ class RaftServerImpl implements RaftServer.Division, final long followerCommit = state.getLog().getLastCommittedIndex(); final Optional<FollowerState> followerState; final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time(); + final CompletableFuture<Void> future; synchronized (this) { // Check life cycle state again to avoid the PAUSING/PAUSED state. assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING); @@ -1544,7 +1545,7 @@ class RaftServerImpl implements RaftServer.Division, AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat)); } try { - changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries"); + future = changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries"); } catch (IOException e) { return JavaUtils.completeExceptionally(e); } @@ -1569,11 +1570,12 @@ class RaftServerImpl implements RaftServer.Division, AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat); LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply)); followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); - return CompletableFuture.completedFuture(reply); + return future.thenApply(dummy -> reply); } state.updateConfiguration(entries); } + future.join(); final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList() : state.getLog().append(entries); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 537b384c6..8de9a3756 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -122,12 +123,12 @@ class SnapshotInstallationHandler { if (installSnapshotEnabled) { // Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot. if (request.hasSnapshotChunk()) { - reply = checkAndInstallSnapshot(request, leaderId); + reply = checkAndInstallSnapshot(request, leaderId).join(); } } else { // Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot. if (request.hasNotification()) { - reply = notifyStateMachineToInstallSnapshot(request, leaderId); + reply = notifyStateMachineToInstallSnapshot(request, leaderId).join(); } } @@ -156,21 +157,22 @@ class SnapshotInstallationHandler { return failedReply; } - private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequestProto request, + private CompletableFuture<InstallSnapshotReplyProto> checkAndInstallSnapshot(InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException { final long currentTerm; final long leaderTerm = request.getLeaderTerm(); final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex()); final long lastIncludedIndex = lastIncluded.getIndex(); + final CompletableFuture<Void> future; synchronized (server) { final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); if (!recognized) { - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER); + return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER)); } - server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); + future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START); @@ -186,8 +188,9 @@ class SnapshotInstallationHandler { // have a lot of requests if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) { nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED); + return future.thenApply(dummy -> reply); } //TODO: We should only update State with installed snapshot once the request is done. @@ -210,25 +213,27 @@ class SnapshotInstallationHandler { if (snapshotChunkRequest.getDone()) { LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex); } - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS); + return future.thenApply(dummy -> reply); } - private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( + private CompletableFuture<InstallSnapshotReplyProto> notifyStateMachineToInstallSnapshot( InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException { final long currentTerm; final long leaderTerm = request.getLeaderTerm(); final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf( request.getNotification().getFirstAvailableTermIndex()); final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex(); + final CompletableFuture<Void> future; synchronized (server) { final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm); currentTerm = state.getCurrentTerm(); if (!recognized) { - return toInstallSnapshotReplyProto(leaderId, getMemberId(), - currentTerm, InstallSnapshotResult.NOT_LEADER); + return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(), + currentTerm, InstallSnapshotResult.NOT_LEADER)); } - server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); + future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot"); state.setLeader(leaderId, "installSnapshot"); server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION); @@ -245,8 +250,9 @@ class SnapshotInstallationHandler { inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX); LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(), InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex); + return future.thenApply(dummy -> reply); } final RaftPeerProto leaderProto; @@ -323,8 +329,9 @@ class SnapshotInstallationHandler { inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX); server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer()); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE); + return future.thenApply(dummy -> reply); } // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset @@ -341,8 +348,9 @@ class SnapshotInstallationHandler { server.getStateMachine().event().notifySnapshotInstalled( InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer()); installedIndex.set(latestInstalledIndex); - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex()); + return future.thenApply(dummy -> reply); } // Otherwise, Snapshot installation is in progress. @@ -350,8 +358,9 @@ class SnapshotInstallationHandler { LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(), InstallSnapshotResult.IN_PROGRESS); } - return toInstallSnapshotReplyProto(leaderId, getMemberId(), + final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm, InstallSnapshotResult.IN_PROGRESS); + return future.thenApply(dummy -> reply); } }
