This is an automated email from the ASF dual-hosted git repository.

williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b05bfcc7 RATIS-1928. Join the LogAppenders when closing the server. 
(#959)
1b05bfcc7 is described below

commit 1b05bfcc76e4f3007d389dc52ee0305b9fff8e41
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Nov 6 17:22:34 2023 -0800

    RATIS-1928. Join the LogAppenders when closing the server. (#959)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |  5 ++--
 .../apache/ratis/server/leader/LogAppender.java    | 31 ++++++++++++++++++++--
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 17 +++++++++---
 .../apache/ratis/server/impl/RaftServerImpl.java   |  2 +-
 .../org/apache/ratis/server/impl/RoleInfo.java     |  7 ++---
 .../ratis/server/leader/LogAppenderBase.java       |  6 +++--
 .../ratis/server/leader/LogAppenderDaemon.java     | 11 +++++---
 7 files changed, 62 insertions(+), 17 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 15f15463f..b9764e987 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -219,9 +220,9 @@ public class GrpcLogAppender extends LogAppenderBase {
   }
 
   @Override
-  public void stop() {
+  public CompletableFuture<LifeCycle.State> stopAsync() {
     grpcServerMetrics.unregister();
-    super.stop();
+    return super.stopAsync();
   }
 
   @Override
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 342c48528..020a352c0 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
 /**
  * A {@link LogAppender} is for the leader to send appendEntries to a 
particular follower.
@@ -68,8 +70,33 @@ public interface LogAppender {
   /** Is this {@link LogAppender} running? */
   boolean isRunning();
 
-  /** Stop this {@link LogAppender}. */
-  void stop();
+  /**
+   * Stop this {@link LogAppender} asynchronously.
+   * @deprecated override {@link #stopAsync()} instead.
+   */
+  @Deprecated
+  default void stop() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Stop this {@link LogAppender} asynchronously.
+   *
+   * @return a future of the final state.
+   */
+  default CompletableFuture<?> stopAsync() {
+    stop();
+    return CompletableFuture.supplyAsync(() -> {
+      for (; isRunning(); ) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          throw new CompletionException("stopAsync interrupted", e);
+        }
+      }
+      return null;
+    });
+  }
 
   /** @return the leader state. */
   LeaderState getLeaderState();
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 44468fb53..e1b12d486 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -221,6 +221,14 @@ class LeaderStateImpl implements LeaderState {
     boolean removeAll(Collection<LogAppender> c) {
       return senders.removeAll(c);
     }
+
+    CompletableFuture<Void> stopAll() {
+      final CompletableFuture<?>[] futures = new 
CompletableFuture<?>[senders.size()];
+      for(int i = 0; i < futures.length; i++) {
+        futures[i] = senders.get(i).stopAsync();
+      }
+      return CompletableFuture.allOf(futures);
+    }
   }
 
   /** For caching {@link FollowerInfo}s.  This class is immutable. */
@@ -419,13 +427,13 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
-  void stop() {
+  CompletableFuture<Void> stop() {
     if (!isStopped.compareAndSet(false, true)) {
       LOG.info("{} is already stopped", this);
-      return;
+      return CompletableFuture.completedFuture(null);
     }
     // do not interrupt event processor since it may be in the middle of 
logSync
-    senders.forEach(LogAppender::stop);
+    final CompletableFuture<Void> f = senders.stopAll();
     final NotLeaderException nle = server.generateNotLeaderException();
     final Collection<CommitInfoProto> commitInfos = server.getCommitInfos();
     try {
@@ -443,6 +451,7 @@ class LeaderStateImpl implements LeaderState {
     logAppenderMetrics.unregister();
     raftServerMetrics.unregister();
     pendingRequests.close();
+    return f;
   }
 
   void notifySenders() {
@@ -631,7 +640,7 @@ class LeaderStateImpl implements LeaderState {
   }
 
   private void stopAndRemoveSenders(Collection<LogAppender> toStop) {
-    toStop.forEach(LogAppender::stop);
+    toStop.forEach(LogAppender::stopAsync);
     senders.removeAll(toStop);
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b822870a4..2cbd3146a 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -528,7 +528,7 @@ class RaftServerImpl implements RaftServer.Division,
         LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(), e);
       }
       try{
-        role.shutdownLeaderState(true);
+        role.shutdownLeaderState(true).join();
       } catch (Exception e) {
         LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(), 
e);
       }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 603ba1326..9f53b6934 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -82,17 +83,17 @@ class RoleInfo {
     return updateAndGet(leaderState, new LeaderStateImpl(server));
   }
 
-  void shutdownLeaderState(boolean allowNull) {
+  CompletableFuture<Void> shutdownLeaderState(boolean allowNull) {
     final LeaderStateImpl leader = leaderState.getAndSet(null);
     if (leader == null) {
       if (!allowNull) {
         throw new NullPointerException("leaderState == null");
       }
+      return CompletableFuture.completedFuture(null);
     } else {
       LOG.info("{}: shutdown {}", id, leader);
-      leader.stop();
+      return leader.stop();
     }
-    // TODO: make sure that StateMachineUpdater has applied all transactions 
that have context
   }
 
   Optional<FollowerState> getFollowerState() {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index b218261da..958cc6fa8 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -31,6 +31,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.util.AwaitForSignal;
 import org.apache.ratis.util.DataQueue;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
@@ -38,6 +39,7 @@ import org.apache.ratis.util.TimeDuration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.LongUnaryOperator;
@@ -126,8 +128,8 @@ public abstract class LogAppenderBase implements 
LogAppender {
   }
 
   @Override
-  public void stop() {
-    daemon.tryToClose();
+  public CompletableFuture<LifeCycle.State> stopAsync() {
+    return daemon.tryToClose();
   }
 
   void restart() {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index 6ca237ecf..847617426 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -22,6 +22,7 @@ import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 
 import java.io.InterruptedIOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.UnaryOperator;
 
 import org.apache.ratis.util.LifeCycle.State;
@@ -43,6 +44,7 @@ class LogAppenderDaemon {
   private final Daemon daemon;
 
   private final LogAppenderBase logAppender;
+  private final CompletableFuture<State> closeFuture = new 
CompletableFuture<>();
 
   LogAppenderDaemon(LogAppenderBase logAppender) {
     this.logAppender = logAppender;
@@ -84,12 +86,14 @@ class LogAppenderDaemon {
     } catch (InterruptedIOException e) {
       LOG.info(this + " I/O was interrupted: " + e);
     } catch (Throwable e) {
-      LOG.error(this + " failed", e);
+      LOG.warn(this + " failed", e);
       lifeCycle.transitionIfValid(EXCEPTION);
     } finally {
-      if (lifeCycle.transitionAndGet(TRANSITION_FINALLY) == EXCEPTION) {
+      final State finalState = lifeCycle.transitionAndGet(TRANSITION_FINALLY);
+      if (finalState == EXCEPTION) {
         logAppender.restart();
       }
+      closeFuture.complete(finalState);
     }
   }
 
@@ -103,10 +107,11 @@ class LogAppenderDaemon {
     }
   };
 
-  public void tryToClose() {
+  public CompletableFuture<State> tryToClose() {
     if (lifeCycle.transition(TRY_TO_CLOSE) == CLOSING) {
       daemon.interrupt();
     }
+    return closeFuture;
   }
 
   static final UnaryOperator<State> TRY_TO_CLOSE = current -> {

Reply via email to