This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch release-3.1.1
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit f23f9b7c10b3d96efc53a478e811d5c94f9d8a20
Author: Chung En Lee <[email protected]>
AuthorDate: Fri Sep 13 00:26:57 2024 +0800

    RATIS-2152. GrpcLogAppender stucks while sending an installSnapshot 
notification request (#1146)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 34 +++++++++-------------
 1 file changed, 13 insertions(+), 21 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 0229f8e84..192bc7564 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -54,8 +54,8 @@ import java.io.InterruptedIOException;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -567,7 +567,7 @@ public class GrpcLogAppender extends LogAppenderBase {
   private class InstallSnapshotResponseHandler implements 
StreamObserver<InstallSnapshotReplyProto> {
     private final String name = getFollower().getName() + "-" + 
JavaUtils.getClassSimpleName(getClass());
     private final Queue<Integer> pending;
-    private final AtomicBoolean done = new AtomicBoolean(false);
+    private final CompletableFuture<Void> done = new CompletableFuture<>();
     private final boolean isNotificationOnly;
 
     InstallSnapshotResponseHandler() {
@@ -628,12 +628,18 @@ public class GrpcLogAppender extends LogAppenderBase {
       getServer().getStateMachine().event().notifySnapshotInstalled(result, 
snapshotIndex, getFollower().getPeer());
     }
 
-    boolean isDone() {
-      return done.get();
+    void waitForResponse() {
+      try {
+        done.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        throw new IllegalStateException("Failed to complete " + name, e);
+      }
     }
 
     void close() {
-      done.set(true);
+      done.complete(null);
       notifyLogAppender();
     }
 
@@ -767,14 +773,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       }
       return;
     }
-
-    while (isRunning() && !responseHandler.isDone()) {
-      try {
-        getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
-      } catch (InterruptedException ignored) {
-        Thread.currentThread().interrupt();
-      }
-    }
+    responseHandler.waitForResponse();
 
     if (responseHandler.hasAllResponse()) {
       getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex());
@@ -812,14 +811,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       }
       return;
     }
-
-    while (isRunning() && !responseHandler.isDone()) {
-      try {
-        getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
-      } catch (InterruptedException ignored) {
-        Thread.currentThread().interrupt();
-      }
-    }
+    responseHandler.waitForResponse();
   }
 
   /**

Reply via email to