This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 16ba152c7 RATIS-1674. In GrpcLogAppender, disable retry and add
minWait. (#752)
16ba152c7 is described below
commit 16ba152c7e45a0c3b3c96e3d400837408c69e921
Author: William Song <[email protected]>
AuthorDate: Thu Sep 29 23:14:50 2022 +0800
RATIS-1674. In GrpcLogAppender, disable retry and add minWait. (#752)
---
.../java/org/apache/ratis/grpc/server/GrpcLogAppender.java | 14 ++++++++++++--
.../apache/ratis/grpc/server/GrpcServerProtocolClient.java | 1 +
.../java/org/apache/ratis/server/RaftServerConfigKeys.java | 10 ++++++++++
3 files changed, 23 insertions(+), 2 deletions(-)
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 66cc6653c..6e57891ea 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* A new log appender implementation using grpc bi-directional stream API.
@@ -72,6 +73,9 @@ public class GrpcLogAppender extends LogAppenderBase {
private final TimeDuration requestTimeoutDuration;
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+ private final long waitTimeMinMs;
+ private final AtomicReference<Timestamp> lastAppendEntries;
+
private volatile StreamObservers appendLogRequestObserver;
private final boolean useSeparateHBChannel;
@@ -91,6 +95,10 @@ public class GrpcLogAppender extends LogAppenderBase {
this.installSnapshotEnabled =
RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
this.useSeparateHBChannel =
GrpcConfigKeys.Server.heartbeatChannel(properties);
+ final TimeDuration waitTimeMin =
RaftServerConfigKeys.Log.Appender.waitTimeMin(properties);
+ this.waitTimeMinMs = waitTimeMin.toLong(TimeUnit.MILLISECONDS);
+ this.lastAppendEntries = new
AtomicReference<>(Timestamp.currentTime().addTime(waitTimeMin.negate()));
+
grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString());
grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(),
pendingRequests::logRequestsSize);
@@ -172,9 +180,10 @@ public class GrpcLogAppender extends LogAppenderBase {
// For normal nodes, new entries should be sent ASAP
// however for slow followers (especially when the follower is down),
// keep sending without any wait time only ends up in high CPU load
- return 0L;
+ final long min = waitTimeMinMs - lastAppendEntries.get().elapsedTimeMs();
+ return Math.max(0L, min);
}
- return Math.min(10L, getHeartbeatWaitTimeMs());
+ return Math.min(waitTimeMinMs, getHeartbeatWaitTimeMs());
}
private boolean isSlowFollower() {
@@ -284,6 +293,7 @@ public class GrpcLogAppender extends LogAppenderBase {
.map(observer -> {
request.startRequestTimer();
observer.onNext(proto);
+ lastAppendEntries.set(Timestamp.currentTime());
return true;
}).isPresent();
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index a8ee5f30d..1494dd594 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -102,6 +102,7 @@ public class GrpcServerProtocolClient implements Closeable {
} else {
channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
}
+ channelBuilder.disableRetry();
return channelBuilder.flowControlWindow(flowControlWindow).build();
}
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 47e50a048..813717f2e 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -543,6 +543,16 @@ public interface RaftServerConfigKeys {
static void setInstallSnapshotEnabled(RaftProperties properties, boolean
shouldInstallSnapshot) {
setBoolean(properties::setBoolean, INSTALL_SNAPSHOT_ENABLED_KEY,
shouldInstallSnapshot);
}
+
+ String WAIT_TIME_MIN_KEY = PREFIX + ".wait-time.min";
+ TimeDuration WAIT_TIME_MIN_DEFAULT = TimeDuration.valueOf(10,
TimeUnit.MILLISECONDS);
+ static TimeDuration waitTimeMin(RaftProperties properties) {
+ return
getTimeDuration(properties.getTimeDuration(WAIT_TIME_MIN_DEFAULT.getUnit()),
+ WAIT_TIME_MIN_KEY, WAIT_TIME_MIN_DEFAULT, getDefaultLog());
+ }
+ static void setWaitTimeMin(RaftProperties properties, TimeDuration
minDuration) {
+ setTimeDuration(properties::setTimeDuration, WAIT_TIME_MIN_KEY,
minDuration);
+ }
}
}