Repository: incubator-ratis
Updated Branches:
  refs/heads/master 7f1794316 -> 564e89ee4


RATIS-133. Raft gRPC client should check proto size before sending a message.  
Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/564e89ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/564e89ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/564e89ee

Branch: refs/heads/master
Commit: 564e89ee4285e5dc2c8b7fa48a66f63a692a4cce
Parents: 7f17943
Author: Tsz-Wo Nicholas Sze <[email protected]>
Authored: Sat Nov 11 14:18:49 2017 -0800
Committer: Tsz-Wo Nicholas Sze <[email protected]>
Committed: Sat Nov 11 14:18:49 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/client/ClientFactory.java   |  3 ++-
 .../main/java/org/apache/ratis/client/RaftClient.java |  2 +-
 .../java/org/apache/ratis/grpc/GrpcConfigKeys.java    | 14 +++++++-------
 .../main/java/org/apache/ratis/grpc/GrpcFactory.java  |  5 +++--
 .../java/org/apache/ratis/grpc/RaftGRpcService.java   |  2 +-
 .../org/apache/ratis/grpc/client/AppendStreamer.java  |  6 ++++++
 .../org/apache/ratis/grpc/client/GrpcClientRpc.java   | 10 +++++++++-
 .../org/apache/ratis/hadooprpc/HadoopFactory.java     |  3 ++-
 .../java/org/apache/ratis/netty/NettyFactory.java     |  3 ++-
 .../apache/ratis/server/simulation/SimulatedRpc.java  |  3 ++-
 10 files changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
----------------------------------------------------------------------
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java 
b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
index 0aa4827..08f98f6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.client;
 
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.rpc.RpcFactory;
 
@@ -32,5 +33,5 @@ public interface ClientFactory extends RpcFactory {
   }
 
   /** Create a {@link RaftClientRpc}. */
-  RaftClientRpc newRaftClientRpc(ClientId clientId);
+  RaftClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properties);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java 
b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 4251931..6e1e5c1 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -88,7 +88,7 @@ public interface RaftClient extends Closeable {
         if (clientRpc == null) {
           final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
           final ClientFactory factory = 
ClientFactory.cast(rpcType.newFactory(parameters));
-          clientRpc = factory.newRaftClientRpc(clientId);
+          clientRpc = factory.newRaftClientRpc(clientId, properties);
         }
       }
       return ClientImplUtils.newRaftClient(clientId,

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
index 429e83d..d3276dc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -40,13 +40,6 @@ public interface GrpcConfigKeys {
       setInt(properties::setInt, PORT_KEY, port);
     }
 
-    String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
-    SizeInBytes MESSAGE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("64MB");
-    static SizeInBytes messageSizeMax(RaftProperties properties) {
-      return getSizeInBytes(properties::getSizeInBytes,
-          MESSAGE_SIZE_MAX_KEY, MESSAGE_SIZE_MAX_DEFAULT);
-    }
-
     String LEADER_OUTSTANDING_APPENDS_MAX_KEY = PREFIX + 
".leader.outstanding.appends.max";
     int LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT = 128;
     static int leaderOutstandingAppendsMax(RaftProperties properties) {
@@ -90,6 +83,13 @@ public interface GrpcConfigKeys {
     }
   }
 
+  String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
+  SizeInBytes MESSAGE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("64MB");
+  static SizeInBytes messageSizeMax(RaftProperties properties) {
+    return getSizeInBytes(properties::getSizeInBytes,
+        MESSAGE_SIZE_MAX_KEY, MESSAGE_SIZE_MAX_DEFAULT);
+  }
+
   static void main(String[] args) {
     printAll(GrpcConfigKeys.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 3d00952..4f2612e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc;
 
 import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.client.GrpcClientRpc;
 import org.apache.ratis.grpc.server.GRpcLogAppender;
 import org.apache.ratis.protocol.ClientId;
@@ -48,7 +49,7 @@ public class GrpcFactory implements ServerFactory, 
ClientFactory {
   }
 
   @Override
-  public GrpcClientRpc newRaftClientRpc(ClientId clientId) {
-    return new GrpcClientRpc(clientId);
+  public GrpcClientRpc newRaftClientRpc(ClientId clientId, RaftProperties 
properties) {
+    return new GrpcClientRpc(clientId, properties);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index 853830e..e0af140 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -76,7 +76,7 @@ public class RaftGRpcService implements RaftServerRpc {
   private RaftGRpcService(RaftServer server) {
     this(server,
         GrpcConfigKeys.Server.port(server.getProperties()),
-        
GrpcConfigKeys.Server.messageSizeMax(server.getProperties()).getSizeInt());
+        GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt());
   }
   private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) 
{
     ServerBuilder serverBuilder = ServerBuilder.forPort(port);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 1eba386..c8c30fa 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -71,6 +71,7 @@ public class AppendStreamer implements Closeable {
   private final Deque<RaftClientRequestProto> dataQueue;
   private final Deque<RaftClientRequestProto> ackQueue;
   private final int maxPendingNum;
+  private final int maxMessageSize;
 
   private final PeerProxyMap<RaftClientProtocolProxy> proxyMap;
   private final Map<RaftPeerId, RaftPeer> peers;
@@ -87,6 +88,7 @@ public class AppendStreamer implements Closeable {
       RaftPeerId leaderId, ClientId clientId) {
     this.clientId = clientId;
     maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop);
+    maxMessageSize = GrpcConfigKeys.messageSizeMax(prop).getSizeInt();
     dataQueue = new ConcurrentLinkedDeque<>();
     ackQueue = new ConcurrentLinkedDeque<>();
     exceptionAndRetry = new ExceptionAndRetry(prop);
@@ -155,6 +157,10 @@ public class AppendStreamer implements Closeable {
       // wrap the current buffer into a RaftClientRequestProto
       final RaftClientRequestProto request = 
ClientProtoUtils.toRaftClientRequestProto(
           clientId, leaderId, groupId, seqNum, content, false);
+      if (request.getSerializedSize() > maxMessageSize) {
+        throw new IOException("msg size:" + request.getSerializedSize() +
+            " exceeds maximum:" + maxMessageSize);
+      }
       dataQueue.offer(request);
       this.notifyAll();
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index d7ae284..3084289 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -19,6 +19,8 @@ package org.apache.ratis.grpc.client;
 
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
@@ -41,9 +43,11 @@ import static 
org.apache.ratis.client.impl.ClientProtoUtils.*;
 
 public class GrpcClientRpc extends 
RaftClientRpcWithProxy<RaftClientProtocolClient> {
   public static final Logger LOG = 
LoggerFactory.getLogger(GrpcClientRpc.class);
+  private final int maxMessageSize;
 
-  public GrpcClientRpc(ClientId clientId) {
+  public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
     super(new PeerProxyMap<>(clientId.toString(), 
RaftClientProtocolClient::new));
+    maxMessageSize = GrpcConfigKeys.messageSizeMax(properties).getSizeInt();
   }
 
   @Override
@@ -66,6 +70,10 @@ public class GrpcClientRpc extends 
RaftClientRpcWithProxy<RaftClientProtocolClie
           proxy.serverInformation(proto));
     } else {
       RaftClientRequestProto requestProto = toRaftClientRequestProto(request);
+      if (requestProto.getSerializedSize() > maxMessageSize) {
+        throw new IOException("msg size:" + requestProto.getSerializedSize() +
+            " exceeds maximum:" + maxMessageSize);
+      }
       CompletableFuture<RaftClientReplyProto> replyFuture =
           new CompletableFuture<>();
       final StreamObserver<RaftClientRequestProto> requestObserver =

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
index 8f8bc99..cf4c5fd 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -20,6 +20,7 @@ package org.apache.ratis.hadooprpc;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.hadooprpc.client.HadoopClientRpc;
 import org.apache.ratis.hadooprpc.server.HadoopRpcService;
 import org.apache.ratis.protocol.ClientId;
@@ -62,7 +63,7 @@ public class HadoopFactory extends ServerFactory.BaseFactory 
implements ClientFa
   }
 
   @Override
-  public HadoopClientRpc newRaftClientRpc(ClientId clientId) {
+  public HadoopClientRpc newRaftClientRpc(ClientId clientId, RaftProperties 
properties) {
     return new HadoopClientRpc(clientId, getConf());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index f7010c9..5e9c2e9 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -19,6 +19,7 @@ package org.apache.ratis.netty;
 
 import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.netty.client.NettyClientRpc;
 import org.apache.ratis.netty.server.NettyRpcService;
 import org.apache.ratis.protocol.ClientId;
@@ -40,7 +41,7 @@ public class NettyFactory extends ServerFactory.BaseFactory 
implements ClientFac
   }
 
   @Override
-  public NettyClientRpc newRaftClientRpc(ClientId clientId) {
+  public NettyClientRpc newRaftClientRpc(ClientId clientId, RaftProperties 
properties) {
     return new NettyClientRpc(clientId);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
index b6e0b1f..bbf0a3e 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.simulation;
 
 import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.RaftServer;
@@ -70,7 +71,7 @@ class SimulatedRpc implements RpcType {
     }
 
     @Override
-    public SimulatedClientRpc newRaftClientRpc(ClientId clientId) {
+    public SimulatedClientRpc newRaftClientRpc(ClientId clientId, 
RaftProperties properties) {
       return Objects.requireNonNull(client2serverRequestReply);
     }
 

Reply via email to