Repository: incubator-ratis Updated Branches: refs/heads/master c2423179b -> 0236eea30
RATIS-289. Optimize readStateMachine by allowing to enqueue multiple read requests in parallel. Contributed by Mukul Kumar Singh Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/0236eea3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/0236eea3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/0236eea3 Branch: refs/heads/master Commit: 0236eea307fef8f8829b19dfef8919fb665f42fc Parents: c242317 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Fri Aug 3 13:44:50 2018 -0700 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Fri Aug 3 13:44:50 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/ProtoUtils.java | 15 ++++++- ratis-proto-shaded/src/main/proto/Raft.proto | 1 + .../apache/ratis/server/impl/LogAppender.java | 14 ++++-- .../ratis/server/storage/MemoryRaftLog.java | 4 +- .../apache/ratis/server/storage/RaftLog.java | 46 +++++++++++++++++++- .../ratis/server/storage/SegmentedRaftLog.java | 16 ++----- 6 files changed, 75 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index 2ac2ca5..06b1346 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -190,10 +190,23 @@ public interface ProtoUtils { .setSmLogEntry (SMLogEntryProto.newBuilder() .setData(smLog.getData()) - .setStateMachineDataAttached(true)) + .setStateMachineDataAttached(true) + .setSerializedProtobufSize(entry.getSerializedSize())) .build(); } + static long getSerializedSize(LogEntryProto entry) { + if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) { + return entry.getSerializedSize(); + } + final SMLogEntryProto smLog = entry.getSmLogEntry(); + if (!smLog.getStateMachineDataAttached()) { + // if state machine data was never set, return the proto serialized size + return entry.getSerializedSize(); + } + return smLog.getSerializedProtobufSize(); + } + static IOException toIOException(ServiceException se) { final Throwable t = se.getCause(); if (t == null) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 2965f97..62202be 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -47,6 +47,7 @@ message SMLogEntryProto { bytes stateMachineData = 2; // State machine specific data which is not written to log. bool stateMachineDataAttached = 3; // set this flag when state machine data is attached. + uint64 serializedProtobufSize = 4; // size of the serialized LogEntryProto along with stateMachineData } message LeaderNoOp { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 0398052..b863bad 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType; import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.RaftLog.EntryWithData; import org.apache.ratis.server.storage.FileInfo; import org.apache.ratis.server.storage.RaftLog; import org.apache.ratis.server.storage.RaftLogIOException; @@ -134,7 +135,7 @@ public class LogAppender { * A buffer for log entries with size limitation. */ private class LogEntryBuffer { - private final List<LogEntryProto> buf = new ArrayList<>(); + private final List<EntryWithData> buf = new ArrayList<>(); private int totalSize = 0; /** @@ -143,7 +144,7 @@ public class LogAppender { * @return true if the entry is added successfully; * otherwise, the entry is not added, return false. */ - boolean addEntry(LogEntryProto entry) { + boolean addEntry(EntryWithData entry) { final long entrySize = entry.getSerializedSize(); if (totalSize + entrySize <= maxBufferSize) { buf.add(entry); @@ -157,9 +158,14 @@ public class LogAppender { return buf.isEmpty(); } - AppendEntriesRequestProto getAppendRequest(TermIndex previous, long callId) { + AppendEntriesRequestProto getAppendRequest(TermIndex previous, long callId) throws RaftLogIOException { + final List<LogEntryProto> protos = new ArrayList<>(); + // Wait for all the log entry futures to complete and then create a list of LogEntryProtos. + for (EntryWithData bufEntry : buf) { + protos.add(bufEntry.getEntry()); + } final AppendEntriesRequestProto request = leaderState.newAppendEntriesRequestProto( - getFollowerId(), previous, buf, !follower.isAttendingVote(), callId); + getFollowerId(), previous, protos, !follower.isAttendingVote(), callId); buf.clear(); totalSize = 0; return request; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java index eba6f63..7b3b1b9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java @@ -52,8 +52,8 @@ public class MemoryRaftLog extends RaftLog { } @Override - public LogEntryProto getEntryWithData(long index) { - return get(index); + public EntryWithData getEntryWithData(long index) { + return new EntryWithData(get(index), null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 6dc8835..0d2ec4c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -20,6 +20,7 @@ package org.apache.ratis.server.storage; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.StateMachineException; +import org.apache.ratis.server.impl.LogAppender; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.ServerProtoUtils; @@ -27,6 +28,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.AutoCloseableLock; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; import org.slf4j.Logger; @@ -200,7 +202,7 @@ public abstract class RaftLog implements Closeable { * @return The log entry associated with the given index. * Null if there is no log entry with the index. */ - public abstract LogEntryProto getEntryWithData(long index) throws RaftLogIOException; + public abstract EntryWithData getEntryWithData(long index) throws RaftLogIOException; /** * Get the TermIndex information of the given index. @@ -326,4 +328,46 @@ public abstract class RaftLog implements Closeable { public RaftPeerId getSelfId() { return selfId; } + + /** + * Holds proto entry along with future which contains read state machine data + */ + public class EntryWithData { + private LogEntryProto logEntry; + private CompletableFuture<LogEntryProto> future; + + EntryWithData(LogEntryProto logEntry, CompletableFuture<LogEntryProto> future) { + this.logEntry = logEntry; + this.future = future; + } + + public long getSerializedSize() { + return ProtoUtils.getSerializedSize(logEntry); + } + + public LogEntryProto getEntry() throws RaftLogIOException { + LogEntryProto entryProto; + if (future == null) { + return logEntry; + } + + try { + entryProto = future.join(); + } catch (Throwable t) { + final String err = selfId + ": Failed readStateMachineData for " + + ServerProtoUtils.toLogEntryString(logEntry); + LogAppender.LOG.error(err, t); + throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(t)); + } + // by this time we have already read the state machine data, + // so the log entry data should be set now + if (ProtoUtils.shouldReadStateMachineData(entryProto)) { + final String err = selfId + ": State machine data not set for " + + ServerProtoUtils.toLogEntryString(logEntry); + LogAppender.LOG.error(err); + throw new RaftLogIOException(err); + } + return entryProto; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 7f59518..9df16f8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -194,30 +194,20 @@ public class SegmentedRaftLog extends RaftLog { } @Override - public LogEntryProto getEntryWithData(long index) throws RaftLogIOException { + public EntryWithData getEntryWithData(long index) throws RaftLogIOException { final LogEntryProto entry = get(index); if (!ProtoUtils.shouldReadStateMachineData(entry)) { - return entry; + return new EntryWithData(entry, null); } - LogEntryProto logEntryProto; try { - logEntryProto = server.getStateMachine().readStateMachineData(entry).join(); + return new EntryWithData(entry, server.getStateMachine().readStateMachineData(entry)); } catch (Throwable e) { final String err = server.getId() + ": Failed readStateMachineData for " + ServerProtoUtils.toLogEntryString(entry); LOG.error(err, e); throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e)); } - // by this time we have already read the state machine data, - // so the log entry data should be set now - if (!ProtoUtils.shouldReadStateMachineData(logEntryProto)) { - final String err = server.getId() + ": State machine data not set for " + - ServerProtoUtils.toLogEntryString(logEntryProto); - LOG.error(err); - throw new RaftLogIOException(err); - } - return logEntryProto; } private void checkAndEvictCache() {
