This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9ac832bc1 RATIS-2152. GrpcLogAppender stucks while sending an
installSnapshot notification request (#1146)
9ac832bc1 is described below
commit 9ac832bc1dccbb437addcb4c36feae360e7b4538
Author: Chung En Lee <[email protected]>
AuthorDate: Fri Sep 13 00:26:57 2024 +0800
RATIS-2152. GrpcLogAppender stucks while sending an installSnapshot
notification request (#1146)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 34 +++++++++-------------
1 file changed, 13 insertions(+), 21 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 1da7bb3b4..7edb3ae0b 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -54,8 +54,8 @@ import java.io.InterruptedIOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -576,7 +576,7 @@ public class GrpcLogAppender extends LogAppenderBase {
private class InstallSnapshotResponseHandler implements
StreamObserver<InstallSnapshotReplyProto> {
private final String name = getFollower().getName() + "-" +
JavaUtils.getClassSimpleName(getClass());
private final Queue<Integer> pending;
- private final AtomicBoolean done = new AtomicBoolean(false);
+ private final CompletableFuture<Void> done = new CompletableFuture<>();
private final boolean isNotificationOnly;
InstallSnapshotResponseHandler() {
@@ -637,12 +637,18 @@ public class GrpcLogAppender extends LogAppenderBase {
getServer().getStateMachine().event().notifySnapshotInstalled(result,
snapshotIndex, getFollower().getPeer());
}
- boolean isDone() {
- return done.get();
+ void waitForResponse() {
+ try {
+ done.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ throw new IllegalStateException("Failed to complete " + name, e);
+ }
}
void close() {
- done.set(true);
+ done.complete(null);
notifyLogAppender();
}
@@ -776,14 +782,7 @@ public class GrpcLogAppender extends LogAppenderBase {
}
return;
}
-
- while (isRunning() && !responseHandler.isDone()) {
- try {
- getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- }
+ responseHandler.waitForResponse();
if (responseHandler.hasAllResponse()) {
getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex());
@@ -821,14 +820,7 @@ public class GrpcLogAppender extends LogAppenderBase {
}
return;
}
-
- while (isRunning() && !responseHandler.isDone()) {
- try {
- getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException ignored) {
- Thread.currentThread().interrupt();
- }
- }
+ responseHandler.waitForResponse();
}
/**