This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 3e3fd599da110722958b490463ed21efe2dca625 Author: William Song <[email protected]> AuthorDate: Thu Sep 29 23:14:50 2022 +0800 RATIS-1674. In GrpcLogAppender, disable retry and add minWait. (#752) (cherry picked from commit 16ba152c7e45a0c3b3c96e3d400837408c69e921) --- .../java/org/apache/ratis/grpc/server/GrpcLogAppender.java | 14 ++++++++++++-- .../apache/ratis/grpc/server/GrpcServerProtocolClient.java | 1 + .../java/org/apache/ratis/server/RaftServerConfigKeys.java | 10 ++++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index e87edac5e..125dd7dfa 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -48,6 +48,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import com.codahale.metrics.Timer; @@ -73,6 +74,9 @@ public class GrpcLogAppender extends LogAppenderBase { private final TimeDuration requestTimeoutDuration; private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); + private final long waitTimeMinMs; + private final AtomicReference<Timestamp> lastAppendEntries; + private volatile StreamObservers appendLogRequestObserver; private final boolean useSeparateHBChannel; @@ -92,6 +96,10 @@ public class GrpcLogAppender extends LogAppenderBase { this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties); this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties); + final TimeDuration waitTimeMin = RaftServerConfigKeys.Log.Appender.waitTimeMin(properties); + this.waitTimeMinMs = waitTimeMin.toLong(TimeUnit.MILLISECONDS); + this.lastAppendEntries = new AtomicReference<>(Timestamp.currentTime().addTime(waitTimeMin.negate())); + grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString()); grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize); @@ -173,9 +181,10 @@ public class GrpcLogAppender extends LogAppenderBase { // For normal nodes, new entries should be sent ASAP // however for slow followers (especially when the follower is down), // keep sending without any wait time only ends up in high CPU load - return 0L; + final long min = waitTimeMinMs - lastAppendEntries.get().elapsedTimeMs(); + return Math.max(0L, min); } - return Math.min(10L, getHeartbeatWaitTimeMs()); + return Math.min(waitTimeMinMs, getHeartbeatWaitTimeMs()); } private boolean isSlowFollower() { @@ -285,6 +294,7 @@ public class GrpcLogAppender extends LogAppenderBase { final boolean sent = Optional.ofNullable(appendLogRequestObserver) .map(observer -> { observer.onNext(proto); + lastAppendEntries.set(Timestamp.currentTime()); return true; }).isPresent(); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index 6f5d06c2e..d9aefbf0d 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -102,6 +102,7 @@ public class GrpcServerProtocolClient implements Closeable { } else { channelBuilder.negotiationType(NegotiationType.PLAINTEXT); } + channelBuilder.disableRetry(); return channelBuilder.flowControlWindow(flowControlWindow).build(); } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index e24631378..f75ddae46 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -507,6 +507,16 @@ public interface RaftServerConfigKeys { static void setInstallSnapshotEnabled(RaftProperties properties, boolean shouldInstallSnapshot) { setBoolean(properties::setBoolean, INSTALL_SNAPSHOT_ENABLED_KEY, shouldInstallSnapshot); } + + String WAIT_TIME_MIN_KEY = PREFIX + ".wait-time.min"; + TimeDuration WAIT_TIME_MIN_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS); + static TimeDuration waitTimeMin(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(WAIT_TIME_MIN_DEFAULT.getUnit()), + WAIT_TIME_MIN_KEY, WAIT_TIME_MIN_DEFAULT, getDefaultLog()); + } + static void setWaitTimeMin(RaftProperties properties, TimeDuration minDuration) { + setTimeDuration(properties::setTimeDuration, WAIT_TIME_MIN_KEY, minDuration); + } } }
