Repository: incubator-ratis Updated Branches: refs/heads/master 0b73e892a -> 05812468f
RATIS-295. RaftLogWorker#flushWrites should also flush state machine data. Contributed by Shashikant Banerjee Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/05812468 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/05812468 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/05812468 Branch: refs/heads/master Commit: 05812468f7c85b38ff7391e361c27a76929cf48a Parents: 0b73e89 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Fri Aug 17 15:45:42 2018 -0700 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Fri Aug 17 15:45:42 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/ratis/server/storage/RaftLogWorker.java | 7 +++++++ .../java/org/apache/ratis/statemachine/StateMachine.java | 9 +++++++++ 2 files changed, 16 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/05812468/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java index 64f5356..68f303d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java @@ -41,6 +41,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutionException; import java.util.function.Supplier; /** @@ -225,7 +226,13 @@ class RaftLogWorker implements Runnable { LOG.debug("flush data to " + out + ", reset pending_sync_number to 0"); final Timer.Context timerContext = logFlushTimer.get().time(); try { + final CompletableFuture<Void> f = stateMachine != null ? + stateMachine.flushStateMachineData(lastWrittenIndex) : + CompletableFuture.completedFuture(null); out.flush(); + f.get(); + } catch (InterruptedException | ExecutionException e) { + throw IOUtils.asIOException(e); } finally { timerContext.stop(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/05812468/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 88b5276..c76b2e4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -231,4 +231,13 @@ public interface StateMachine extends Closeable { default void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) { } + + /** + * Flush the state machine data till the log index provided. + * @param index log Index + * @return a future for the flush task, null otherwise + */ + default CompletableFuture<Void> flushStateMachineData(long index) { + return CompletableFuture.completedFuture(null); + } }
