Repository: incubator-ratis
Updated Branches:
  refs/heads/master 021ee857f -> 9d65b502c
RATIS-304. StateMachine#readStateMachineData should return SMLogEntryProto. Contributed by Lokesh Jain Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/9d65b502 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/9d65b502 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/9d65b502 Branch: refs/heads/master Commit: 9d65b502c5b48c8fff8debe5d65cedd10740f973 Parents: 021ee85 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Thu Sep 27 16:16:49 2018 -0700 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Thu Sep 27 16:16:49 2018 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/ratis/util/ProtoUtils.java | 13 +++++++++++++ .../java/org/apache/ratis/server/storage/RaftLog.java | 7 ++++--- .../org/apache/ratis/statemachine/StateMachine.java | 3 ++- .../ratis/statemachine/SimpleStateMachine4Testing.java | 4 ++-- 4 files changed, 21 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9d65b502/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java index d1ef2e0..2b72397 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java @@ -194,6 +194,19 @@ public interface ProtoUtils { .build(); } + /** + * Return a new log entry based on the input log entry with stateMachineData added. 
+ * @param stateMachineData - state machine data to be added + * @param entry - log entry to which stateMachineData needs to be added + * @return LogEntryProto with stateMachineData added + */ + static LogEntryProto addStateMachineData(ByteString stateMachineData, LogEntryProto entry) { + final SMLogEntryProto smLogEntryProto = SMLogEntryProto.newBuilder(entry.getSmLogEntry()) + .setStateMachineData(stateMachineData) + .build(); + return LogEntryProto.newBuilder(entry).setSmLogEntry(smLogEntryProto).build(); + } + static long getSerializedSize(LogEntryProto entry) { if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) { return entry.getSerializedSize(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9d65b502/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java index ff38879..2adef40 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java @@ -27,6 +27,7 @@ import org.apache.ratis.server.impl.ServerProtoUtils; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; @@ -347,9 +348,9 @@ public abstract class RaftLog implements Closeable { */ public class EntryWithData { private LogEntryProto logEntry; - private CompletableFuture<LogEntryProto> future; + private CompletableFuture<ByteString> future; - EntryWithData(LogEntryProto logEntry, CompletableFuture<LogEntryProto> future) { + 
EntryWithData(LogEntryProto logEntry, CompletableFuture<ByteString> future) { this.logEntry = logEntry; this.future = future; } @@ -365,7 +366,7 @@ public abstract class RaftLog implements Closeable { } try { - entryProto = future.join(); + entryProto = future.thenApply(data -> ProtoUtils.addStateMachineData(data, logEntry)).join(); } catch (Throwable t) { final String err = selfId + ": Failed readStateMachineData for " + ServerProtoUtils.toLogEntryString(logEntry); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9d65b502/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 6c8b2df..900db83 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -28,6 +28,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.LifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,7 +162,7 @@ public interface StateMachine extends Closeable { * @return a future for the read task if the state machine data should be read * otherwise, return null. 
*/ - default CompletableFuture<LogEntryProto> readStateMachineData(LogEntryProto entry) { + default CompletableFuture<ByteString> readStateMachineData(LogEntryProto entry) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/9d65b502/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index cafd98d..6285701 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -285,8 +285,8 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } @Override - public CompletableFuture<LogEntryProto> readStateMachineData(LogEntryProto entry) { - CompletableFuture<LogEntryProto> f = new CompletableFuture<>(); + public CompletableFuture<ByteString> readStateMachineData(LogEntryProto entry) { + CompletableFuture<ByteString> f = new CompletableFuture<>(); if (blockAppend) { try { blockingSemaphore.acquire();
