This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f3a5d4de6 RATIS-1868. Handling Netty back pressure when streaming
ratis log (#900)
f3a5d4de6 is described below
commit f3a5d4de627ec7d4884ad4ecaeb7347fdf5a41ef
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Aug 9 11:25:45 2023 -0700
RATIS-1868. Handling Netty back pressure when streaming ratis log (#900)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 64 ++++++++++++++--------
.../grpc/server/GrpcServerProtocolClient.java | 7 ++-
2 files changed, 46 insertions(+), 25 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 3544f3be1..d3bf28dee 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -34,6 +34,7 @@ import org.apache.ratis.server.leader.LogAppenderBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.util.ServerStringUtils;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
@@ -45,6 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -121,6 +123,7 @@ public class GrpcLogAppender extends LogAppenderBase {
private void resetClient(AppendEntriesRequest request, boolean onError) {
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
getClient().resetConnectBackoff();
+ appendLogRequestObserver.stop();
appendLogRequestObserver = null;
firstResponseReceived = false;
// clear the pending requests queue and reset the next index of follower
@@ -241,20 +244,34 @@ public class GrpcLogAppender extends LogAppenderBase {
}
static class StreamObservers {
- private final StreamObserver<AppendEntriesRequestProto> appendLog;
- private final StreamObserver<AppendEntriesRequestProto> heartbeat;
+ public static final int DEFAULT_WAIT_FOR_READY_MS = 10;
+ private final CallStreamObserver<AppendEntriesRequestProto> appendLog;
+ private final CallStreamObserver<AppendEntriesRequestProto> heartbeat;
+ private volatile boolean running = true;
StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler
handler, boolean separateHeartbeat) {
this.appendLog = client.appendEntries(handler, false);
this.heartbeat = separateHeartbeat? client.appendEntries(handler, true):
null;
}
- void onNext(AppendEntriesRequestProto proto) {
- if (heartbeat != null && proto.getEntriesCount() == 0) {
- heartbeat.onNext(proto);
+ void onNext(AppendEntriesRequestProto proto)
+ throws InterruptedIOException {
+ CallStreamObserver<AppendEntriesRequestProto> stream;
+ boolean isHeartBeat = heartbeat != null && proto.getEntriesCount() == 0;
+ if (isHeartBeat) {
+ stream = heartbeat;
} else {
- appendLog.onNext(proto);
+ stream = appendLog;
}
+ // stall for stream to be ready.
+ while (!stream.isReady() && running) {
+ sleep(DEFAULT_WAIT_FOR_READY_MS, isHeartBeat);
+ }
+ stream.onNext(proto);
+ }
+
+ void stop() {
+ running = false;
}
void onCompleted() {
@@ -294,31 +311,34 @@ public class GrpcLogAppender extends LogAppenderBase {
final long waitMs = getMinWaitTimeMs();
if (waitMs > 0) {
- try {
- Thread.sleep(waitMs);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw IOUtils.toInterruptedIOException(
- "Interrupted appendLog, heartbeat? " + heartbeat, e);
- }
+ sleep(waitMs, heartbeat);
}
if (isRunning()) {
sendRequest(request, pending);
}
}
- private void sendRequest(AppendEntriesRequest request,
AppendEntriesRequestProto proto) {
+ private static void sleep(long waitMs, boolean heartbeat)
+ throws InterruptedIOException {
+ try {
+ Thread.sleep(waitMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw IOUtils.toInterruptedIOException(
+ "Interrupted appendLog, heartbeat? " + heartbeat, e);
+ }
+ }
+
+ private void sendRequest(AppendEntriesRequest request,
+ AppendEntriesRequestProto proto) throws InterruptedIOException {
CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
getServer().getId(), null, proto);
resetHeartbeatTrigger();
- final boolean sent = Optional.ofNullable(appendLogRequestObserver)
- .map(observer -> {
- request.startRequestTimer();
- observer.onNext(proto);
- return true;
- }).isPresent();
-
- if (sent) {
+
+ StreamObservers observers = appendLogRequestObserver;
+ if (observers != null) {
+ request.startRequestTimer();
+ observers.onNext(proto);
getFollower().updateLastRpcSendTime(request.isHeartbeat());
scheduler.onTimeout(requestTimeoutDuration,
() -> timeoutAppendRequest(request.getCallId(),
request.isHeartbeat()),
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index 34f014ebc..2eb73f6aa 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -26,6 +26,7 @@ import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc;
@@ -128,12 +129,12 @@ public class GrpcServerProtocolClient implements
Closeable {
.readIndex(request, s);
}
- StreamObserver<AppendEntriesRequestProto> appendEntries(
+ CallStreamObserver<AppendEntriesRequestProto> appendEntries(
StreamObserver<AppendEntriesReplyProto> responseHandler, boolean
isHeartbeat) {
if (isHeartbeat && useSeparateHBChannel) {
- return hbAsyncStub.appendEntries(responseHandler);
+ return (CallStreamObserver<AppendEntriesRequestProto>)
hbAsyncStub.appendEntries(responseHandler);
} else {
- return asyncStub.appendEntries(responseHandler);
+ return (CallStreamObserver<AppendEntriesRequestProto>)
asyncStub.appendEntries(responseHandler);
}
}