szetszwo commented on PR #1146:
URL: https://github.com/apache/ratis/pull/1146#issuecomment-2344762073

   >  I think the while loop in the new diff might also become infinite ...
   
   Good catch!
   
   We actually should use `CompletableFuture` in this case; see below:
   ```diff
   diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
   index 1da7bb3b47..355adc2f8b 100644
   --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
   +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
   @@ -54,8 +54,9 @@ import java.io.InterruptedIOException;
    import java.util.*;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
   +import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
   -import java.util.concurrent.atomic.AtomicBoolean;
   +import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
   @@ -576,7 +577,7 @@ public class GrpcLogAppender extends LogAppenderBase {
      private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> {
        private final String name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass());
        private final Queue<Integer> pending;
   -    private final AtomicBoolean done = new AtomicBoolean(false);
   +    private final CompletableFuture<Void> done = new CompletableFuture<>();
        private final boolean isNotificationOnly;
    
        InstallSnapshotResponseHandler() {
   @@ -637,12 +638,19 @@ public class GrpcLogAppender extends LogAppenderBase {
          getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, getFollower().getPeer());
        }
    
   -    boolean isDone() {
   -      return done.get();
   +    boolean isRunning() {
   +      try {
   +        done.get(100, TimeUnit.MILLISECONDS);
   +        return false;
   +      } catch (InterruptedException | TimeoutException e) {
   +        return true;
   +      } catch (ExecutionException e) {
   +        throw new IllegalStateException("Failed to complete " + name, e);
   +      }
        }
    
        void close() {
   -      done.set(true);
   +      done.complete(null);
          notifyLogAppender();
        }
    
   @@ -718,7 +726,7 @@ public class GrpcLogAppender extends LogAppenderBase {
    
        @Override
        public void onError(Throwable t) {
   -      if (!isRunning()) {
   +      if (!GrpcLogAppender.this.isRunning()) {
            LOG.info("{} is stopped", GrpcLogAppender.this);
            return;
          }
   @@ -777,13 +785,8 @@ public class GrpcLogAppender extends LogAppenderBase {
          return;
        }
    
   -    while (isRunning() && !responseHandler.isDone()) {
   -      try {
   -        getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
   -      } catch (InterruptedException ignored) {
   -        Thread.currentThread().interrupt();
   -      }
   -    }
   +    // wait for responseHandler to complete
   +    while (isRunning() && responseHandler.isRunning());
    
        if (responseHandler.hasAllResponse()) {
          getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex());
   @@ -822,13 +825,8 @@ public class GrpcLogAppender extends LogAppenderBase {
          return;
        }
    
   -    while (isRunning() && !responseHandler.isDone()) {
   -      try {
   -        getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
   -      } catch (InterruptedException ignored) {
   -        Thread.currentThread().interrupt();
   -      }
   -    }
   +    // wait for responseHandler to complete
   +    while (isRunning() && responseHandler.isRunning());
      }
    
      /**
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to