This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch release-3.1.1 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit f23f9b7c10b3d96efc53a478e811d5c94f9d8a20
Author: Chung En Lee <[email protected]>
AuthorDate: Fri Sep 13 00:26:57 2024 +0800

    RATIS-2152. GrpcLogAppender stucks while sending an installSnapshot notification request (#1146)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java | 34 +++++++++-------------
 1 file changed, 13 insertions(+), 21 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 0229f8e84..192bc7564 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -54,8 +54,8 @@ import java.io.InterruptedIOException;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -567,7 +567,7 @@ public class GrpcLogAppender extends LogAppenderBase {
   private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> {
     private final String name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass());
     private final Queue<Integer> pending;
-    private final AtomicBoolean done = new AtomicBoolean(false);
+    private final CompletableFuture<Void> done = new CompletableFuture<>();
     private final boolean isNotificationOnly;
 
     InstallSnapshotResponseHandler() {
@@ -628,12 +628,18 @@ public class GrpcLogAppender extends LogAppenderBase {
       getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, getFollower().getPeer());
     }
 
-    boolean isDone() {
-      return done.get();
+    void waitForResponse() {
+      try {
+        done.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        throw new IllegalStateException("Failed to complete " + name, e);
+      }
     }
 
     void close() {
-      done.set(true);
+      done.complete(null);
       notifyLogAppender();
     }
 
@@ -767,14 +773,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       }
       return;
     }
-
-    while (isRunning() && !responseHandler.isDone()) {
-      try {
-        getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
-      } catch (InterruptedException ignored) {
-        Thread.currentThread().interrupt();
-      }
-    }
+    responseHandler.waitForResponse();
 
     if (responseHandler.hasAllResponse()) {
       getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex());
@@ -812,14 +811,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       }
       return;
     }
-
-    while (isRunning() && !responseHandler.isDone()) {
-      try {
-        getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
-      } catch (InterruptedException ignored) {
-        Thread.currentThread().interrupt();
-      }
-    }
+    responseHandler.waitForResponse();
   }
 
   /**
