szetszwo commented on code in PR #561:
URL: https://github.com/apache/ratis/pull/561#discussion_r926968196


##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java:
##########
@@ -220,33 +250,40 @@ private void appendLog(boolean excludeLogEntries) throws 
IOException {
       pendingRequests.put(request);
       increaseNextIndex(pending);
       if (appendLogRequestObserver == null) {
-        appendLogRequestObserver = getClient().appendEntries(new 
AppendLogResponseHandler());
+        appendLogRequestObserver = new StreamObservers(
+            getClient(), new AppendLogResponseHandler(), useSeparateHBChannel);
       }
-      s = appendLogRequestObserver;
     }
 
     if (isRunning()) {
-      sendRequest(request, pending, s);
+      sendRequest(request, pending);
     }
   }
 
-  private void sendRequest(AppendEntriesRequest request, 
AppendEntriesRequestProto proto,
-        StreamObserver<AppendEntriesRequestProto> s) {
+  private void sendRequest(AppendEntriesRequest request, 
AppendEntriesRequestProto proto) {
     CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
         getServer().getId(), null, proto);
     request.startRequestTimer();
-    s.onNext(proto);
-    scheduler.onTimeout(requestTimeoutDuration,
-        () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()),
-        LOG, () -> "Timeout check failed for append entry request: " + 
request);
-    getFollower().updateLastRpcSendTime(request.isHeartbeat());
+    boolean sent = Optional.ofNullable(appendLogRequestObserver).map(observer 
-> {
+        observer.onNext(proto);
+        return true;}).isPresent();
+
+    if (sent) {
+      scheduler.onTimeout(requestTimeoutDuration,
+          () -> timeoutAppendRequest(request.getCallId(), 
request.isHeartbeat()),
+          LOG, () -> "Timeout check failed for append entry request: " + 
request);
+      getFollower().updateLastRpcSendTime(request.isHeartbeat());
+    } else {
+      request.stopRequestTimer();
+    }

Review Comment:
   We only have to assign appendLogRequestObserver to a local variable as below.
   ```
   +  private void sendRequest(AppendEntriesRequest request, 
AppendEntriesRequestProto proto) {
        CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
            getServer().getId(), null, proto);
   +    final StreamObservers s = appendLogRequestObserver;
   +    if (s == null) {
   +      return;
   +    }
        request.startRequestTimer();
        s.onNext(proto);
        scheduler.onTimeout(requestTimeoutDuration,
   ```



##########
ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java:
##########
@@ -36,23 +36,44 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * This is a RaftClient implementation that supports streaming data to the raft
  * ring. The stream implementation utilizes gRPC.
  */
 public class GrpcServerProtocolClient implements Closeable {
+  // Common channel
   private final ManagedChannel channel;
-  private final TimeDuration requestTimeoutDuration;
-  private final RaftServerProtocolServiceBlockingStub blockingStub;
+  // Channel and stub for heartbeat
+  private ManagedChannel hbChannel;
+  private RaftServerProtocolServiceStub hbAsyncStub;

Review Comment:
   Change them to final and set them to null when they are disabled.  Then, we 
don't need another useSeparateHBChannel field.



##########
ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java:
##########
@@ -132,17 +136,19 @@ private TermIndex getPrevious(long nextIndex) {
   }
 
   @Override
-  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, 
boolean heartbeat) throws RaftLogIOException {
-    final TermIndex previous = getPrevious(follower.getNextIndex());
-    final long snapshotIndex = follower.getSnapshotIndex();
+  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, 
boolean heartbeat)
+      throws RaftLogIOException {
     final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
+    final TermIndex previous = getPrevious(follower.getNextIndex());
     if (heartbeatWaitTimeMs <= 0L || heartbeat) {
       // heartbeat
-      return leaderState.newAppendEntriesRequestProto(follower, 
Collections.emptyList(), previous, callId);
+      return leaderState.newAppendEntriesRequestProto(follower, 
Collections.emptyList(),
+          hasPendingDataRequests()? null : previous, callId);

Review Comment:
   I see.  Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to