This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/ratis.git

commit 9a12804439fef527674154bd9e333c5a64de109e
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Aug 9 11:25:45 2023 -0700

    RATIS-1868. Handling Netty back pressure when streaming ratis log (#900)
    
    (cherry picked from commit f3a5d4de627ec7d4884ad4ecaeb7347fdf5a41ef)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 64 ++++++++++++++--------
 .../grpc/server/GrpcServerProtocolClient.java      |  7 ++-
 2 files changed, 45 insertions(+), 26 deletions(-)

diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 19e1216d0..61bc3dfe0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -33,6 +33,7 @@ import org.apache.ratis.server.leader.LogAppenderBase;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.util.ServerStringUtils;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
@@ -44,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -122,6 +124,7 @@ public class GrpcLogAppender extends LogAppenderBase {
   private void resetClient(AppendEntriesRequest request, boolean onError) {
     try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
       getClient().resetConnectBackoff();
+      appendLogRequestObserver.stop();
       appendLogRequestObserver = null;
       firstResponseReceived = false;
       // clear the pending requests queue and reset the next index of follower
@@ -242,20 +245,34 @@ public class GrpcLogAppender extends LogAppenderBase {
   }
 
   static class StreamObservers {
-    private final StreamObserver<AppendEntriesRequestProto> appendLog;
-    private final StreamObserver<AppendEntriesRequestProto> heartbeat;
+    public static final int DEFAULT_WAIT_FOR_READY_MS = 10;
+    private final CallStreamObserver<AppendEntriesRequestProto> appendLog;
+    private final CallStreamObserver<AppendEntriesRequestProto> heartbeat;
+    private volatile boolean running = true;
 
     StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler 
handler, boolean separateHeartbeat) {
       this.appendLog = client.appendEntries(handler, false);
       this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): 
null;
     }
 
-    void onNext(AppendEntriesRequestProto proto) {
-      if (heartbeat != null && proto.getEntriesCount() == 0) {
-        heartbeat.onNext(proto);
+    void onNext(AppendEntriesRequestProto proto)
+        throws InterruptedIOException {
+      CallStreamObserver<AppendEntriesRequestProto> stream;
+      boolean isHeartBeat = heartbeat != null && proto.getEntriesCount() == 0;
+      if (isHeartBeat) {
+        stream = heartbeat;
       } else {
-        appendLog.onNext(proto);
+        stream = appendLog;
       }
+      // stall for stream to be ready.
+      while (!stream.isReady() && running) {
+        sleep(DEFAULT_WAIT_FOR_READY_MS, isHeartBeat);
+      }
+      stream.onNext(proto);
+    }
+
+    void stop() {
+      running = false;
     }
 
     void onCompleted() {
@@ -295,37 +312,38 @@ public class GrpcLogAppender extends LogAppenderBase {
 
     final long waitMs = getMinWaitTimeMs();
     if (waitMs > 0) {
-      try {
-        Thread.sleep(waitMs);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw IOUtils.toInterruptedIOException(
-            "Interrupted appendLog, heartbeat? " + heartbeat, e);
-      }
+      sleep(waitMs, heartbeat);
     }
     if (isRunning()) {
       sendRequest(request, pending);
     }
   }
 
-  private void sendRequest(AppendEntriesRequest request, 
AppendEntriesRequestProto proto) {
+  private static void sleep(long waitMs, boolean heartbeat)
+      throws InterruptedIOException {
+    try {
+      Thread.sleep(waitMs);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw IOUtils.toInterruptedIOException(
+          "Interrupted appendLog, heartbeat? " + heartbeat, e);
+    }
+  }
+
+  private void sendRequest(AppendEntriesRequest request,
+      AppendEntriesRequestProto proto) throws InterruptedIOException {
     CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
         getServer().getId(), null, proto);
-    request.startRequestTimer();
     resetHeartbeatTrigger();
-    final boolean sent = Optional.ofNullable(appendLogRequestObserver)
-        .map(observer -> {
-          observer.onNext(proto);
-          return true;
-        }).isPresent();
 
-    if (sent) {
+    StreamObservers observers = appendLogRequestObserver;
+    if (observers != null) {
+      request.startRequestTimer();
+      observers.onNext(proto);
       getFollower().updateLastRpcSendTime(request.isHeartbeat());
       scheduler.onTimeout(requestTimeoutDuration,
           () -> timeoutAppendRequest(request.getCallId(), 
request.isHeartbeat()),
           LOG, () -> "Timeout check failed for append entry request: " + 
request);
-    } else {
-      request.stopRequestTimer();
     }
   }
 
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 34f014ebc..2eb73f6aa 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -26,6 +26,7 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc;
@@ -128,12 +129,12 @@ public class GrpcServerProtocolClient implements 
Closeable {
         .readIndex(request, s);
   }
 
-  StreamObserver<AppendEntriesRequestProto> appendEntries(
+  CallStreamObserver<AppendEntriesRequestProto> appendEntries(
       StreamObserver<AppendEntriesReplyProto> responseHandler, boolean 
isHeartbeat) {
     if (isHeartbeat && useSeparateHBChannel) {
-      return hbAsyncStub.appendEntries(responseHandler);
+      return (CallStreamObserver<AppendEntriesRequestProto>) 
hbAsyncStub.appendEntries(responseHandler);
     } else {
-      return asyncStub.appendEntries(responseHandler);
+      return (CallStreamObserver<AppendEntriesRequestProto>) 
asyncStub.appendEntries(responseHandler);
     }
   }
 

Reply via email to