szetszwo commented on code in PR #561:
URL: https://github.com/apache/ratis/pull/561#discussion_r922460393


##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java:
##########
@@ -81,15 +102,24 @@ public GrpcServerProtocolClient(RaftPeer target, int 
flowControlWindow,
     } else {
       channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
     }
-    channel = channelBuilder.flowControlWindow(flowControlWindow).build();
-    blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
-    asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
-    this.requestTimeoutDuration = requestTimeoutDuration;
+    return channelBuilder.flowControlWindow(flowControlWindow).build();
   }
 
   @Override
   public void close() {
-    GrpcUtil.shutdownManagedChannel(channel);
+    LOG.info("{} Close channels", raftPeerId);
+    CompletableFuture<Void> future1;
+    if (useSeparateHBChannel) {
+      future1 = GrpcUtil.asyncShutdownManagedChannel(hbChannel);
+    } else {
+      future1 = new CompletableFuture();

Review Comment:
   It should be `CompletableFuture.completedFuture(null)`.  Otherwise, this 
future will never be completed.



##########
ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java:
##########
@@ -344,6 +344,10 @@ public TimeDuration getTimeoutMax() {
     return RaftServerConfigKeys.Rpc.timeoutMax(properties);
   }
 
+  public TimeDuration getRequestTimeout() {
+    return RaftServerConfigKeys.Rpc.requestTimeout(properties);
+  }

Review Comment:
   Please remove it since it is not used.



##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java:
##########
@@ -232,4 +232,12 @@ static void shutdownManagedChannel(ManagedChannel 
managedChannel) {
       }
     }
   }
+
+  /**
+   * Tries to gracefully shut down the managed channel with call back. Falls 
back to forceful shutdown if
+   * graceful shutdown times out.
+   */
+  static CompletableFuture<Void> asyncShutdownManagedChannel(ManagedChannel 
channel) {
+    return CompletableFuture.runAsync(() -> shutdownManagedChannel(channel));
+  }

Review Comment:
   Since the code always shuts down two channels (and they may be equal), it is 
better to have the following instead:
   ```
     static void shutdownManagedChannels(ManagedChannel channel1, 
ManagedChannel channel2) {
       if (channel1 == null || channel1 == channel2) {
         shutdownManagedChannel(channel2);
       } else if (channel2 == null) {
         shutdownManagedChannel(channel1);
       } else {
         final CompletableFuture<Void> f = CompletableFuture.runAsync(() -> 
shutdownManagedChannel(channel1));
         shutdownManagedChannel(channel2);
         f.join();
       }
     }
   ```



##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java:
##########
@@ -97,18 +102,19 @@ private GrpcServerProtocolClient getClient() throws 
IOException {
     return getServerRpc().getProxies().getProxy(getFollowerId());
   }
 
-  private void resetClient(AppendEntriesRequest request, boolean onError) {
+  private void resetClient(AppendEntriesRequest request, Throwable throwable) {

Review Comment:
   This change is not needed since the throwable is never used.  Let's revert 
it.



##########
ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java:
##########
@@ -486,6 +486,16 @@ static boolean installSnapshotEnabled(RaftProperties 
properties) {
       static void setInstallSnapshotEnabled(RaftProperties properties, boolean 
shouldInstallSnapshot) {
         setBoolean(properties::setBoolean, INSTALL_SNAPSHOT_ENABLED_KEY, 
shouldInstallSnapshot);
       }
+
+      String HEARTBEAT_CHANNEL_KEY = PREFIX + ".heartbeat.channel";
+      boolean HEARTBEAT_CHANNEL_DEFAULT = true;
+      static boolean heartbeatChannel(RaftProperties properties) {
+        return getBoolean(properties::getBoolean, HEARTBEAT_CHANNEL_KEY,
+            HEARTBEAT_CHANNEL_DEFAULT, getDefaultLog());
+      }
+      static void setHeartbeatChannel(RaftProperties properties, boolean 
useCached) {
+        setBoolean(properties::setBoolean, HEARTBEAT_CHANNEL_KEY, useCached);
+      }

Review Comment:
   Move it to GrpcConfigKeys.Server since the default log appender does not 
support it.



##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java:
##########
@@ -95,6 +95,7 @@ public class GrpcClientProtocolClient implements Closeable {
 
   private final AtomicReference<AsyncStreamObservers> unorderedStreamObservers 
= new AtomicReference<>();
   private final MetricClientInterceptor metricClientInterceptor;
+  private final boolean separateAdminChannel;

Review Comment:
   This field is not needed since it is only used for shutdown.  We can take 
care of shutdown by checking whether the channels are equal.



##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java:
##########
@@ -212,33 +225,50 @@ private void appendLog(boolean excludeLogEntries) throws 
IOException {
       pendingRequests.put(request);
       increaseNextIndex(pending);
       if (appendLogRequestObserver == null) {
-        appendLogRequestObserver = getClient().appendEntries(new 
AppendLogResponseHandler());
+        appendLogRequestObserver =
+            getClient().appendEntries(appendLogResponseHandler, false);
+      }
+
+      if (heartbeatRequestObserver == null && useSeparateHBChannel) {
+        heartbeatRequestObserver =
+            getClient().appendEntries(appendLogResponseHandler, true);
       }
-      s = appendLogRequestObserver;
     }
 
     if (isRunning()) {
-      sendRequest(request, pending, s);
+      sendRequest(request, pending);
     }
   }
 
-  private void sendRequest(AppendEntriesRequest request, 
AppendEntriesRequestProto proto,
-        StreamObserver<AppendEntriesRequestProto> s) {
+  private void sendRequest(AppendEntriesRequest request, 
AppendEntriesRequestProto proto) {
     CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
         getServer().getId(), null, proto);
     request.startRequestTimer();
-    s.onNext(proto);
-    scheduler.onTimeout(requestTimeoutDuration,
-        () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
-        LOG, () -> "Timeout check failed for append entry request: " + 
request);
-    getFollower().updateLastRpcSendTime(request.isHeartbeat());
+    boolean sent = false;
+    if (request.isHeartbeat() && useSeparateHBChannel) {
+      // NPE throw out when observer is closed in resetClient
+      Optional.ofNullable(heartbeatRequestObserver).ifPresent(observer -> 
observer.onNext(proto));
+      sent = true;
+    } else {
+      Optional.ofNullable(appendLogRequestObserver).ifPresent(observer -> 
observer.onNext(proto));
+      sent = true;
+    }
+    if (sent) {

Review Comment:
   `sent` is always true here, so it can be removed.



##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java:
##########
@@ -65,6 +65,9 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
 
   private volatile StreamObserver<AppendEntriesRequestProto> 
appendLogRequestObserver;
+  private volatile StreamObserver<AppendEntriesRequestProto> 
heartbeatRequestObserver;
+  private final boolean useSeparateHBChannel;
+  private final AppendLogResponseHandler appendLogResponseHandler;

Review Comment:
   With the new StreamObservers class, this is not needed.



##########
ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java:
##########
@@ -132,17 +136,19 @@ private TermIndex getPrevious(long nextIndex) {
   }
 
   @Override
-  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, 
boolean heartbeat) throws RaftLogIOException {
-    final TermIndex previous = getPrevious(follower.getNextIndex());
-    final long snapshotIndex = follower.getSnapshotIndex();
+  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, 
boolean heartbeat)
+      throws RaftLogIOException {
     final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
+    final TermIndex previous = getPrevious(follower.getNextIndex());
     if (heartbeatWaitTimeMs <= 0L || heartbeat) {
       // heartbeat
-      return leaderState.newAppendEntriesRequestProto(follower, 
Collections.emptyList(), previous, callId);
+      return leaderState.newAppendEntriesRequestProto(follower, 
Collections.emptyList(),
+          hasPendingDataRequests()? null : previous, callId);

Review Comment:
   Let's just send null for heartbeat and remove `hasPendingDataRequests()`?



##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java:
##########
@@ -65,6 +65,9 @@ public class GrpcLogAppender extends LogAppenderBase {
   private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
 
   private volatile StreamObserver<AppendEntriesRequestProto> 
appendLogRequestObserver;
+  private volatile StreamObserver<AppendEntriesRequestProto> 
heartbeatRequestObserver;

Review Comment:
   Add an inner class as below.
   ```
     static class StreamObservers {
       private final StreamObserver<AppendEntriesRequestProto> appendLog;
       private final StreamObserver<AppendEntriesRequestProto> heartbeat;
   
       StreamObservers(GrpcServerProtocolClient client, 
AppendLogResponseHandler handler, boolean separateHeartbeat) {
         this.appendLog = client.appendEntries(handler, false);
         this.heartbeat = separateHeartbeat? client.appendEntries(handler, 
true): null;
       }
   
       void onNext(AppendEntriesRequestProto proto) {
         if (heartbeat != null && proto.getEntriesCount() == 0) {
           heartbeat.onNext(proto);
         } else {
           appendLog.onNext(proto);
         }
       }
   
       void onCompleted() {
         appendLog.onCompleted();
         Optional.ofNullable(heartbeat).ifPresent(StreamObserver::onCompleted);
       }
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to