Repository: incubator-ratis Updated Branches: refs/heads/master a3a833290 -> c213a1969
RATIS-281. Ratis should provide an api to readStateMachineData when the cached segment is not available. Contributed by Mukul Kumar Singh Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c213a196 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c213a196 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c213a196 Branch: refs/heads/master Commit: c213a19691d38f0f28e58d3138f7c6b33689d235 Parents: a3a8332 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Mon Jul 30 13:11:21 2018 -0700 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Mon Jul 30 13:11:21 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/ratis/util/ProtoUtils.java | 14 +++++++++- ratis-proto-shaded/src/main/proto/Raft.proto | 1 + .../apache/ratis/server/impl/LogAppender.java | 2 +- .../ratis/server/storage/MemoryRaftLog.java | 5 ++++ .../apache/ratis/server/storage/RaftLog.java | 9 ++++++ .../server/storage/RaftLogIOException.java | 4 +++ .../ratis/server/storage/SegmentedRaftLog.java | 29 ++++++++++++++++++++ .../apache/ratis/statemachine/StateMachine.java | 10 +++++++ .../SimpleStateMachine4Testing.java | 18 +++++++++++- 9 files changed, 89 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index d3b8fcf..2ac2ca5 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -161,6 +161,14 @@ public interface ProtoUtils { .build(); 
} + static boolean shouldReadStateMachineData(LogEntryProto entry) { + if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) { + return false; + } + final SMLogEntryProto smLog = entry.getSmLogEntry(); + return smLog.getStateMachineDataAttached() && smLog.getStateMachineData().isEmpty(); + } + /** * If the given entry is {@link LogEntryBodyCase#SMLOGENTRY} and it has state machine data, * build a new entry without the state machine data. @@ -177,8 +185,12 @@ public interface ProtoUtils { return entry; } // build a new LogEntryProto without state machine data + // and mark that it has been removed return LogEntryProto.newBuilder(entry) - .setSmLogEntry(SMLogEntryProto.newBuilder().setData(smLog.getData())) + .setSmLogEntry + (SMLogEntryProto.newBuilder() + .setData(smLog.getData()) + .setStateMachineDataAttached(true)) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index eb8b0b5..c34e5c4 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -46,6 +46,7 @@ message SMLogEntryProto { bytes data = 1; bytes stateMachineData = 2; // State machine specific data which is not written to log. + bool stateMachineDataAttached = 3; // set this flag when state machine data is attached. 
} message LeaderNoOp { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index 1db4c69..0398052 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -196,7 +196,7 @@ public class LogAppender { } else if (leaderNext > next) { boolean hasSpace = true; for(; hasSpace && leaderNext > next;) { - hasSpace = buffer.addEntry(raftLog.get(next++)); + hasSpace = buffer.addEntry(raftLog.getEntryWithData(next++)); } // buffer is full or batch sending is disabled, send out a request toSend = !hasSpace || !batchSending; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java index 98162b7..eba6f63 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java @@ -52,6 +52,11 @@ public class MemoryRaftLog extends RaftLog { } @Override + public LogEntryProto getEntryWithData(long index) { + return get(index); + } + + @Override public TermIndex getTermIndex(long index) { checkLogState(); try(AutoCloseableLock readLock = readLock()) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java 
---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index 6ae3216..6dc8835 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -194,6 +194,15 @@ public abstract class RaftLog implements Closeable { public abstract LogEntryProto get(long index) throws RaftLogIOException; /** + * Get the log entry of the given index along with the state machine data. + * + * @param index The given index. + * @return The log entry associated with the given index. + * Null if there is no log entry with the index. + */ + public abstract LogEntryProto getEntryWithData(long index) throws RaftLogIOException; + + /** * Get the TermIndex information of the given index. * * @param index The given index. http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java index 0fbf737..5b16b13 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java @@ -30,4 +30,8 @@ public class RaftLogIOException extends RaftException { public RaftLogIOException(String msg) { super(msg); } + + public RaftLogIOException(String message, Throwable cause) { + super(message, cause); + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java 
---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java index 1c00175..7f59518 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java @@ -29,7 +29,9 @@ import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry; import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.AutoCloseableLock; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.ProtoUtils; import java.io.File; import java.io.IOException; @@ -191,6 +193,33 @@ public class SegmentedRaftLog extends RaftLog { return segment.loadCache(recordAndEntry.getRecord()); } + @Override + public LogEntryProto getEntryWithData(long index) throws RaftLogIOException { + final LogEntryProto entry = get(index); + if (!ProtoUtils.shouldReadStateMachineData(entry)) { + return entry; + } + + LogEntryProto logEntryProto; + try { + logEntryProto = server.getStateMachine().readStateMachineData(entry).join(); + } catch (Throwable e) { + final String err = server.getId() + ": Failed readStateMachineData for " + + ServerProtoUtils.toLogEntryString(entry); + LOG.error(err, e); + throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e)); + } + // the state machine read has completed, so the returned entry must carry its data; + // an entry that still "should read" means the state machine did not populate it + if (ProtoUtils.shouldReadStateMachineData(logEntryProto)) { + final String err = server.getId() + ": State machine data not set for " + + ServerProtoUtils.toLogEntryString(logEntryProto); + LOG.error(err); + throw new RaftLogIOException(err); + } + return logEntryProto; + } + 
private void checkAndEvictCache() { if (server != null && cache.shouldEvict()) { // TODO if the cache is hitting the maximum size and we cannot evict any http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 2a0ebf1..4c2e64d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -154,6 +154,16 @@ public interface StateMachine extends Closeable { } /** + * Read asynchronously the state machine data from this state machine. + * + * @return a future for the read task if the state machine data should be read; + * otherwise, return null. + */ + default CompletableFuture<LogEntryProto> readStateMachineData(LogEntryProto entry) { + return null; + } + + /** * This is called before the transaction passed from the StateMachine is appended to the raft log. * This method will be called from log append and having the same strict serial order that the * transactions will have in the RAFT log. 
Since this is called serially in the critical path of http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 1bb04c7..e39a9c8 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -258,7 +258,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { @Override public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) { - CompletableFuture f = new CompletableFuture(); + CompletableFuture<?> f = new CompletableFuture(); if (blockAppend) { try { blockingSemaphore.acquire(); @@ -273,6 +273,22 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } @Override + public CompletableFuture<LogEntryProto> readStateMachineData(LogEntryProto entry) { + CompletableFuture<LogEntryProto> f = new CompletableFuture<>(); + if (blockAppend) { + try { + blockingSemaphore.acquire(); + blockingSemaphore.release(); + } catch (InterruptedException e) { + LOG.error("Could not block readStateMachineData", e); + Thread.currentThread().interrupt(); + } + } + f.complete(null); + return f; + } + + @Override public void close() { lifeCycle.checkStateAndClose(() -> { running = false;
