This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_tmp in repository https://gitbox.apache.org/repos/asf/ratis.git
commit e01c8e579e0cac8b918de7348afc409bf1e218c8 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Sat Dec 24 16:41:50 2022 +0800 RATIS-1761. If LeaderStateImpl is not running, it should not restart a LogAppender. (#799) (cherry picked from commit 77a9949f98d6c80a8c1466887763d44fb64c9ccc) --- .../java/org/apache/ratis/util/OpenCloseState.java | 4 +-- .../apache/ratis/server/impl/LeaderStateImpl.java | 34 +++++++++++++++++----- .../ratis/server/leader/LogAppenderBase.java | 8 +++++ .../ratis/server/leader/LogAppenderDaemon.java | 6 ++-- 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java index 7847c21cd..b79b49b27 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/OpenCloseState.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -60,7 +60,7 @@ public class OpenCloseState { final Throwable t = state.get(); if (!(t instanceof OpenTrace)) { final String s = name + " is expected to be opened but it is " + toString(t); - throw new IllegalArgumentException(s, t); + throw new IllegalStateException(s, t); } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index b55389343..43349354c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -74,6 +74,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Predicate; @@ -160,7 +161,7 @@ class LeaderStateImpl implements LeaderState { } catch (InterruptedException ie) { Thread.currentThread().interrupt(); String s = this + ": poll() is interrupted"; - if (!running) { + if (isStopped.get()) { LOG.info(s + " gracefully"); return null; } else { @@ -314,7 +315,7 @@ class LeaderStateImpl implements LeaderState { private final PendingRequests pendingRequests; private final WatchRequests watchRequests; private final MessageStreamRequests messageStreamRequests; - private volatile boolean running = true; + private final AtomicBoolean isStopped = new AtomicBoolean(); private final int stagingCatchupGap; private final long placeHolderIndex; @@ -386,7 +387,10 @@ class LeaderStateImpl implements LeaderState { } void stop() { - this.running = false; + if (!isStopped.compareAndSet(false, true)) { + LOG.info("{} is already stopped", this); + return; + } // do not interrupt event processor since it may be in the middle of logSync senders.forEach(LogAppender::stop); final NotLeaderException nle = server.generateNotLeaderException(); @@ -431,7 +435,8 @@ class LeaderStateImpl implements LeaderState { */ PendingRequest startSetConfiguration(SetConfigurationRequest request, List<RaftPeer> peersInNewConf) { LOG.info("{}: startSetConfiguration {}", this, request); - Preconditions.assertTrue(running && !inStagingState()); + Preconditions.assertTrue(isRunning(), () -> this + " is not running."); + Preconditions.assertTrue(!inStagingState(), () -> this + " is already in staging state " + stagingState); final List<RaftPeer> listenersInNewConf = request.getArguments().getPeersInNewConf(RaftPeerRole.LISTENER); final Collection<RaftPeer> peersToBootStrap = server.getRaftConf().filterNotContainedInConf(peersInNewConf); @@ -589,8 +594,21 @@ class LeaderStateImpl implements LeaderState { senders.removeAll(toStop); } + boolean isRunning() { + if (isStopped.get()) { + return false; + } + final LeaderStateImpl current = server.getRole().getLeaderState().orElse(null); + return this == current; + } + @Override public void restart(LogAppender sender) { + if (!isRunning()) { + LOG.warn("Failed to restart {}: {} is not running", sender, this); + return; + } + final FollowerInfo info = sender.getFollower(); LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), info.getName()); sender.stop(); @@ -629,7 +647,7 @@ class LeaderStateImpl implements LeaderState { LOG.warn(s, e); // the failure should happen while changing the state to follower // thus the in-memory state should have been updated - if (running) { + if (!isStopped.get()) { throw new IllegalStateException(s + " and running == true", e); } } @@ -665,7 +683,7 @@ class LeaderStateImpl implements LeaderState { private void prepare() { synchronized (server) { - if (running) { + if (isRunning()) { final ServerState state = server.getState(); if (state.getRaftConf().isTransitional() && state.isConfCommitted()) { // the configuration is in transitional state, and has been committed @@ -690,10 +708,10 @@ class LeaderStateImpl implements LeaderState { // apply an empty message; check if necessary to replicate (new) conf prepare(); - while (running) { + while (isRunning()) { final StateUpdateEvent event = eventQueue.poll(); synchronized(server) { - if (running) { + if (isRunning()) { if (event != null) { event.execute(); } else if (inStagingState()) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java index fda78fbcf..bc8a31181 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java @@ -125,6 +125,14 @@ public abstract class LogAppenderBase implements LogAppender { daemon.tryToClose(); } + void restart() { + if (!server.getInfo().isAlive()) { + LOG.warn("Failed to restart {}: server {} is not alive", this, server.getMemberId()); + return; + } + getLeaderState().restart(this); + } + @Override public final FollowerInfo getFollower() { return follower; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java index d985a6ae8..6ca237ecf 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java @@ -42,9 +42,9 @@ class LogAppenderDaemon { private final LifeCycle lifeCycle; private final Daemon daemon; - private final LogAppender logAppender; + private final LogAppenderBase logAppender; - LogAppenderDaemon(LogAppender logAppender) { + LogAppenderDaemon(LogAppenderBase logAppender) { this.logAppender = logAppender; this.name = logAppender + "-" + JavaUtils.getClassSimpleName(getClass()); this.lifeCycle = new LifeCycle(name); @@ -88,7 +88,7 @@ class LogAppenderDaemon { lifeCycle.transitionIfValid(EXCEPTION); } finally { if (lifeCycle.transitionAndGet(TRANSITION_FINALLY) == EXCEPTION) { - logAppender.getLeaderState().restart(logAppender); + logAppender.restart(); } } }
