Repository: incubator-ratis Updated Branches: refs/heads/master 7a955ef43 -> 460842246
RATIS-186. Add a Config option to specify the flow control window size in ratis-grpc. Contributed by Mukul Kumar Singh Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/46084224 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/46084224 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/46084224 Branch: refs/heads/master Commit: 46084224605791a34632b03483c5abf656cf6c8f Parents: 7a955ef Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Sun Jan 7 16:06:08 2018 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Sun Jan 7 16:06:08 2018 +0800 ---------------------------------------------------------------------- .../main/java/org/apache/ratis/grpc/GrpcConfigKeys.java | 11 +++++++++++ .../main/java/org/apache/ratis/grpc/RaftGRpcService.java | 11 ++++++++--- .../org/apache/ratis/grpc/client/AppendStreamer.java | 3 ++- .../java/org/apache/ratis/grpc/client/GrpcClientRpc.java | 3 ++- .../ratis/grpc/client/RaftClientProtocolClient.java | 11 +++++++---- .../ratis/grpc/client/RaftClientProtocolProxy.java | 6 ++++-- .../ratis/grpc/server/RaftServerProtocolClient.java | 9 +++++---- 7 files changed, 39 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index 7be7e8f..c1cc33a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -104,6 +104,17 @@ public interface GrpcConfigKeys { static void setMessageSizeMax(RaftProperties properties, SizeInBytes maxMessageSize) { setSizeInBytes(properties::set, MESSAGE_SIZE_MAX_KEY, maxMessageSize); } + + String FLOW_CONTROL_WINDOW_KEY = PREFIX + ".flow.control.window"; + SizeInBytes FLOW_CONTROL_WINDOW_DEFAULT = SizeInBytes.valueOf("1MB"); + static SizeInBytes flowControlWindow(RaftProperties properties) { + return getSizeInBytes(properties::getSizeInBytes, + FLOW_CONTROL_WINDOW_KEY, FLOW_CONTROL_WINDOW_DEFAULT); + } + static void setFlowControlWindow(RaftProperties properties, SizeInBytes flowControlWindowSize) { + setSizeInBytes(properties::set, FLOW_CONTROL_WINDOW_KEY, flowControlWindowSize); + } + static void main(String[] args) { printAll(GrpcConfigKeys.class); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index e4701cc..ae7a977 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -74,25 +74,30 @@ public class RaftGRpcService implements RaftServerRpc { private final Map<RaftPeerId, RaftServerProtocolClient> peers = Collections.synchronizedMap(new HashMap<>()); private final Supplier<RaftPeerId> idSupplier; + private final int flowControlWindow; private RaftGRpcService(RaftServer server) { this(server, GrpcConfigKeys.Server.port(server.getProperties()), GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt(), GrpcConfigKeys.messageSizeMax(server.getProperties()), - RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties())); + RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()), + GrpcConfigKeys.flowControlWindow(server.getProperties())); } private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize, - SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize) { + SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize, + SizeInBytes flowControlWindowSize) { if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) { throw new IllegalArgumentException("Illegal configuration: " + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax); } + this.flowControlWindow = flowControlWindowSize.getSizeInt(); ServerBuilder serverBuilder = ServerBuilder.forPort(port); idSupplier = raftServer::getId; server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize) + .flowControlWindow(flowControlWindow) .addService(new RaftServerProtocolService(idSupplier, raftServer)) .addService(new RaftClientProtocolService(idSupplier, raftServer)) .addService(new AdminProtocolService(raftServer)) @@ -173,7 +178,7 @@ public class RaftGRpcService implements RaftServerRpc { public void addPeers(Iterable<RaftPeer> newPeers) { for (RaftPeer p : newPeers) { if (!peers.containsKey(p.getId())) { - peers.put(p.getId(), new RaftServerProtocolClient(p)); + peers.put(p.getId(), new RaftServerProtocolClient(p, flowControlWindow)); } } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index 810121b..281c838 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -97,7 +97,8 @@ public class AppendStreamer implements Closeable { this.peers = group.getPeers().stream().collect( Collectors.toMap(RaftPeer::getId, Function.identity())); proxyMap = new PeerProxyMap<>(clientId.toString(), - raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new)); + raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new, + GrpcConfigKeys.flowControlWindow(prop))); proxyMap.addPeers(group.getPeers()); refreshLeaderProxy(leaderId, null); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index ea1f204..508260e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -48,7 +48,8 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie private final int maxMessageSize; public GrpcClientRpc(ClientId clientId, RaftProperties properties) { - super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p))); + super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p, + GrpcConfigKeys.flowControlWindow(properties)))); this.clientId = clientId; maxMessageSize = GrpcConfigKeys.messageSizeMax(properties).getSizeInt(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java index 0b05475..601232a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java @@ -21,8 +21,8 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.io.grpc.ManagedChannel; -import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; +import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc; @@ -33,6 +33,7 @@ import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClie import org.apache.ratis.util.CheckedSupplier; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,11 +57,13 @@ public class RaftClientProtocolClient implements Closeable { private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>(); - public RaftClientProtocolClient(ClientId id, RaftPeer target) { + public RaftClientProtocolClient(ClientId id, RaftPeer target, + SizeInBytes flowControlWindow) { this.name = JavaUtils.memoize(() -> id + "->" + target.getId()); this.target = target; - channel = ManagedChannelBuilder.forTarget(target.getAddress()) - .usePlaintext(true).build(); + channel = NettyChannelBuilder.forTarget(target.getAddress()) + .usePlaintext(true).flowControlWindow(flowControlWindow.getSizeInt()) + .build(); blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel); asyncStub = RaftClientProtocolServiceGrpc.newStub(channel); adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java index 297fe26..d07add7 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java @@ -22,6 +22,7 @@ import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.util.SizeInBytes; import java.io.Closeable; import java.io.IOException; @@ -34,8 +35,9 @@ public class RaftClientProtocolProxy implements Closeable { public RaftClientProtocolProxy( ClientId clientId, RaftPeer target, - Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation) { - proxy = new RaftClientProtocolClient(clientId, target); + Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation, + SizeInBytes flowControlWindow) { + proxy = new RaftClientProtocolClient(clientId, target, flowControlWindow); this.responseHandlerCreation = responseHandlerCreation; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java index 53df10d..53f962a 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java @@ -18,7 +18,7 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.shaded.io.grpc.ManagedChannel; -import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder; +import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc; @@ -35,9 +35,10 @@ public class RaftServerProtocolClient { private final RaftServerProtocolServiceBlockingStub blockingStub; private final RaftServerProtocolServiceStub asyncStub; - public RaftServerProtocolClient(RaftPeer target) { - channel = ManagedChannelBuilder.forTarget(target.getAddress()) - .usePlaintext(true).build(); + public RaftServerProtocolClient(RaftPeer target, int flowControlWindow) { + channel = NettyChannelBuilder.forTarget(target.getAddress()) + .usePlaintext(true).flowControlWindow(flowControlWindow) + .build(); blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel); asyncStub = RaftServerProtocolServiceGrpc.newStub(channel); }
