This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1b05bfcc7 RATIS-1928. Join the LogAppenders when closing the server. (#959)
1b05bfcc7 is described below
commit 1b05bfcc76e4f3007d389dc52ee0305b9fff8e41
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Nov 6 17:22:34 2023 -0800
RATIS-1928. Join the LogAppenders when closing the server. (#959)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 5 ++--
.../apache/ratis/server/leader/LogAppender.java | 31 ++++++++++++++++++++--
.../apache/ratis/server/impl/LeaderStateImpl.java | 17 +++++++++---
.../apache/ratis/server/impl/RaftServerImpl.java | 2 +-
.../org/apache/ratis/server/impl/RoleInfo.java | 7 ++---
.../ratis/server/leader/LogAppenderBase.java | 6 +++--
.../ratis/server/leader/LogAppenderDaemon.java | 11 +++++---
7 files changed, 62 insertions(+), 17 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 15f15463f..b9764e987 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -219,9 +220,9 @@ public class GrpcLogAppender extends LogAppenderBase {
}
@Override
- public void stop() {
+ public CompletableFuture<LifeCycle.State> stopAsync() {
grpcServerMetrics.unregister();
- super.stop();
+ return super.stopAsync();
}
@Override
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 342c48528..020a352c0 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Comparator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
/**
* A {@link LogAppender} is for the leader to send appendEntries to a particular follower.
@@ -68,8 +70,33 @@ public interface LogAppender {
/** Is this {@link LogAppender} running? */
boolean isRunning();
- /** Stop this {@link LogAppender}. */
- void stop();
+ /**
+ * Stop this {@link LogAppender} asynchronously.
+ * @deprecated override {@link #stopAsync()} instead.
+ */
+ @Deprecated
+ default void stop() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Stop this {@link LogAppender} asynchronously.
+ *
+ * @return a future of the final state.
+ */
+ default CompletableFuture<?> stopAsync() {
+ stop();
+ return CompletableFuture.supplyAsync(() -> {
+ for (; isRunning(); ) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ throw new CompletionException("stopAsync interrupted", e);
+ }
+ }
+ return null;
+ });
+ }
/** @return the leader state. */
LeaderState getLeaderState();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 44468fb53..e1b12d486 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -221,6 +221,14 @@ class LeaderStateImpl implements LeaderState {
boolean removeAll(Collection<LogAppender> c) {
return senders.removeAll(c);
}
+
+ CompletableFuture<Void> stopAll() {
+ final CompletableFuture<?>[] futures = new
CompletableFuture<?>[senders.size()];
+ for(int i = 0; i < futures.length; i++) {
+ futures[i] = senders.get(i).stopAsync();
+ }
+ return CompletableFuture.allOf(futures);
+ }
}
/** For caching {@link FollowerInfo}s. This class is immutable. */
@@ -419,13 +427,13 @@ class LeaderStateImpl implements LeaderState {
}
}
- void stop() {
+ CompletableFuture<Void> stop() {
if (!isStopped.compareAndSet(false, true)) {
LOG.info("{} is already stopped", this);
- return;
+ return CompletableFuture.completedFuture(null);
}
// do not interrupt event processor since it may be in the middle of logSync
- senders.forEach(LogAppender::stop);
+ final CompletableFuture<Void> f = senders.stopAll();
final NotLeaderException nle = server.generateNotLeaderException();
final Collection<CommitInfoProto> commitInfos = server.getCommitInfos();
try {
@@ -443,6 +451,7 @@ class LeaderStateImpl implements LeaderState {
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
pendingRequests.close();
+ return f;
}
void notifySenders() {
@@ -631,7 +640,7 @@ class LeaderStateImpl implements LeaderState {
}
private void stopAndRemoveSenders(Collection<LogAppender> toStop) {
- toStop.forEach(LogAppender::stop);
+ toStop.forEach(LogAppender::stopAsync);
senders.removeAll(toStop);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b822870a4..2cbd3146a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -528,7 +528,7 @@ class RaftServerImpl implements RaftServer.Division,
LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), e);
}
try{
- role.shutdownLeaderState(true);
+ role.shutdownLeaderState(true).join();
} catch (Exception e) {
LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(),
e);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 603ba1326..9f53b6934 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -82,17 +83,17 @@ class RoleInfo {
return updateAndGet(leaderState, new LeaderStateImpl(server));
}
- void shutdownLeaderState(boolean allowNull) {
+ CompletableFuture<Void> shutdownLeaderState(boolean allowNull) {
final LeaderStateImpl leader = leaderState.getAndSet(null);
if (leader == null) {
if (!allowNull) {
throw new NullPointerException("leaderState == null");
}
+ return CompletableFuture.completedFuture(null);
} else {
LOG.info("{}: shutdown {}", id, leader);
- leader.stop();
+ return leader.stop();
}
- // TODO: make sure that StateMachineUpdater has applied all transactions that have context
}
Optional<FollowerState> getFollowerState() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index b218261da..958cc6fa8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -31,6 +31,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.AwaitForSignal;
import org.apache.ratis.util.DataQueue;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
@@ -38,6 +39,7 @@ import org.apache.ratis.util.TimeDuration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongUnaryOperator;
@@ -126,8 +128,8 @@ public abstract class LogAppenderBase implements LogAppender {
}
@Override
- public void stop() {
- daemon.tryToClose();
+ public CompletableFuture<LifeCycle.State> stopAsync() {
+ return daemon.tryToClose();
}
void restart() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index 6ca237ecf..847617426 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -22,6 +22,7 @@ import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import java.io.InterruptedIOException;
+import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;
import org.apache.ratis.util.LifeCycle.State;
@@ -43,6 +44,7 @@ class LogAppenderDaemon {
private final Daemon daemon;
private final LogAppenderBase logAppender;
+ private final CompletableFuture<State> closeFuture = new
CompletableFuture<>();
LogAppenderDaemon(LogAppenderBase logAppender) {
this.logAppender = logAppender;
@@ -84,12 +86,14 @@ class LogAppenderDaemon {
} catch (InterruptedIOException e) {
LOG.info(this + " I/O was interrupted: " + e);
} catch (Throwable e) {
- LOG.error(this + " failed", e);
+ LOG.warn(this + " failed", e);
lifeCycle.transitionIfValid(EXCEPTION);
} finally {
- if (lifeCycle.transitionAndGet(TRANSITION_FINALLY) == EXCEPTION) {
+ final State finalState = lifeCycle.transitionAndGet(TRANSITION_FINALLY);
+ if (finalState == EXCEPTION) {
logAppender.restart();
}
+ closeFuture.complete(finalState);
}
}
@@ -103,10 +107,11 @@ class LogAppenderDaemon {
}
};
- public void tryToClose() {
+ public CompletableFuture<State> tryToClose() {
if (lifeCycle.transition(TRY_TO_CLOSE) == CLOSING) {
daemon.interrupt();
}
+ return closeFuture;
}
static final UnaryOperator<State> TRY_TO_CLOSE = current -> {