This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch snapshot-3 in repository https://gitbox.apache.org/repos/asf/ratis.git.
commit d8482f1f0a72420e50aeeb2401a669d43bbfbf6d Author: 133tosakarin <[email protected]> AuthorDate: Mon Sep 30 14:43:38 2024 +0800 RATIS-2162. When closing leaderState, if the logAppender thread sends a snapshot, a deadlock may occur. (#1154) --- .../ratis/server/impl/ConfigurationManager.java | 10 +++---- .../apache/ratis/server/impl/FollowerState.java | 14 +++++++++- .../apache/ratis/server/impl/LeaderElection.java | 14 +++++++++- .../apache/ratis/server/impl/LeaderStateImpl.java | 7 ++--- .../apache/ratis/server/impl/RaftServerImpl.java | 32 ++++++++++++---------- .../org/apache/ratis/server/impl/RoleInfo.java | 26 +++++++++--------- .../ratis/server/leader/LogAppenderBase.java | 2 +- 7 files changed, 64 insertions(+), 41 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java index 10c59c8b1..6d3f68d5c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java @@ -41,11 +41,9 @@ public class ConfigurationManager { * The current raft configuration. If configurations is not empty, should be * the last entry of the map. Otherwise is initialConf. */ - @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type - private volatile RaftConfigurationImpl currentConf; + private RaftConfigurationImpl currentConf; /** Cache the peer corresponding to {@link #id}. 
*/ - @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type - private volatile RaftPeer currentPeer; + private RaftPeer currentPeer; ConfigurationManager(RaftPeerId id, RaftConfigurationImpl initialConf) { this.id = id; @@ -78,11 +76,11 @@ public class ConfigurationManager { } } - RaftConfigurationImpl getCurrent() { + synchronized RaftConfigurationImpl getCurrent() { return currentConf; } - RaftPeer getCurrentPeer() { + synchronized RaftPeer getCurrentPeer() { return currentPeer; } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index e980daede..fa61e9088 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -26,6 +26,7 @@ import org.apache.ratis.util.Timestamp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.ToIntFunction; @@ -62,6 +63,7 @@ class FollowerState extends Daemon { @SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type private volatile Timestamp lastRpcTime = creationTime; private volatile boolean isRunning = true; + private final CompletableFuture<Void> stopped = new CompletableFuture<>(); private final AtomicInteger outstandingOp = new AtomicInteger(); FollowerState(RaftServerImpl server, Object reason) { @@ -93,8 +95,10 @@ class FollowerState extends Daemon { return lastRpcTime.elapsedTime().compareTo(server.properties().minRpcTimeout()) < 0; } - void stopRunning() { + CompletableFuture<Void> stopRunning() { this.isRunning = false; + interrupt(); + return stopped; } boolean lostMajorityHeartbeatsRecently() { @@ -122,6 +126,14 @@ class FollowerState extends Daemon { @Override public void run() { + try { + runImpl(); 
+ } finally { + stopped.complete(null); + } + } + + private void runImpl() { final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold(); while (shouldRun()) { final TimeDuration electionTimeout = server.getRandomElectionTimeout(); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java index af25ae912..a5bfba7be 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java @@ -46,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -183,6 +184,7 @@ class LeaderElection implements Runnable { private final String name; private final LifeCycle lifeCycle; private final Daemon daemon; + private final CompletableFuture<Void> stopped = new CompletableFuture<>(); private final RaftServerImpl server; private final boolean skipPreVote; @@ -223,8 +225,10 @@ class LeaderElection implements Runnable { } } - void shutdown() { + CompletableFuture<Void> shutdown() { lifeCycle.checkStateAndClose(); + stopped.complete(null); + return stopped; } @VisibleForTesting @@ -234,6 +238,14 @@ class LeaderElection implements Runnable { @Override public void run() { + try { + runImpl(); + } finally { + stopped.complete(null); + } + } + + private void runImpl() { if (!lifeCycle.compareAndTransition(STARTING, RUNNING)) { final LifeCycle.State state = lifeCycle.getCurrentState(); LOG.info("{}: skip running since this is already {}", this, state); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 68121cef6..3d8bc2219 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -223,11 +223,8 @@ class LeaderStateImpl implements LeaderState { } CompletableFuture<Void> stopAll() { - final CompletableFuture<?>[] futures = new CompletableFuture<?>[senders.size()]; - for(int i = 0; i < futures.length; i++) { - futures[i] = senders.get(i).stopAsync(); - } - return CompletableFuture.allOf(futures); + return CompletableFuture.allOf(senders.stream(). + map(LogAppender::stopAsync).toArray(CompletableFuture[]::new)); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 0473564b3..ae158ad75 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -132,6 +132,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -565,20 +566,23 @@ class RaftServerImpl implements RaftServer.Division, * @param force Force to start a new {@link FollowerState} even if this server is already a follower. 
* @return if the term/votedFor should be updated to the new term */ - private synchronized boolean changeToFollower( - long newTerm, - boolean force, - boolean allowListener, - Object reason) { + private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) { + final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>(); + changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join(); + return metadataUpdated.get(); + } + + private synchronized CompletableFuture<Void> changeToFollowerAsync( + long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference<Boolean> metadataUpdated) { final RaftPeerRole old = role.getCurrentRole(); if (old == RaftPeerRole.LISTENER && !allowListener) { throw new IllegalStateException("Unexpected role " + old); } - boolean metadataUpdated; + CompletableFuture<Void> future = CompletableFuture.completedFuture(null); if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) { setRole(RaftPeerRole.FOLLOWER, reason); if (old == RaftPeerRole.LEADER) { - role.shutdownLeaderState(false) + future = role.shutdownLeaderState(false) .exceptionally(e -> { if (e != null) { if (!getInfo().isAlive()) { @@ -587,21 +591,21 @@ class RaftServerImpl implements RaftServer.Division, } } throw new CompletionException("Failed to shutdownLeaderState: " + this, e); - }) - .join(); + }); state.setLeader(null, reason); } else if (old == RaftPeerRole.CANDIDATE) { - role.shutdownLeaderElection(); + future = role.shutdownLeaderElection(); } else if (old == RaftPeerRole.FOLLOWER) { - role.shutdownFollowerState(); + future = role.shutdownFollowerState(); } - metadataUpdated = state.updateCurrentTerm(newTerm); + + metadataUpdated.set(state.updateCurrentTerm(newTerm)); role.startFollowerState(this, reason); setFirstElection(reason); } else { - metadataUpdated = state.updateCurrentTerm(newTerm); + metadataUpdated.set(state.updateCurrentTerm(newTerm)); } - return 
metadataUpdated; + return future; } synchronized void changeToFollowerAndPersistMetadata( diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java index 5eb01a9d6..a5cd7da66 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java @@ -47,7 +47,7 @@ class RoleInfo { public static final Logger LOG = LoggerFactory.getLogger(RoleInfo.class); private final RaftPeerId id; - private volatile RaftPeerRole role; + private final AtomicReference<RaftPeerRole> role = new AtomicReference<>(); /** Used when the peer is leader */ private final AtomicReference<LeaderStateImpl> leaderState = new AtomicReference<>(); /** Used when the peer is follower, to monitor election timeout */ @@ -64,7 +64,7 @@ class RoleInfo { } void transitionRole(RaftPeerRole newRole) { - this.role = newRole; + this.role.set(newRole); this.transitionTime.set(Timestamp.currentTime()); } @@ -73,7 +73,7 @@ class RoleInfo { } RaftPeerRole getCurrentRole() { - return role; + return role.get(); } boolean isLeaderReady() { @@ -113,13 +113,13 @@ class RoleInfo { updateAndGet(followerState, new FollowerState(server, reason)).start(); } - void shutdownFollowerState() { + CompletableFuture<Void> shutdownFollowerState() { final FollowerState follower = followerState.getAndSet(null); - if (follower != null) { - LOG.info("{}: shutdown {}", id, follower); - follower.stopRunning(); - follower.interrupt(); + if (follower == null) { + return CompletableFuture.completedFuture(null); } + LOG.info("{}: shutdown {}", id, follower); + return follower.stopRunning(); } void startLeaderElection(RaftServerImpl server, boolean force) { @@ -133,13 +133,13 @@ class RoleInfo { pauseLeaderElection.set(pause); } - void shutdownLeaderElection() { + CompletableFuture<Void> shutdownLeaderElection() { final LeaderElection election = 
leaderElection.getAndSet(null); - if (election != null) { - LOG.info("{}: shutdown {}", id, election); - election.shutdown(); - // no need to interrupt the election thread + if (election == null) { + return CompletableFuture.completedFuture(null); } + LOG.info("{}: shutdown {}", id, election); + return election.shutdown(); } private <T> T updateAndGet(AtomicReference<T> ref, T current) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index 958cc6fa8..5a27cda51 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -124,7 +124,7 @@ public abstract class LogAppenderBase implements LogAppender { @Override public boolean isRunning() { - return daemon.isWorking(); + return daemon.isWorking() && server.getInfo().isLeader(); } @Override
