This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch snapshot-3
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit d8482f1f0a72420e50aeeb2401a669d43bbfbf6d
Author: 133tosakarin <[email protected]>
AuthorDate: Mon Sep 30 14:43:38 2024 +0800

    RATIS-2162. When closing leaderState, if the logAppender thread sends a snapshot, a deadlock may occur. (#1154)
---
 .../ratis/server/impl/ConfigurationManager.java    | 10 +++----
 .../apache/ratis/server/impl/FollowerState.java    | 14 +++++++++-
 .../apache/ratis/server/impl/LeaderElection.java   | 14 +++++++++-
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  7 ++---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 32 ++++++++++++----------
 .../org/apache/ratis/server/impl/RoleInfo.java     | 26 +++++++++---------
 .../ratis/server/leader/LogAppenderBase.java       |  2 +-
 7 files changed, 64 insertions(+), 41 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
index 10c59c8b1..6d3f68d5c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
@@ -41,11 +41,9 @@ public class ConfigurationManager {
    * The current raft configuration. If configurations is not empty, should be
    * the last entry of the map. Otherwise is initialConf.
    */
-  @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
-  private volatile RaftConfigurationImpl currentConf;
+  private RaftConfigurationImpl currentConf;
   /** Cache the peer corresponding to {@link #id}. */
-  @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
-  private volatile RaftPeer currentPeer;
+  private RaftPeer currentPeer;
 
   ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) {
     this.id = id;
@@ -78,11 +76,11 @@ public class ConfigurationManager {
     }
   }
 
-  RaftConfigurationImpl getCurrent() {
+  synchronized RaftConfigurationImpl getCurrent() {
     return currentConf;
   }
 
-  RaftPeer getCurrentPeer() {
+  synchronized RaftPeer getCurrentPeer() {
     return currentPeer;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index e980daede..fa61e9088 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -26,6 +26,7 @@ import org.apache.ratis.util.Timestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.ToIntFunction;
@@ -62,6 +63,7 @@ class FollowerState extends Daemon {
   @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
   private volatile Timestamp lastRpcTime = creationTime;
   private volatile boolean isRunning = true;
+  private final CompletableFuture<Void> stopped = new CompletableFuture<>();
   private final AtomicInteger outstandingOp = new AtomicInteger();
 
   FollowerState(RaftServerImpl server, Object reason) {
@@ -93,8 +95,10 @@ class FollowerState extends Daemon {
     return lastRpcTime.elapsedTime().compareTo(server.properties().minRpcTimeout()) < 0;
   }
 
-  void stopRunning() {
+  CompletableFuture<Void> stopRunning() {
     this.isRunning = false;
+    interrupt();
+    return stopped;
   }
 
   boolean lostMajorityHeartbeatsRecently() {
@@ -122,6 +126,14 @@ class FollowerState extends Daemon {
 
   @Override
   public  void run() {
+    try {
+      runImpl();
+    } finally {
+      stopped.complete(null);
+    }
+  }
+
+  private void runImpl() {
     final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
     while (shouldRun()) {
       final TimeDuration electionTimeout = server.getRandomElectionTimeout();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index af25ae912..a5bfba7be 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
@@ -183,6 +184,7 @@ class LeaderElection implements Runnable {
   private final String name;
   private final LifeCycle lifeCycle;
   private final Daemon daemon;
+  private final CompletableFuture<Void> stopped = new CompletableFuture<>();
 
   private final RaftServerImpl server;
   private final boolean skipPreVote;
@@ -223,8 +225,10 @@ class LeaderElection implements Runnable {
     }
   }
 
-  void shutdown() {
+  CompletableFuture<Void> shutdown() {
     lifeCycle.checkStateAndClose();
+    stopped.complete(null);
+    return stopped;
   }
 
   @VisibleForTesting
@@ -234,6 +238,14 @@ class LeaderElection implements Runnable {
 
   @Override
   public void run() {
+    try {
+      runImpl();
+    } finally {
+      stopped.complete(null);
+    }
+  }
+
+  private void runImpl() {
     if (!lifeCycle.compareAndTransition(STARTING, RUNNING)) {
       final LifeCycle.State state = lifeCycle.getCurrentState();
       LOG.info("{}: skip running since this is already {}", this, state);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 68121cef6..3d8bc2219 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -223,11 +223,8 @@ class LeaderStateImpl implements LeaderState {
     }
 
     CompletableFuture<Void> stopAll() {
-      final CompletableFuture<?>[] futures = new CompletableFuture<?>[senders.size()];
-      for(int i = 0; i < futures.length; i++) {
-        futures[i] = senders.get(i).stopAsync();
-      }
-      return CompletableFuture.allOf(futures);
+      return CompletableFuture.allOf(senders.stream().
+              map(LogAppender::stopAsync).toArray(CompletableFuture[]::new));
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0473564b3..ae158ad75 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -132,6 +132,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -565,20 +566,23 @@ class RaftServerImpl implements RaftServer.Division,
    * @param force Force to start a new {@link FollowerState} even if this server is already a follower.
    * @return if the term/votedFor should be updated to the new term
    */
-  private synchronized boolean changeToFollower(
-      long newTerm,
-      boolean force,
-      boolean allowListener,
-      Object reason) {
+  private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) {
+    final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>();
+    changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join();
+    return metadataUpdated.get();
+  }
+
+  private synchronized CompletableFuture<Void> changeToFollowerAsync(
+      long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference<Boolean> metadataUpdated) {
     final RaftPeerRole old = role.getCurrentRole();
     if (old == RaftPeerRole.LISTENER && !allowListener) {
       throw new IllegalStateException("Unexpected role " + old);
     }
-    boolean metadataUpdated;
+    CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
     if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) {
       setRole(RaftPeerRole.FOLLOWER, reason);
       if (old == RaftPeerRole.LEADER) {
-        role.shutdownLeaderState(false)
+        future = role.shutdownLeaderState(false)
             .exceptionally(e -> {
               if (e != null) {
                 if (!getInfo().isAlive()) {
@@ -587,21 +591,21 @@ class RaftServerImpl implements RaftServer.Division,
                 }
               }
               throw new CompletionException("Failed to shutdownLeaderState: " + this, e);
-            })
-            .join();
+            });
         state.setLeader(null, reason);
       } else if (old == RaftPeerRole.CANDIDATE) {
-        role.shutdownLeaderElection();
+        future = role.shutdownLeaderElection();
       } else if (old == RaftPeerRole.FOLLOWER) {
-        role.shutdownFollowerState();
+        future = role.shutdownFollowerState();
       }
-      metadataUpdated = state.updateCurrentTerm(newTerm);
+
+      metadataUpdated.set(state.updateCurrentTerm(newTerm));
       role.startFollowerState(this, reason);
       setFirstElection(reason);
     } else {
-      metadataUpdated = state.updateCurrentTerm(newTerm);
+      metadataUpdated.set(state.updateCurrentTerm(newTerm));
     }
-    return metadataUpdated;
+    return future;
   }
 
   synchronized void changeToFollowerAndPersistMetadata(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 5eb01a9d6..a5cd7da66 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -47,7 +47,7 @@ class RoleInfo {
   public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class);
 
   private final RaftPeerId id;
-  private volatile RaftPeerRole role;
+  private final AtomicReference<RaftPeerRole> role = new AtomicReference<>();
   /** Used when the peer is leader */
   private final AtomicReference<LeaderStateImpl> leaderState = new AtomicReference<>();
   /** Used when the peer is follower, to monitor election timeout */
@@ -64,7 +64,7 @@ class RoleInfo {
   }
 
   void transitionRole(RaftPeerRole newRole) {
-    this.role = newRole;
+    this.role.set(newRole);
     this.transitionTime.set(Timestamp.currentTime());
   }
 
@@ -73,7 +73,7 @@ class RoleInfo {
   }
 
   RaftPeerRole getCurrentRole() {
-    return role;
+    return role.get();
   }
 
   boolean isLeaderReady() {
@@ -113,13 +113,13 @@ class RoleInfo {
     updateAndGet(followerState, new FollowerState(server, reason)).start();
   }
 
-  void shutdownFollowerState() {
+  CompletableFuture<Void> shutdownFollowerState() {
     final FollowerState follower = followerState.getAndSet(null);
-    if (follower != null) {
-      LOG.info("{}: shutdown {}", id, follower);
-      follower.stopRunning();
-      follower.interrupt();
+    if (follower == null) {
+      return CompletableFuture.completedFuture(null);
     }
+    LOG.info("{}: shutdown {}", id, follower);
+    return follower.stopRunning();
   }
 
   void startLeaderElection(RaftServerImpl server, boolean force) {
@@ -133,13 +133,13 @@ class RoleInfo {
     pauseLeaderElection.set(pause);
   }
 
-  void shutdownLeaderElection() {
+  CompletableFuture<Void> shutdownLeaderElection() {
     final LeaderElection election = leaderElection.getAndSet(null);
-    if (election != null) {
-      LOG.info("{}: shutdown {}", id, election);
-      election.shutdown();
-      // no need to interrupt the election thread
+    if (election == null) {
+      return CompletableFuture.completedFuture(null);
     }
+    LOG.info("{}: shutdown {}", id, election);
+    return election.shutdown();
   }
 
   private <T> T updateAndGet(AtomicReference<T> ref, T current) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 958cc6fa8..5a27cda51 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -124,7 +124,7 @@ public abstract class LogAppenderBase implements LogAppender {
 
   @Override
   public boolean isRunning() {
-    return daemon.isWorking();
+    return daemon.isWorking() && server.getInfo().isLeader();
   }
 
   @Override

Reply via email to