Repository: incubator-ratis
Updated Branches:
  refs/heads/master 7f1794316 -> 564e89ee4

RATIS-133. Raft gRPC client should check proto size before sending a message. Contributed by Mukul Kumar Singh Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/564e89ee Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/564e89ee Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/564e89ee Branch: refs/heads/master Commit: 564e89ee4285e5dc2c8b7fa48a66f63a692a4cce Parents: 7f17943 Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Sat Nov 11 14:18:49 2017 -0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Sat Nov 11 14:18:49 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/client/ClientFactory.java | 3 ++- .../main/java/org/apache/ratis/client/RaftClient.java | 2 +- .../java/org/apache/ratis/grpc/GrpcConfigKeys.java | 14 +++++++------- .../main/java/org/apache/ratis/grpc/GrpcFactory.java | 5 +++-- .../java/org/apache/ratis/grpc/RaftGRpcService.java | 2 +- .../org/apache/ratis/grpc/client/AppendStreamer.java | 6 ++++++ .../org/apache/ratis/grpc/client/GrpcClientRpc.java | 10 +++++++++- .../org/apache/ratis/hadooprpc/HadoopFactory.java | 3 ++- .../java/org/apache/ratis/netty/NettyFactory.java | 3 ++- .../apache/ratis/server/simulation/SimulatedRpc.java | 3 ++- 10 files changed, 35 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java index 0aa4827..08f98f6 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java +++ 
b/ratis-client/src/main/java/org/apache/ratis/client/ClientFactory.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.client; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.rpc.RpcFactory; @@ -32,5 +33,5 @@ public interface ClientFactory extends RpcFactory { } /** Create a {@link RaftClientRpc}. */ - RaftClientRpc newRaftClientRpc(ClientId clientId); + RaftClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properties); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index 4251931..6e1e5c1 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -88,7 +88,7 @@ public interface RaftClient extends Closeable { if (clientRpc == null) { final RpcType rpcType = RaftConfigKeys.Rpc.type(properties); final ClientFactory factory = ClientFactory.cast(rpcType.newFactory(parameters)); - clientRpc = factory.newRaftClientRpc(clientId); + clientRpc = factory.newRaftClientRpc(clientId, properties); } } return ClientImplUtils.newRaftClient(clientId, http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index 429e83d..d3276dc 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -40,13 +40,6 @@ public interface 
GrpcConfigKeys { setInt(properties::setInt, PORT_KEY, port); } - String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max"; - SizeInBytes MESSAGE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("64MB"); - static SizeInBytes messageSizeMax(RaftProperties properties) { - return getSizeInBytes(properties::getSizeInBytes, - MESSAGE_SIZE_MAX_KEY, MESSAGE_SIZE_MAX_DEFAULT); - } - String LEADER_OUTSTANDING_APPENDS_MAX_KEY = PREFIX + ".leader.outstanding.appends.max"; int LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT = 128; static int leaderOutstandingAppendsMax(RaftProperties properties) { @@ -90,6 +83,13 @@ public interface GrpcConfigKeys { } } + String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max"; + SizeInBytes MESSAGE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("64MB"); + static SizeInBytes messageSizeMax(RaftProperties properties) { + return getSizeInBytes(properties::getSizeInBytes, + MESSAGE_SIZE_MAX_KEY, MESSAGE_SIZE_MAX_DEFAULT); + } + static void main(String[] args) { printAll(GrpcConfigKeys.class); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java index 3d00952..4f2612e 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java @@ -19,6 +19,7 @@ package org.apache.ratis.grpc; import org.apache.ratis.client.ClientFactory; import org.apache.ratis.conf.Parameters; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.client.GrpcClientRpc; import org.apache.ratis.grpc.server.GRpcLogAppender; import org.apache.ratis.protocol.ClientId; @@ -48,7 +49,7 @@ public class GrpcFactory implements ServerFactory, ClientFactory { } @Override - public GrpcClientRpc newRaftClientRpc(ClientId 
clientId) { - return new GrpcClientRpc(clientId); + public GrpcClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properties) { + return new GrpcClientRpc(clientId, properties); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index 853830e..e0af140 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -76,7 +76,7 @@ public class RaftGRpcService implements RaftServerRpc { private RaftGRpcService(RaftServer server) { this(server, GrpcConfigKeys.Server.port(server.getProperties()), - GrpcConfigKeys.Server.messageSizeMax(server.getProperties()).getSizeInt()); + GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt()); } private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) { ServerBuilder serverBuilder = ServerBuilder.forPort(port); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java index 1eba386..c8c30fa 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -71,6 +71,7 @@ public class AppendStreamer implements Closeable { private final Deque<RaftClientRequestProto> dataQueue; private final Deque<RaftClientRequestProto> ackQueue; private final int maxPendingNum; + private final int 
maxMessageSize; private final PeerProxyMap<RaftClientProtocolProxy> proxyMap; private final Map<RaftPeerId, RaftPeer> peers; @@ -87,6 +88,7 @@ public class AppendStreamer implements Closeable { RaftPeerId leaderId, ClientId clientId) { this.clientId = clientId; maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop); + maxMessageSize = GrpcConfigKeys.messageSizeMax(prop).getSizeInt(); dataQueue = new ConcurrentLinkedDeque<>(); ackQueue = new ConcurrentLinkedDeque<>(); exceptionAndRetry = new ExceptionAndRetry(prop); @@ -155,6 +157,10 @@ public class AppendStreamer implements Closeable { // wrap the current buffer into a RaftClientRequestProto final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto( clientId, leaderId, groupId, seqNum, content, false); + if (request.getSerializedSize() > maxMessageSize) { + throw new IOException("msg size:" + request.getSerializedSize() + + " exceeds maximum:" + maxMessageSize); + } dataQueue.offer(request); this.notifyAll(); } else { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index d7ae284..3084289 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -19,6 +19,8 @@ package org.apache.ratis.grpc.client; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.client.impl.RaftClientRpcWithProxy; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.*; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; @@ -41,9 
+43,11 @@ import static org.apache.ratis.client.impl.ClientProtoUtils.*; public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> { public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class); + private final int maxMessageSize; - public GrpcClientRpc(ClientId clientId) { + public GrpcClientRpc(ClientId clientId, RaftProperties properties) { super(new PeerProxyMap<>(clientId.toString(), RaftClientProtocolClient::new)); + maxMessageSize = GrpcConfigKeys.messageSizeMax(properties).getSizeInt(); } @Override @@ -66,6 +70,10 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie proxy.serverInformation(proto)); } else { RaftClientRequestProto requestProto = toRaftClientRequestProto(request); + if (requestProto.getSerializedSize() > maxMessageSize) { + throw new IOException("msg size:" + requestProto.getSerializedSize() + + " exceeds maximum:" + maxMessageSize); + } CompletableFuture<RaftClientReplyProto> replyFuture = new CompletableFuture<>(); final StreamObserver<RaftClientRequestProto> requestObserver = http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java ---------------------------------------------------------------------- diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java index 8f8bc99..cf4c5fd 100644 --- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java +++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java @@ -20,6 +20,7 @@ package org.apache.ratis.hadooprpc; import org.apache.hadoop.conf.Configuration; import org.apache.ratis.client.ClientFactory; import org.apache.ratis.conf.Parameters; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.hadooprpc.client.HadoopClientRpc; import 
org.apache.ratis.hadooprpc.server.HadoopRpcService; import org.apache.ratis.protocol.ClientId; @@ -62,7 +63,7 @@ public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFa } @Override - public HadoopClientRpc newRaftClientRpc(ClientId clientId) { + public HadoopClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properties) { return new HadoopClientRpc(clientId, getConf()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java ---------------------------------------------------------------------- diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java index f7010c9..5e9c2e9 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java @@ -19,6 +19,7 @@ package org.apache.ratis.netty; import org.apache.ratis.client.ClientFactory; import org.apache.ratis.conf.Parameters; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.netty.client.NettyClientRpc; import org.apache.ratis.netty.server.NettyRpcService; import org.apache.ratis.protocol.ClientId; @@ -40,7 +41,7 @@ public class NettyFactory extends ServerFactory.BaseFactory implements ClientFac } @Override - public NettyClientRpc newRaftClientRpc(ClientId clientId) { + public NettyClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properties) { return new NettyClientRpc(clientId); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/564e89ee/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java index b6e0b1f..bbf0a3e 
100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java @@ -19,6 +19,7 @@ package org.apache.ratis.server.simulation; import org.apache.ratis.client.ClientFactory; import org.apache.ratis.conf.Parameters; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.server.RaftServer; @@ -70,7 +71,7 @@ class SimulatedRpc implements RpcType { } @Override - public SimulatedClientRpc newRaftClientRpc(ClientId clientId) { + public SimulatedClientRpc newRaftClientRpc(ClientId clientId, RaftProperties properties) { return Objects.requireNonNull(client2serverRequestReply); }
