Repository: incubator-ratis
Updated Branches:
  refs/heads/master a3a833290 -> c213a1969


RATIS-281. Ratis should provide an api to readStateMachineData when the cached 
segment is not available.  Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c213a196
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c213a196
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c213a196

Branch: refs/heads/master
Commit: c213a19691d38f0f28e58d3138f7c6b33689d235
Parents: a3a8332
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Mon Jul 30 13:11:21 2018 -0700
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Mon Jul 30 13:11:21 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/ProtoUtils.java  | 14 +++++++++-
 ratis-proto-shaded/src/main/proto/Raft.proto    |  1 +
 .../apache/ratis/server/impl/LogAppender.java   |  2 +-
 .../ratis/server/storage/MemoryRaftLog.java     |  5 ++++
 .../apache/ratis/server/storage/RaftLog.java    |  9 ++++++
 .../server/storage/RaftLogIOException.java      |  4 +++
 .../ratis/server/storage/SegmentedRaftLog.java  | 29 ++++++++++++++++++++
 .../apache/ratis/statemachine/StateMachine.java | 10 +++++++
 .../SimpleStateMachine4Testing.java             | 18 +++++++++++-
 9 files changed, 89 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index d3b8fcf..2ac2ca5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -161,6 +161,14 @@ public interface ProtoUtils {
         .build();
   }
 
+  static boolean shouldReadStateMachineData(LogEntryProto entry) {
+    if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) {
+      return false;
+    }
+    final SMLogEntryProto smLog = entry.getSmLogEntry();
+    return smLog.getStateMachineDataAttached() && 
smLog.getStateMachineData().isEmpty();
+  }
+
   /**
    * If the given entry is {@link LogEntryBodyCase#SMLOGENTRY} and it has 
state machine data,
    * build a new entry without the state machine data.
@@ -177,8 +185,12 @@ public interface ProtoUtils {
       return entry;
     }
     // build a new LogEntryProto without state machine data
+    // and mark that it has been removed
     return LogEntryProto.newBuilder(entry)
-        .setSmLogEntry(SMLogEntryProto.newBuilder().setData(smLog.getData()))
+        .setSmLogEntry
+            (SMLogEntryProto.newBuilder()
+            .setData(smLog.getData())
+            .setStateMachineDataAttached(true))
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto 
b/ratis-proto-shaded/src/main/proto/Raft.proto
index eb8b0b5..c34e5c4 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -46,6 +46,7 @@ message SMLogEntryProto {
   bytes data = 1;
 
   bytes stateMachineData = 2; // State machine specific data which is not 
written to log.
+  bool stateMachineDataAttached = 3; // set this flag when state machine data 
is attached.
 }
 
 message LeaderNoOp {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 1db4c69..0398052 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -196,7 +196,7 @@ public class LogAppender {
     } else if (leaderNext > next) {
       boolean hasSpace = true;
       for(; hasSpace && leaderNext > next;) {
-        hasSpace = buffer.addEntry(raftLog.get(next++));
+        hasSpace = buffer.addEntry(raftLog.getEntryWithData(next++));
       }
       // buffer is full or batch sending is disabled, send out a request
       toSend = !hasSpace || !batchSending;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index 98162b7..eba6f63 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -52,6 +52,11 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
+  public LogEntryProto getEntryWithData(long index) {
+    return get(index);
+  }
+
+  @Override
   public TermIndex getTermIndex(long index) {
     checkLogState();
     try(AutoCloseableLock readLock = readLock()) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 6ae3216..6dc8835 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -194,6 +194,15 @@ public abstract class RaftLog implements Closeable {
   public abstract LogEntryProto get(long index) throws RaftLogIOException;
 
   /**
+   * Get the log entry of the given index along with the state machine data.
+   *
+   * @param index The given index.
+   * @return The log entry associated with the given index.
+   *         Null if there is no log entry with the index.
+   */
+  public abstract LogEntryProto getEntryWithData(long index) throws 
RaftLogIOException;
+
+  /**
    * Get the TermIndex information of the given index.
    *
    * @param index The given index.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
index 0fbf737..5b16b13 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
@@ -30,4 +30,8 @@ public class RaftLogIOException extends RaftException {
   public RaftLogIOException(String msg) {
     super(msg);
   }
+
+  public RaftLogIOException(String message, Throwable cause) {
+    super(message, cause);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 1c00175..7f59518 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -29,7 +29,9 @@ import 
org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -191,6 +193,33 @@ public class SegmentedRaftLog extends RaftLog {
     return segment.loadCache(recordAndEntry.getRecord());
   }
 
+  @Override
+  public LogEntryProto getEntryWithData(long index) throws RaftLogIOException {
+    final LogEntryProto entry = get(index);
+    if (!ProtoUtils.shouldReadStateMachineData(entry)) {
+      return entry;
+    }
+
+    LogEntryProto logEntryProto;
+    try {
+      logEntryProto = 
server.getStateMachine().readStateMachineData(entry).join();
+    } catch (Throwable e) {
+      final String err = server.getId() + ": Failed readStateMachineData for " 
+
+          ServerProtoUtils.toLogEntryString(entry);
+      LOG.error(err, e);
+      throw new RaftLogIOException(err, 
JavaUtils.unwrapCompletionException(e));
+    }
+    // by this time we have already read the state machine data,
+    // so the log entry data should be set now
+    if (!ProtoUtils.shouldReadStateMachineData(logEntryProto)) {
+      final String err = server.getId() + ": State machine data not set for " +
+          ServerProtoUtils.toLogEntryString(logEntryProto);
+      LOG.error(err);
+      throw new RaftLogIOException(err);
+    }
+    return logEntryProto;
+  }
+
   private void checkAndEvictCache() {
     if (server != null && cache.shouldEvict()) {
       // TODO if the cache is hitting the maximum size and we cannot evict any

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 2a0ebf1..4c2e64d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -154,6 +154,16 @@ public interface StateMachine extends Closeable {
   }
 
   /**
+   * Read asynchronously the state machine data to this state machine.
+   *
+   * @return a future for the read task if the state machine data should be 
read
+   *         otherwise, return null.
+   */
+  default CompletableFuture<LogEntryProto> readStateMachineData(LogEntryProto 
entry) {
+    return null;
+  }
+
+  /**
    * This is called before the transaction passed from the StateMachine is 
appended to the raft log.
    * This method will be called from log append and having the same strict 
serial order that the
    * transactions will have in the RAFT log. Since this is called serially in 
the critical path of

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c213a196/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 1bb04c7..e39a9c8 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -258,7 +258,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
 
   @Override
   public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) {
-    CompletableFuture f = new CompletableFuture();
+    CompletableFuture<?> f = new CompletableFuture();
     if (blockAppend) {
       try {
         blockingSemaphore.acquire();
@@ -273,6 +273,22 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   }
 
   @Override
+  public CompletableFuture<LogEntryProto> readStateMachineData(LogEntryProto 
entry) {
+    CompletableFuture<LogEntryProto> f = new CompletableFuture<>();
+    if (blockAppend) {
+      try {
+        blockingSemaphore.acquire();
+        blockingSemaphore.release();
+      } catch (InterruptedException e) {
+        LOG.error("Could not block readStateMachineData", e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    f.complete(null);
+    return f;
+  }
+
+  @Override
   public void close() {
     lifeCycle.checkStateAndClose(() -> {
       running = false;

Reply via email to