Repository: incubator-ratis
Updated Branches:
  refs/heads/master 7a955ef43 -> 460842246


RATIS-186. Add a Config option to specify the flow control window size in 
ratis-grpc.  Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/46084224
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/46084224
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/46084224

Branch: refs/heads/master
Commit: 46084224605791a34632b03483c5abf656cf6c8f
Parents: 7a955ef
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Sun Jan 7 16:06:08 2018 +0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Sun Jan 7 16:06:08 2018 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/ratis/grpc/GrpcConfigKeys.java  | 11 +++++++++++
 .../main/java/org/apache/ratis/grpc/RaftGRpcService.java | 11 ++++++++---
 .../org/apache/ratis/grpc/client/AppendStreamer.java     |  3 ++-
 .../java/org/apache/ratis/grpc/client/GrpcClientRpc.java |  3 ++-
 .../ratis/grpc/client/RaftClientProtocolClient.java      | 11 +++++++----
 .../ratis/grpc/client/RaftClientProtocolProxy.java       |  6 ++++--
 .../ratis/grpc/server/RaftServerProtocolClient.java      |  9 +++++----
 7 files changed, 39 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index 7be7e8f..c1cc33a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -104,6 +104,17 @@ public interface GrpcConfigKeys {
  static void setMessageSizeMax(RaftProperties properties, SizeInBytes maxMessageSize) {
     setSizeInBytes(properties::set, MESSAGE_SIZE_MAX_KEY, maxMessageSize);
   }
+
+  String FLOW_CONTROL_WINDOW_KEY = PREFIX + ".flow.control.window";
+  SizeInBytes FLOW_CONTROL_WINDOW_DEFAULT = SizeInBytes.valueOf("1MB");
+  static SizeInBytes flowControlWindow(RaftProperties properties) {
+    return getSizeInBytes(properties::getSizeInBytes,
+        FLOW_CONTROL_WINDOW_KEY, FLOW_CONTROL_WINDOW_DEFAULT);
+  }
+  static void setFlowControlWindow(RaftProperties properties, SizeInBytes flowControlWindowSize) {
+    setSizeInBytes(properties::set, FLOW_CONTROL_WINDOW_KEY, flowControlWindowSize);
+  }
+
   static void main(String[] args) {
     printAll(GrpcConfigKeys.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index e4701cc..ae7a977 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -74,25 +74,30 @@ public class RaftGRpcService implements RaftServerRpc {
   private final Map<RaftPeerId, RaftServerProtocolClient> peers =
       Collections.synchronizedMap(new HashMap<>());
   private final Supplier<RaftPeerId> idSupplier;
+  private final int flowControlWindow;
 
   private RaftGRpcService(RaftServer server) {
     this(server,
         GrpcConfigKeys.Server.port(server.getProperties()),
         GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt(),
         GrpcConfigKeys.messageSizeMax(server.getProperties()),
-        RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()));
+        RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()),
+        GrpcConfigKeys.flowControlWindow(server.getProperties()));
   }
   private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize,
-      SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize) {
+      SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
+      SizeInBytes flowControlWindowSize) {
     if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
       throw new IllegalArgumentException("Illegal configuration: "
           + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
           + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
     }
+    this.flowControlWindow = flowControlWindowSize.getSizeInt();
 
     ServerBuilder serverBuilder = ServerBuilder.forPort(port);
     idSupplier = raftServer::getId;
     server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize)
+        .flowControlWindow(flowControlWindow)
         .addService(new RaftServerProtocolService(idSupplier, raftServer))
         .addService(new RaftClientProtocolService(idSupplier, raftServer))
         .addService(new AdminProtocolService(raftServer))
@@ -173,7 +178,7 @@ public class RaftGRpcService implements RaftServerRpc {
   public void addPeers(Iterable<RaftPeer> newPeers) {
     for (RaftPeer p : newPeers) {
       if (!peers.containsKey(p.getId())) {
-        peers.put(p.getId(), new RaftServerProtocolClient(p));
+        peers.put(p.getId(), new RaftServerProtocolClient(p, flowControlWindow));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 810121b..281c838 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -97,7 +97,8 @@ public class AppendStreamer implements Closeable {
     this.peers = group.getPeers().stream().collect(
         Collectors.toMap(RaftPeer::getId, Function.identity()));
     proxyMap = new PeerProxyMap<>(clientId.toString(),
-        raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new));
+        raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new,
+            GrpcConfigKeys.flowControlWindow(prop)));
     proxyMap.addPeers(group.getPeers());
     refreshLeaderProxy(leaderId, null);
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index ea1f204..508260e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -48,7 +48,8 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
   private final int maxMessageSize;
 
   public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
-    super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p)));
+    super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p,
+        GrpcConfigKeys.flowControlWindow(properties))));
     this.clientId = clientId;
     maxMessageSize = GrpcConfigKeys.messageSizeMax(properties).getSizeInt();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index 0b05475..601232a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -21,8 +21,8 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.io.grpc.ManagedChannel;
-import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc;
@@ -33,6 +33,7 @@ import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClie
 import org.apache.ratis.util.CheckedSupplier;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,11 +57,13 @@ public class RaftClientProtocolClient implements Closeable {
 
   private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
 
-  public RaftClientProtocolClient(ClientId id, RaftPeer target) {
+  public RaftClientProtocolClient(ClientId id, RaftPeer target,
+      SizeInBytes flowControlWindow) {
     this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
     this.target = target;
-    channel = ManagedChannelBuilder.forTarget(target.getAddress())
-        .usePlaintext(true).build();
+    channel = NettyChannelBuilder.forTarget(target.getAddress())
+        .usePlaintext(true).flowControlWindow(flowControlWindow.getSizeInt())
+        .build();
     blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
     asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
     adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
index 297fe26..d07add7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
@@ -22,6 +22,7 @@ import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.SizeInBytes;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -34,8 +35,9 @@ public class RaftClientProtocolProxy implements Closeable {
 
   public RaftClientProtocolProxy(
       ClientId clientId, RaftPeer target,
-      Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation) {
-    proxy = new RaftClientProtocolClient(clientId, target);
+      Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation,
+      SizeInBytes flowControlWindow) {
+    proxy = new RaftClientProtocolClient(clientId, target, flowControlWindow);
     this.responseHandlerCreation = responseHandlerCreation;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46084224/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
index 53df10d..53f962a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
@@ -18,7 +18,7 @@
 package org.apache.ratis.grpc.server;
 
 import org.apache.ratis.shaded.io.grpc.ManagedChannel;
-import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc;
@@ -35,9 +35,10 @@ public class RaftServerProtocolClient {
   private final RaftServerProtocolServiceBlockingStub blockingStub;
   private final RaftServerProtocolServiceStub asyncStub;
 
-  public RaftServerProtocolClient(RaftPeer target) {
-    channel = ManagedChannelBuilder.forTarget(target.getAddress())
-        .usePlaintext(true).build();
+  public RaftServerProtocolClient(RaftPeer target, int flowControlWindow) {
+    channel = NettyChannelBuilder.forTarget(target.getAddress())
+        .usePlaintext(true).flowControlWindow(flowControlWindow)
+        .build();
     blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
     asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
   }

Reply via email to