Repository: incubator-ratis Updated Branches: refs/heads/master c1b23fdb0 -> 2b8467ea9
RATIS-161. gRPC Log appender should check if the appended request size is less than max message size of the server. Contributed by Mukul Kumar Singh Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2b8467ea Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2b8467ea Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2b8467ea Branch: refs/heads/master Commit: 2b8467ea9e06a972be87b0da95768755c5864773 Parents: c1b23fd Author: Tsz-Wo Nicholas Sze <[email protected]> Authored: Tue Dec 12 18:31:48 2017 +0800 Committer: Tsz-Wo Nicholas Sze <[email protected]> Committed: Tue Dec 12 18:31:48 2017 +0800 ---------------------------------------------------------------------- .../org/apache/ratis/grpc/RaftGRpcService.java | 17 +++++++-- .../apache/ratis/server/impl/LogAppender.java | 37 ++++++++++++-------- 2 files changed, 36 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2b8467ea/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java index e0af140..e4701cc 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -25,6 +25,7 @@ import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.shaded.io.grpc.Server; import org.apache.ratis.shaded.io.grpc.ServerBuilder; @@ -32,6 +33,7 @@ import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; import org.apache.ratis.shaded.proto.RaftProtos.*; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.ExitUtils; +import org.apache.ratis.util.SizeInBytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,9 +78,18 @@ public class RaftGRpcService implements RaftServerRpc { private RaftGRpcService(RaftServer server) { this(server, GrpcConfigKeys.Server.port(server.getProperties()), - GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt()); - } - private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) { + GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt(), + GrpcConfigKeys.messageSizeMax(server.getProperties()), + RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties())); + } + private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize, + SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize) { + if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) { + throw new IllegalArgumentException("Illegal configuration: " + + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize + + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax); + } + ServerBuilder serverBuilder = ServerBuilder.forPort(port); idSupplier = raftServer::getId; server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize) http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2b8467ea/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index a5b0791..556ba83 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -114,13 +114,20 @@ public class LogAppender extends Daemon { private final List<LogEntryProto> buf = new ArrayList<>(); private int totalSize = 0; - void addEntry(LogEntryProto entry) { - buf.add(entry); - totalSize += entry.getSerializedSize(); - } - - boolean isFull() { - return totalSize >= maxBufferSize; + /** + * Adds a log entry to the Log entry buffer. + * Checks if enough space is available before adding the entry to the buffer. + * @return true if the entry is added successfully; + * otherwise, the entry is not added, return false. + */ + boolean addEntry(LogEntryProto entry) { + final long entrySize = entry.getSerializedSize(); + if (totalSize + entrySize <= maxBufferSize) { + buf.add(entry); + totalSize += entrySize; + return true; + } + return false; } boolean isEmpty() { @@ -159,20 +166,20 @@ public class LogAppender extends Daemon { final TermIndex previous = getPrevious(); final long leaderNext = raftLog.getNextIndex(); long next = follower.getNextIndex() + buffer.getPendingEntryNum(); - boolean toSend = false; + final boolean toSend; if (leaderNext == next && !buffer.isEmpty()) { // no new entries, then send out the entries in the buffer toSend = true; } else if (leaderNext > next) { - while (leaderNext > next && !buffer.isFull()) { - // stop adding entry once the buffer size is >= the max size - buffer.addEntry(raftLog.get(next++)); - } - if (buffer.isFull() || !batchSending) { - // buffer is full or batch sending is disabled, send out a request - toSend = true; + boolean hasSpace = true; + for(; hasSpace && leaderNext > next;) { + hasSpace = buffer.addEntry(raftLog.get(next++)); } + // buffer is full or batch sending is disabled, send out a request + toSend = !hasSpace || !batchSending; + } else { + toSend = false; } if (toSend || shouldHeartbeat()) {
