This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 9a12804439fef527674154bd9e333c5a64de109e Author: Duong Nguyen <[email protected]> AuthorDate: Wed Aug 9 11:25:45 2023 -0700 RATIS-1868. Handling Netty back pressure when streaming ratis log (#900) (cherry picked from commit f3a5d4de627ec7d4884ad4ecaeb7347fdf5a41ef) --- .../apache/ratis/grpc/server/GrpcLogAppender.java | 64 ++++++++++++++-------- .../grpc/server/GrpcServerProtocolClient.java | 7 ++- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 19e1216d0..61bc3dfe0 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -33,6 +33,7 @@ import org.apache.ratis.server.leader.LogAppenderBase; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.util.ServerStringUtils; +import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; @@ -44,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -122,6 +124,7 @@ public class GrpcLogAppender extends LogAppenderBase { private void resetClient(AppendEntriesRequest request, boolean onError) { try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { getClient().resetConnectBackoff(); + appendLogRequestObserver.stop(); appendLogRequestObserver = null; firstResponseReceived = false; // clear the pending requests queue and reset the next index of follower @@ -242,20 +245,34 @@ public class GrpcLogAppender extends LogAppenderBase { } static class StreamObservers { - private final StreamObserver<AppendEntriesRequestProto> appendLog; - private final StreamObserver<AppendEntriesRequestProto> heartbeat; + public static final int DEFAULT_WAIT_FOR_READY_MS = 10; + private final CallStreamObserver<AppendEntriesRequestProto> appendLog; + private final CallStreamObserver<AppendEntriesRequestProto> heartbeat; + private volatile boolean running = true; StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler handler, boolean separateHeartbeat) { this.appendLog = client.appendEntries(handler, false); this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): null; } - void onNext(AppendEntriesRequestProto proto) { - if (heartbeat != null && proto.getEntriesCount() == 0) { - heartbeat.onNext(proto); + void onNext(AppendEntriesRequestProto proto) + throws InterruptedIOException { + CallStreamObserver<AppendEntriesRequestProto> stream; + boolean isHeartBeat = heartbeat != null && proto.getEntriesCount() == 0; + if (isHeartBeat) { + stream = heartbeat; } else { - appendLog.onNext(proto); + stream = appendLog; } + // stall for stream to be ready. + while (!stream.isReady() && running) { + sleep(DEFAULT_WAIT_FOR_READY_MS, isHeartBeat); + } + stream.onNext(proto); + } + + void stop() { + running = false; } void onCompleted() { @@ -295,37 +312,38 @@ public class GrpcLogAppender extends LogAppenderBase { final long waitMs = getMinWaitTimeMs(); if (waitMs > 0) { - try { - Thread.sleep(waitMs); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw IOUtils.toInterruptedIOException( - "Interrupted appendLog, heartbeat? " + heartbeat, e); - } + sleep(waitMs, heartbeat); } if (isRunning()) { sendRequest(request, pending); } } - private void sendRequest(AppendEntriesRequest request, AppendEntriesRequestProto proto) { + private static void sleep(long waitMs, boolean heartbeat) + throws InterruptedIOException { + try { + Thread.sleep(waitMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw IOUtils.toInterruptedIOException( + "Interrupted appendLog, heartbeat? " + heartbeat, e); + } + } + + private void sendRequest(AppendEntriesRequest request, + AppendEntriesRequestProto proto) throws InterruptedIOException { CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST, getServer().getId(), null, proto); - request.startRequestTimer(); resetHeartbeatTrigger(); - final boolean sent = Optional.ofNullable(appendLogRequestObserver) - .map(observer -> { - observer.onNext(proto); - return true; - }).isPresent(); - if (sent) { + StreamObservers observers = appendLogRequestObserver; + if (observers != null) { + request.startRequestTimer(); + observers.onNext(proto); getFollower().updateLastRpcSendTime(request.isHeartbeat()); scheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()), LOG, () -> "Timeout check failed for append entry request: " + request); - } else { - request.stopRequestTimer(); } } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index 34f014ebc..2eb73f6aa 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -26,6 +26,7 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts; import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType; import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; +import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc; @@ -128,12 +129,12 @@ public class GrpcServerProtocolClient implements Closeable { .readIndex(request, s); } - StreamObserver<AppendEntriesRequestProto> appendEntries( + CallStreamObserver<AppendEntriesRequestProto> appendEntries( StreamObserver<AppendEntriesReplyProto> responseHandler, boolean isHeartbeat) { if (isHeartbeat && useSeparateHBChannel) { - return hbAsyncStub.appendEntries(responseHandler); + return (CallStreamObserver<AppendEntriesRequestProto>) hbAsyncStub.appendEntries(responseHandler); } else { - return asyncStub.appendEntries(responseHandler); + return (CallStreamObserver<AppendEntriesRequestProto>) asyncStub.appendEntries(responseHandler); } }
