Repository: incubator-ratis
Updated Branches:
  refs/heads/master 4d723a2c7 -> 1d2ebee02


RATIS-379. Allow writing state machine data to be sync'ed with writing raft log.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/1d2ebee0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/1d2ebee0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/1d2ebee0

Branch: refs/heads/master
Commit: 1d2ebee02d9e97763936ebd9b8b7cbfded514a1e
Parents: 4d723a2
Author: Tsz Wo Nicholas Sze <[email protected]>
Authored: Tue Oct 30 13:11:17 2018 +0800
Committer: Tsz Wo Nicholas Sze <[email protected]>
Committed: Tue Oct 30 13:11:17 2018 +0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/conf/ConfUtils.java   |  4 +-
 .../ratis/protocol/TimeoutIOException.java      | 31 ++++++++
 .../java/org/apache/ratis/util/IOUtils.java     | 22 ++++-
 .../ratis/server/RaftServerConfigKeys.java      | 24 ++++++
 .../ratis/server/impl/RaftServerProxy.java      |  4 +-
 .../ratis/server/storage/RaftLogWorker.java     | 46 ++++++-----
 .../ratis/server/storage/SegmentedRaftLog.java  | 11 ++-
 .../java/org/apache/ratis/RaftTestUtil.java     | 11 ++-
 .../server/storage/TestSegmentedRaftLog.java    | 84 ++++++++++++++++++--
 .../SimpleStateMachine4Testing.java             |  2 +
 10 files changed, 205 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
index 453ea0e..3ffd8be 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
@@ -254,8 +254,8 @@ public interface ConfUtils {
     }
     if (printKey(confClass, out, f, "KEY", "DEFAULT",
         (b, defaultField) ->
-            b.append(defaultField.getType().getSimpleName()).append(", ")
-                .append("default=" + defaultField.get(null)))) {
+            b.append(defaultField.getGenericType().getTypeName()).append(", ")
+             .append("default=").append(defaultField.get(null)))) {
       return;
     }
     if (printKey(confClass, out, f, "PARAMETER", "CLASS",

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java
new file mode 100644
index 0000000..6effb30
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+/**
+ * Timeout has occurred for a blocking I/O.
+ */
+public class TimeoutIOException extends IOException {
+  static final long serialVersionUID = 1L;
+
+  public TimeoutIOException(String message, Throwable throwable) {
+    super(message, throwable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index 8559239..dbb8d20 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -20,6 +20,7 @@
 
 package org.apache.ratis.util;
 
+import org.apache.ratis.protocol.TimeoutIOException;
 import org.slf4j.Logger;
 
 import java.io.Closeable;
@@ -35,6 +36,8 @@ import java.nio.channels.FileChannel;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
 /**
  * IO related utility methods.
@@ -58,11 +61,11 @@ public interface IOUtils {
     return cause != null? asIOException(cause): new IOException(e);
   }
 
-  static <T> T getFromFuture(CompletableFuture<T> future, Object name) throws IOException {
+  static <T> T getFromFuture(CompletableFuture<T> future, Supplier<Object> name) throws IOException {
     try {
       return future.get();
     } catch (InterruptedException e) {
-      throw toInterruptedIOException(name + " interrupted.", e);
+      throw toInterruptedIOException(name.get() + " interrupted.", e);
     } catch (ExecutionException e) {
       throw toIOException(e);
     } catch (CompletionException e) {
@@ -70,6 +73,21 @@ public interface IOUtils {
     }
   }
 
+  static <T> T getFromFuture(CompletableFuture<T> future, Supplier<Object> name, TimeDuration timeout)
+      throws IOException {
+    try {
+      return future.get(timeout.getDuration(), timeout.getUnit());
+    } catch (InterruptedException e) {
+      throw toInterruptedIOException(name.get() + " interrupted.", e);
+    } catch (ExecutionException e) {
+      throw toIOException(e);
+    } catch (CompletionException e) {
+      throw asIOException(JavaUtils.unwrapCompletionException(e));
+    } catch(TimeoutException e) {
+      throw new TimeoutIOException("Timeout: " + name.get(), e);
+    }
+  }
+
   static boolean shouldReconnect(Exception e) {
     return ReflectionUtils.isInstance(e,
         SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 33662c9..22799f2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -144,6 +144,30 @@ public interface RaftServerConfigKeys {
       setInt(properties::setInt, FORCE_SYNC_NUM_KEY, forceSyncNum);
     }
 
+    interface StateMachineData {
+      String PREFIX = Log.PREFIX + ".statemachine.data";
+
+      String SYNC_KEY = PREFIX + ".sync";
+      boolean SYNC_DEFAULT = true;
+      static boolean sync(RaftProperties properties) {
+        return getBoolean(properties::getBoolean,
+            SYNC_KEY, SYNC_DEFAULT, getDefaultLog());
+      }
+      static void setSync(RaftProperties properties, boolean sync) {
+        setBoolean(properties::setBoolean, SYNC_KEY, sync);
+      }
+
+      String SYNC_TIMEOUT_KEY = PREFIX + ".sync.timeout";
+      TimeDuration SYNC_TIMEOUT_DEFAULT = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+      static TimeDuration syncTimeout(RaftProperties properties) {
+        return getTimeDuration(properties.getTimeDuration(SYNC_TIMEOUT_DEFAULT.getUnit()),
+            SYNC_TIMEOUT_KEY, SYNC_TIMEOUT_DEFAULT, getDefaultLog());
+      }
+      static void setSyncTimeout(RaftProperties properties, TimeDuration syncTimeout) {
+        setTimeDuration(properties::setTimeDuration, SYNC_TIMEOUT_KEY, syncTimeout);
+      }
+    }
+
     interface Appender {
       String PREFIX = Log.PREFIX + ".appender";
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 7648247..a4458f4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -263,13 +263,13 @@ public class RaftServerProxy implements RaftServer {
 
   public RaftServerImpl getImpl(RaftGroupId groupId) throws IOException {
     Objects.requireNonNull(groupId, "groupId == null");
-    return IOUtils.getFromFuture(getImplFuture(groupId), getId());
+    return IOUtils.getFromFuture(getImplFuture(groupId), this::getId);
   }
 
   List<RaftServerImpl> getImpls() throws IOException {
     final List<RaftServerImpl> list = new ArrayList<>();
     for(CompletableFuture<RaftServerImpl> f : impls.getAll()) {
-      list.add(IOUtils.getFromFuture(f, getId()));
+      list.add(IOUtils.getFromFuture(f, this::getId));
     }
     return list;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index b5b9ac5..6c64057 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -24,7 +24,6 @@ import org.apache.ratis.metrics.RatisMetricsRegistry;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
-import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.storage.RaftLogCache.SegmentFileInfo;
 import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
@@ -37,11 +36,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 
 /**
@@ -81,22 +80,26 @@ class RaftLogWorker implements Runnable {
   private final long preallocatedSize;
   private final int bufferSize;
 
-  RaftLogWorker(RaftPeerId selfId, RaftServerImpl raftServer, RaftStorage storage,
-                RaftProperties properties) {
+  private final boolean stateMachineDataSync;
+  private final TimeDuration stateMachineDataSyncTimeout;
+
+  RaftLogWorker(RaftPeerId selfId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
+      RaftStorage storage, RaftProperties properties) {
     this.name = selfId + "-" + getClass().getSimpleName();
     LOG.info("new {} for {}", name, storage);
 
-    this.submitUpdateCommitEvent = raftServer != null? raftServer::submitUpdateCommitEvent: () -> {};
-    this.stateMachine = raftServer != null? raftServer.getStateMachine(): null;
+    this.submitUpdateCommitEvent = submitUpdateCommitEvent;
+    this.stateMachine = stateMachine;
 
     this.storage = storage;
-    this.segmentMaxSize =
-        RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
-    this.preallocatedSize =
-        RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
-    this.bufferSize =
-        RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
+    this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+    this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
+    this.bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.forceSyncNum = RaftServerConfigKeys.Log.forceSyncNum(properties);
+
+    this.stateMachineDataSync = RaftServerConfigKeys.Log.StateMachineData.sync(properties);
+    this.stateMachineDataSyncTimeout = RaftServerConfigKeys.Log.StateMachineData.syncTimeout(properties);
+
     this.workerThread = new Thread(this, name);
 
     // Server Id can be null in unit tests
@@ -220,16 +223,19 @@ class RaftLogWorker implements Runnable {
 
   private void flushWrites() throws IOException {
     if (out != null) {
-      LOG.debug("flush data to " + out + ", reset pending_sync_number to 0");
+      LOG.debug("{}: flush {}", name, out);
       final Timer.Context timerContext = logFlushTimer.get().time();
       try {
         final CompletableFuture<Void> f = stateMachine != null ?
             stateMachine.flushStateMachineData(lastWrittenIndex) :
             CompletableFuture.completedFuture(null);
+        if (stateMachineDataSync) {
+          IOUtils.getFromFuture(f, () -> name + "-flushStateMachineData", stateMachineDataSyncTimeout);
+        }
         out.flush();
-        f.get();
-      } catch (InterruptedException | ExecutionException e) {
-        throw IOUtils.asIOException(e);
+        if (!stateMachineDataSync) {
+          IOUtils.getFromFuture(f, () -> name + "-flushStateMachineData");
+        }
       } finally {
         timerContext.stop();
       }
@@ -238,10 +244,10 @@ class RaftLogWorker implements Runnable {
   }
 
   private void updateFlushedIndex() {
-    LOG.debug("{}: updateFlushedIndex {} -> {}", name, lastWrittenIndex, flushedIndex);
+    LOG.debug("{}: updateFlushedIndex {} -> {}", name, flushedIndex, lastWrittenIndex);
     flushedIndex = lastWrittenIndex;
     pendingFlushNum = 0;
-    submitUpdateCommitEvent.run();
+    Optional.ofNullable(submitUpdateCommitEvent).ifPresent(Runnable::run);
   }
 
   /**
@@ -300,6 +306,10 @@ class RaftLogWorker implements Runnable {
 
     @Override
     public void execute() throws IOException {
+      if (stateMachineDataSync && stateMachineFuture != null) {
+        IOUtils.getFromFuture(stateMachineFuture, () -> this + "-writeStateMachineData", stateMachineDataSyncTimeout);
+      }
+
       Preconditions.assertTrue(out != null);
       Preconditions.assertTrue(lastWrittenIndex + 1 == entry.getIndex(),
           "lastWrittenIndex == %s, entry == %s", lastWrittenIndex, entry);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index f2b5c1a..666b9c8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -27,6 +27,7 @@ import org.apache.ratis.server.storage.LogSegment.LogRecord;
 import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
@@ -101,13 +102,21 @@ public class SegmentedRaftLog extends RaftLog {
 
   public SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
+    this(selfId, server, server != null? server.getStateMachine(): null,
+        server != null? server::submitUpdateCommitEvent: null,
+        storage, lastIndexInSnapshot, properties);
+  }
+
+  SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
+      StateMachine stateMachine, Runnable submitUpdateCommitEvent,
+      RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
     super(selfId, RaftServerConfigKeys.Log.Appender.bufferCapacity(properties)
         .getSizeInt());
     this.server = server;
     this.storage = storage;
     segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     cache = new RaftLogCache(selfId, storage, properties);
-    fileLogWorker = new RaftLogWorker(selfId, server, storage, properties);
+    this.fileLogWorker = new RaftLogWorker(selfId, stateMachine, submitUpdateCommitEvent, storage, properties);
     lastCommitted.set(lastIndexInSnapshot);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 5946a47..78e3768 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -280,13 +280,18 @@ public interface RaftTestUtil {
     private final StateMachineLogEntryProto smLogEntryProto;
 
     public SimpleOperation(String op) {
-      this(clientId, callId.incrementAndGet(), op);
+      this(op, false);
     }
 
-    private SimpleOperation(ClientId clientId, long callId, String op) {
+    public SimpleOperation(String op, boolean hasStateMachineData) {
+      this(clientId, callId.incrementAndGet(), op, hasStateMachineData);
+    }
+
+    private SimpleOperation(ClientId clientId, long callId, String op, boolean hasStateMachineData) {
       this.op = Objects.requireNonNull(op);
+      final ByteString bytes = ProtoUtils.toByteString(op);
       this.smLogEntryProto = ServerProtoUtils.toStateMachineLogEntryProto(
-          clientId, callId, ProtoUtils.toByteString(op), null);
+          clientId, callId, bytes, hasStateMachineData? bytes: null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index 5cb498a..8083b62 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -30,8 +30,11 @@ import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.SizeInBytes;
 import org.junit.After;
 import org.junit.Assert;
@@ -46,6 +49,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import static org.mockito.Matchers.any;
@@ -175,12 +179,18 @@ public class TestSegmentedRaftLog extends BaseTest {
       Supplier<String> stringSupplier) {
     List<LogEntryProto> eList = new ArrayList<>();
     for (SegmentRange range : slist) {
-      for (long index = range.start; index <= range.end; index++) {
-        SimpleOperation m = stringSupplier == null ?
-            new SimpleOperation("m" + index) :
-            new SimpleOperation(stringSupplier.get());
-        eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index));
-      }
+      prepareLogEntries(range, stringSupplier, false, eList);
+    }
+    return eList;
+  }
+
+  List<LogEntryProto> prepareLogEntries(SegmentRange range,
+      Supplier<String> stringSupplier, boolean hasStataMachineData, List<LogEntryProto> eList) {
+    for(long index = range.start; index <= range.end; index++) {
+      SimpleOperation m = stringSupplier == null?
+          new SimpleOperation("m" + index, hasStataMachineData):
+          new SimpleOperation(stringSupplier.get(), hasStataMachineData);
+      eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index));
     }
     return eList;
   }
@@ -400,6 +410,60 @@ public class TestSegmentedRaftLog extends BaseTest {
     }
   }
 
+  @Test
+  public void testSegmentedRaftLogStateMachineData() throws Exception {
+    final SegmentRange range = new SegmentRange(0, 10, 1, true);
+    final List<LogEntryProto> entries = prepareLogEntries(range, null, true, new ArrayList<>());
+
+    final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+
+      int next = 0;
+      long flush = -1;
+      assertIndices(raftLog, flush, next);
+      raftLog.appendEntry(entries.get(next++));
+      assertIndices(raftLog, flush, next);
+      raftLog.appendEntry(entries.get(next++));
+      assertIndices(raftLog, flush, next);
+      raftLog.appendEntry(entries.get(next++));
+      assertIndicesMultipleAttempts(raftLog, flush += 3, next);
+
+      sm.blockFlushStateMachineData();
+      raftLog.appendEntry(entries.get(next++));
+      {
+        sm.blockWriteStateMachineData();
+        final Thread t = startAppendEntryThread(raftLog, entries.get(next++));
+        TimeUnit.SECONDS.sleep(1);
+        Assert.assertTrue(t.isAlive());
+        sm.unblockWriteStateMachineData();
+        t.join();
+      }
+      assertIndices(raftLog, flush, next);
+      TimeUnit.SECONDS.sleep(1);
+      assertIndices(raftLog, flush, next);
+      sm.unblockFlushStateMachineData();
+      assertIndicesMultipleAttempts(raftLog, flush + 2, next);
+    }
+  }
+
+  static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) {
+    final Thread t = new Thread(() -> raftLog.appendEntry(entry));
+    t.start();
+    return t;
+  }
+
+  void assertIndices(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) {
+    LOG.info("assert expectedFlushIndex={}", expectedFlushIndex);
+    Assert.assertEquals(expectedFlushIndex, raftLog.getLatestFlushedIndex());
+    LOG.info("assert expectedNextIndex={}", expectedNextIndex);
+    Assert.assertEquals(expectedNextIndex, raftLog.getNextIndex());
+  }
+
+  void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex, long expectedNextIndex) throws Exception {
+    JavaUtils.attempt(() -> assertIndices(raftLog, expectedFlushIndex, expectedNextIndex),
+        10, 100, "assertIndices", LOG);
+  }
 
   @Test
   public void testSegmentedRaftLogFormatInternalHeader() throws Exception {
@@ -410,5 +474,13 @@ public class TestSegmentedRaftLog extends BaseTest {
           LOG.info("header' = " + new String(header, StandardCharsets.UTF_8));
           return null;
         }), IllegalStateException.class);
+
+    // reset the header
+    SegmentedRaftLogFormat.applyHeaderTo(header -> {
+      LOG.info("header'  = " + new String(header, StandardCharsets.UTF_8));
+      header[0] -= 1; // try changing the internal header
+      LOG.info("header'' = " + new String(header, StandardCharsets.UTF_8));
+      return null;
+    });
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/1d2ebee0/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 2c6883e..84be87b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -100,12 +100,14 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
     private final EnumMap<Type, CompletableFuture<Void>> maps = new EnumMap<>(Type.class);
 
     void block(Type type) {
+      LOG.info("block {}", type);
       final CompletableFuture<Void> future = new CompletableFuture<>();
       final CompletableFuture<Void> previous = maps.putIfAbsent(type, future);
       Preconditions.assertNull(previous, "previous");
     }
 
     void unblock(Type type) {
+      LOG.info("unblock {}", type);
       final CompletableFuture<Void> future = maps.remove(type);
       Objects.requireNonNull(future, "future == null");
       future.complete(null);

Reply via email to