This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_readIndex in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 80617d4bb801a166b364620f429d2b53dd9e207a Author: William Song <[email protected]> AuthorDate: Wed Mar 15 16:47:34 2023 +0800 RATIS-1809. Use separated timeout for GrpcLogAppender's streaming RPC (#848) (cherry picked from commit cf62b4298ad778fbe1dff8478899883aa91a2234) --- .../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 38 +++++++++++++++++++++- .../apache/ratis/grpc/server/GrpcLogAppender.java | 10 ++++-- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index febf54172..48b57fa63 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -19,13 +19,27 @@ package org.apache.ratis.grpc; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.function.Consumer; -import static org.apache.ratis.conf.ConfUtils.*; +import static org.apache.ratis.conf.ConfUtils.get; +import static org.apache.ratis.conf.ConfUtils.getBoolean; +import static org.apache.ratis.conf.ConfUtils.getInt; +import static org.apache.ratis.conf.ConfUtils.getSizeInBytes; +import static org.apache.ratis.conf.ConfUtils.getTimeDuration; +import static org.apache.ratis.conf.ConfUtils.printAll; +import static org.apache.ratis.conf.ConfUtils.requireMax; +import static org.apache.ratis.conf.ConfUtils.requireMin; +import static org.apache.ratis.conf.ConfUtils.set; +import static org.apache.ratis.conf.ConfUtils.setBoolean; +import static org.apache.ratis.conf.ConfUtils.setInt; +import static org.apache.ratis.conf.ConfUtils.setSizeInBytes; +import static org.apache.ratis.conf.ConfUtils.setTimeDuration; public interface GrpcConfigKeys { Logger LOG = LoggerFactory.getLogger(GrpcConfigKeys.class); @@ -234,6 +248,28 @@ public interface GrpcConfigKeys { setInt(properties::setInt, LEADER_OUTSTANDING_APPENDS_MAX_KEY, maxAppend); } + String INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY = PREFIX + ".install_snapshot.request.element-limit"; + int INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_DEFAULT = 8; + static int installSnapshotRequestElementLimit(RaftProperties properties) { + return getInt(properties::getInt, INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY, + INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_DEFAULT, getDefaultLog(), requireMin(0)); + } + static void setInstallSnapshotRequestElementLimit(RaftProperties properties, int elementLimit) { + setInt(properties::setInt, INSTALL_SNAPSHOT_REQUEST_ELEMENT_LIMIT_KEY, elementLimit); + } + + String INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY = PREFIX + ".install_snapshot.request.timeout"; + TimeDuration INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT = RaftServerConfigKeys.Rpc.REQUEST_TIMEOUT_DEFAULT; + static TimeDuration installSnapshotRequestTimeout(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT.getUnit()), + INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY, INSTALL_SNAPSHOT_REQUEST_TIMEOUT_DEFAULT, getDefaultLog()); + } + static void setInstallSnapshotRequestTimeout(RaftProperties properties, + TimeDuration installSnapshotRequestTimeout) { + setTimeDuration(properties::setTimeDuration, + INSTALL_SNAPSHOT_REQUEST_TIMEOUT_KEY, installSnapshotRequestTimeout); + } + String HEARTBEAT_CHANNEL_KEY = PREFIX + ".heartbeat.channel"; boolean HEARTBEAT_CHANNEL_DEFAULT = true; static boolean heartbeatChannel(RaftProperties properties) { diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index c688b66cd..0cf0e663f 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -70,6 +70,8 @@ public class GrpcLogAppender extends LogAppenderBase { private final boolean installSnapshotEnabled; private final TimeDuration requestTimeoutDuration; + private final TimeDuration installSnapshotStreamTimeout; + private final int maxOutstandingInstallSnapshots; private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); private volatile StreamObservers appendLogRequestObserver; @@ -88,6 +90,9 @@ public class GrpcLogAppender extends LogAppenderBase { final RaftProperties properties = server.getRaftServer().getProperties(); this.maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties); this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties); + this.maxOutstandingInstallSnapshots = GrpcConfigKeys.Server.installSnapshotRequestElementLimit(properties); + this.installSnapshotStreamTimeout = GrpcConfigKeys.Server.installSnapshotRequestTimeout(properties) + .multiply(maxOutstandingInstallSnapshots); this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties); this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties); @@ -597,8 +602,9 @@ public class GrpcLogAppender extends LogAppenderBase { StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null; final String requestId = UUID.randomUUID().toString(); try { - snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot", - requestTimeoutDuration, 8, responseHandler); //FIXME: RATIS-1809 + snapshotRequestObserver = getClient().installSnapshot( + getFollower().getName() + "-installSnapshot-" + requestId, + installSnapshotStreamTimeout, maxOutstandingInstallSnapshots, responseHandler); for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) { if (isRunning()) { snapshotRequestObserver.onNext(request);
