This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d08ba81c2 RATIS-2174. Move future.join outside the lock (#1168)
d08ba81c2 is described below

commit d08ba81c26bd5e3ab862be3649c7d5ec541a9b04
Author: 133tosakarin <[email protected]>
AuthorDate: Tue Oct 22 02:26:53 2024 +0800

    RATIS-2174. Move future.join outside the lock (#1168)
---
 .../apache/ratis/server/impl/LeaderElection.java   |  1 -
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 35 +++++++++---------
 .../server/impl/SnapshotInstallationHandler.java   | 41 +++++++++++++---------
 4 files changed, 44 insertions(+), 35 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 4badd09cd..39b401dda 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -227,7 +227,6 @@ class LeaderElection implements Runnable {
 
   CompletableFuture<Void> shutdown() {
     lifeCycle.checkStateAndClose();
-    stopped.complete(null);
     return stopped;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 9ce7c4404..5c472c826 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -707,7 +707,7 @@ class LeaderStateImpl implements LeaderState {
   private void stepDown(long term, StepDownReason reason) {
     try {
       lease.getAndSetEnabled(false);
-      server.changeToFollowerAndPersistMetadata(term, false, reason);
+      server.changeToFollowerAndPersistMetadata(term, false, reason).join();
       pendingStepDown.complete(server::newSuccessReply);
     } catch(IOException e) {
       final String s = this + ": Failed to persist metadata for term " + term;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2e748f496..cfcdd1519 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -133,7 +133,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -571,14 +570,8 @@ class RaftServerImpl implements RaftServer.Division,
    * @param force Force to start a new {@link FollowerState} even if this 
server is already a follower.
    * @return if the term/votedFor should be updated to the new term
    */
-  private boolean changeToFollower(long newTerm, boolean force, boolean 
allowListener, Object reason) {
-    final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>();
-    changeToFollowerAsync(newTerm, force, allowListener, reason, 
metadataUpdated).join();
-    return metadataUpdated.get();
-  }
-
-  private synchronized CompletableFuture<Void> changeToFollowerAsync(
-      long newTerm, boolean force, boolean allowListener, Object reason, 
AtomicReference<Boolean> metadataUpdated) {
+  private synchronized CompletableFuture<Void> changeToFollower(
+      long newTerm, boolean force, boolean allowListener, Object reason, 
AtomicBoolean metadataUpdated) {
     final RaftPeerRole old = role.getCurrentRole();
     if (old == RaftPeerRole.LISTENER && !allowListener) {
       throw new IllegalStateException("Unexpected role " + old);
@@ -613,13 +606,16 @@ class RaftServerImpl implements RaftServer.Division,
     return future;
   }
 
-  synchronized void changeToFollowerAndPersistMetadata(
+  synchronized CompletableFuture<Void> changeToFollowerAndPersistMetadata(
       long newTerm,
       boolean allowListener,
       Object reason) throws IOException {
-    if (changeToFollower(newTerm, false, allowListener, reason)) {
+    final AtomicBoolean metadataUpdated = new AtomicBoolean();
+    final CompletableFuture<Void> future = changeToFollower(newTerm, false, 
allowListener, reason, metadataUpdated);
+    if (metadataUpdated.get()) {
       state.persistMetadata();
     }
+    return future;
   }
 
   synchronized void changeToLeader() {
@@ -1451,6 +1447,7 @@ class RaftServerImpl implements RaftServer.Division,
 
     boolean shouldShutdown = false;
     final RequestVoteReplyProto reply;
+    CompletableFuture<Void> future = null;
     synchronized (this) {
       // Check life cycle state again to avoid the PAUSING/PAUSED state.
       assertLifeCycleState(LifeCycle.States.RUNNING);
@@ -1460,12 +1457,12 @@ class RaftServerImpl implements RaftServer.Division,
       final boolean voteGranted = context.decideVote(candidate, 
candidateLastEntry);
       if (candidate != null && phase == Phase.ELECTION) {
         // change server state in the ELECTION phase
-        final boolean termUpdated =
-            changeToFollower(candidateTerm, true, false, "candidate:" + 
candidateId);
+        final AtomicBoolean termUpdated = new AtomicBoolean();
+        future = changeToFollower(candidateTerm, true, false, "candidate:" + 
candidateId, termUpdated);
         if (voteGranted) {
           state.grantVote(candidate.getId());
         }
-        if (termUpdated || voteGranted) {
+        if (termUpdated.get() || voteGranted) {
           state.persistMetadata(); // sync metafile
         }
       }
@@ -1481,6 +1478,9 @@ class RaftServerImpl implements RaftServer.Division,
             getMemberId(), phase, toRequestVoteReplyString(reply), state);
       }
     }
+    if (future != null) {
+      future.join();
+    }
     return reply;
   }
 
@@ -1582,6 +1582,7 @@ class RaftServerImpl implements RaftServer.Division,
     final long followerCommit = state.getLog().getLastCommittedIndex();
     final Optional<FollowerState> followerState;
     final Timekeeper.Context timer = 
raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
+    final CompletableFuture<Void> future;
     synchronized (this) {
       // Check life cycle state again to avoid the PAUSING/PAUSED state.
       assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
@@ -1593,7 +1594,7 @@ class RaftServerImpl implements RaftServer.Division,
             AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, 
isHeartbeat));
       }
       try {
-        changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
+        future = changeToFollowerAndPersistMetadata(leaderTerm, true, 
"appendEntries");
       } catch (IOException e) {
         return JavaUtils.completeExceptionally(e);
       }
@@ -1618,12 +1619,12 @@ class RaftServerImpl implements RaftServer.Division,
             AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, 
isHeartbeat);
         LOG.info("{}: appendEntries* reply {}", getMemberId(), 
toAppendEntriesReplyString(reply));
         followerState.ifPresent(fs -> 
fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
-        return CompletableFuture.completedFuture(reply);
+        return future.thenApply(dummy -> reply);
       }
 
       state.updateConfiguration(entries);
     }
-
+    future.join();
 
     final List<CompletableFuture<Long>> futures = entries.isEmpty() ? 
Collections.emptyList()
         : state.getLog().append(requestRef.delegate(entries));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 537b384c6..8de9a3756 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -122,12 +123,12 @@ class SnapshotInstallationHandler {
     if (installSnapshotEnabled) {
       // Leader has sent InstallSnapshot request with SnapshotInfo. Install 
the snapshot.
       if (request.hasSnapshotChunk()) {
-        reply = checkAndInstallSnapshot(request, leaderId);
+        reply = checkAndInstallSnapshot(request, leaderId).join();
       }
     } else {
       // Leader has only sent a notification to install snapshot. Inform State 
Machine to install snapshot.
       if (request.hasNotification()) {
-        reply = notifyStateMachineToInstallSnapshot(request, leaderId);
+        reply = notifyStateMachineToInstallSnapshot(request, leaderId).join();
       }
     }
 
@@ -156,21 +157,22 @@ class SnapshotInstallationHandler {
     return failedReply;
   }
 
-  private InstallSnapshotReplyProto 
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
+  private CompletableFuture<InstallSnapshotReplyProto> 
checkAndInstallSnapshot(InstallSnapshotRequestProto request,
       RaftPeerId leaderId) throws IOException {
     final long currentTerm;
     final long leaderTerm = request.getLeaderTerm();
     final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest 
= request.getSnapshotChunk();
     final TermIndex lastIncluded = 
TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
     final long lastIncludedIndex = lastIncluded.getIndex();
+    final CompletableFuture<Void> future;
     synchronized (server) {
       final boolean recognized = 
state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, 
leaderTerm);
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
-        return toInstallSnapshotReplyProto(leaderId, getMemberId(),
-            currentTerm, snapshotChunkRequest.getRequestIndex(), 
InstallSnapshotResult.NOT_LEADER);
+        return 
CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
+            currentTerm, snapshotChunkRequest.getRequestIndex(), 
InstallSnapshotResult.NOT_LEADER));
       }
-      server.changeToFollowerAndPersistMetadata(leaderTerm, true, 
"installSnapshot");
+      future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, 
"installSnapshot");
       state.setLeader(leaderId, "installSnapshot");
 
       
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
@@ -186,8 +188,9 @@ class SnapshotInstallationHandler {
         // have a lot of requests
         if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
           nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1);
-          return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+          final InstallSnapshotReplyProto reply =  
toInstallSnapshotReplyProto(leaderId, getMemberId(),
               currentTerm, snapshotChunkRequest.getRequestIndex(), 
InstallSnapshotResult.ALREADY_INSTALLED);
+          return future.thenApply(dummy -> reply);
         }
 
         //TODO: We should only update State with installed snapshot once the 
request is done.
@@ -210,25 +213,27 @@ class SnapshotInstallationHandler {
     if (snapshotChunkRequest.getDone()) {
       LOG.info("{}: successfully install the entire snapshot-{}", 
getMemberId(), lastIncludedIndex);
     }
-    return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+    final InstallSnapshotReplyProto reply = 
toInstallSnapshotReplyProto(leaderId, getMemberId(),
         currentTerm, snapshotChunkRequest.getRequestIndex(), 
InstallSnapshotResult.SUCCESS);
+    return future.thenApply(dummy -> reply);
   }
 
-  private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
+  private CompletableFuture<InstallSnapshotReplyProto> 
notifyStateMachineToInstallSnapshot(
       InstallSnapshotRequestProto request, RaftPeerId leaderId) throws 
IOException {
     final long currentTerm;
     final long leaderTerm = request.getLeaderTerm();
     final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
         request.getNotification().getFirstAvailableTermIndex());
     final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
+    final CompletableFuture<Void> future;
     synchronized (server) {
       final boolean recognized = 
state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm);
       currentTerm = state.getCurrentTerm();
       if (!recognized) {
-        return toInstallSnapshotReplyProto(leaderId, getMemberId(),
-            currentTerm, InstallSnapshotResult.NOT_LEADER);
+        return 
CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, 
getMemberId(),
+            currentTerm, InstallSnapshotResult.NOT_LEADER));
       }
-      server.changeToFollowerAndPersistMetadata(leaderTerm, true, 
"installSnapshot");
+      future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, 
"installSnapshot");
       state.setLeader(leaderId, "installSnapshot");
       
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
 
@@ -245,8 +250,9 @@ class SnapshotInstallationHandler {
           inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, 
INVALID_LOG_INDEX);
           LOG.info("{}: InstallSnapshot notification result: {}, current 
snapshot index: {}", getMemberId(),
               InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
-          return toInstallSnapshotReplyProto(leaderId, getMemberId(), 
currentTerm,
+          final InstallSnapshotReplyProto reply = 
toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
               InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
+          return future.thenApply(dummy -> reply);
         }
 
         final RaftPeerProto leaderProto;
@@ -323,8 +329,9 @@ class SnapshotInstallationHandler {
         inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
         server.getStateMachine().event().notifySnapshotInstalled(
             InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, 
server.getPeer());
-        return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+        final InstallSnapshotReplyProto reply =  
toInstallSnapshotReplyProto(leaderId, getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
+        return future.thenApply(dummy -> reply);
       }
 
       // If a snapshot has been installed, return SNAPSHOT_INSTALLED with the 
installed snapshot index and reset
@@ -341,8 +348,9 @@ class SnapshotInstallationHandler {
         server.getStateMachine().event().notifySnapshotInstalled(
             InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, 
server.getPeer());
         installedIndex.set(latestInstalledIndex);
-        return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+        final InstallSnapshotReplyProto reply = 
toInstallSnapshotReplyProto(leaderId, getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, 
latestInstalledSnapshotTermIndex.getIndex());
+        return future.thenApply(dummy -> reply);
       }
 
       // Otherwise, Snapshot installation is in progress.
@@ -350,8 +358,9 @@ class SnapshotInstallationHandler {
         LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
             InstallSnapshotResult.IN_PROGRESS);
       }
-      return toInstallSnapshotReplyProto(leaderId, getMemberId(),
+      final InstallSnapshotReplyProto reply = 
toInstallSnapshotReplyProto(leaderId, getMemberId(),
           currentTerm, InstallSnapshotResult.IN_PROGRESS);
+      return future.thenApply(dummy -> reply);
     }
   }
 

Reply via email to