Repository: incubator-ratis
Updated Branches:
  refs/heads/master ce5f48c41 -> 0b7337083


Revert "r371"

This reverts commit 4cb6b20237d7bf95a2c1acbaeec0ab58bc2a0074.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/0b733708
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/0b733708
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/0b733708

Branch: refs/heads/master
Commit: 0b7337083600f584624384c05bf5b6236e0f9dbe
Parents: ce5f48c
Author: Tsz Wo Nicholas Sze <szets...@apache.org>
Authored: Thu Oct 25 06:56:16 2018 +0800
Committer: Tsz Wo Nicholas Sze <szets...@apache.org>
Committed: Thu Oct 25 06:57:34 2018 +0800

----------------------------------------------------------------------
 .../org/apache/ratis/TestRestartRaftPeer.java   | 106 +++++++++++
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java |   9 +-
 .../ratis/grpc/TestServerRestartWithGrpc.java   |  25 ---
 .../ratis/netty/TestServerRestartWithNetty.java |  25 ---
 .../java/org/apache/ratis/RaftAsyncTests.java   |   6 +-
 .../java/org/apache/ratis/RaftTestUtil.java     |  18 +-
 .../java/org/apache/ratis/RetryCacheTests.java  |  19 +-
 .../org/apache/ratis/WatchRequestTests.java     | 188 +++++++++----------
 .../apache/ratis/server/ServerRestartTests.java | 110 -----------
 .../apache/ratis/server/TestRaftLogMetrics.java |  69 ++++---
 .../impl/RaftReconfigurationBaseTest.java       |  13 +-
 .../ratis/server/impl/RaftServerTestUtil.java   |  11 +-
 .../impl/RaftStateMachineExceptionTests.java    |   9 +-
 .../server/impl/StateMachineShutdownTests.java  |   2 +-
 .../TestServerRestartWithSimulatedRpc.java      |  25 ---
 .../server/storage/RaftStorageTestUtils.java    |   9 +-
 .../statemachine/RaftSnapshotBaseTest.java      |  75 +++-----
 .../SimpleStateMachine4Testing.java             |  78 +++-----
 18 files changed, 325 insertions(+), 472 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
----------------------------------------------------------------------
diff --git 
a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java 
b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
new file mode 100644
index 0000000..ccbbda0
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.ParameterizedBaseTest;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Test restarting raft peers.
+ */
+@RunWith(Parameterized.class)
+public class TestRestartRaftPeer extends BaseTest {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() throws IOException {
+    RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+    RaftServerConfigKeys.Log.setSegmentSizeMax(prop, 
SizeInBytes.valueOf("8KB"));
+    return ParameterizedBaseTest.getMiniRaftClusters(prop, 3);
+  }
+
+  @Parameterized.Parameter
+  public MiniRaftCluster cluster;
+
+  @Test
+  public void restartFollower() throws Exception {
+    cluster.start();
+    RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId leaderId = cluster.getLeader().getId();
+    final RaftClient client = cluster.createClient(leaderId);
+
+    // write some messages
+    final byte[] content = new byte[1024];
+    Arrays.fill(content, (byte) 1);
+    final SimpleMessage message = new SimpleMessage(new String(content));
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(client.send(message).isSuccess());
+    }
+
+    // restart a follower
+    RaftPeerId followerId = cluster.getFollowers().get(0).getId();
+    LOG.info("Restart follower {}", followerId);
+    cluster.restartServer(followerId, false);
+
+    // write some more messages
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(client.send(message).isSuccess());
+    }
+    client.close();
+
+    // make sure the restarted follower can catchup
+    boolean catchup = false;
+    long lastAppliedIndex = 0;
+    for (int i = 0; i < 10 && !catchup; i++) {
+      Thread.sleep(500);
+      lastAppliedIndex = 
cluster.getRaftServerImpl(followerId).getState().getLastAppliedIndex();
+      catchup = lastAppliedIndex >= 20;
+    }
+    Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup);
+
+    // make sure the restarted peer's log segments is correct
+    cluster.restartServer(followerId, false);
+    Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog()
+        .getLastEntryTermIndex().getIndex() >= 20);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index d98be53..7ae385d 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -92,17 +92,16 @@ public class TestRaftWithGrpc
         Assert.assertEquals(raftServer.getState().getLog().getNextIndex(), 
index);
         if (!raftServer.isLeader()) {
           TermIndex[] serverEntries = 
raftServer.getState().getLog().getEntries(0, Integer.MAX_VALUE);
-          Assert.assertArrayEquals(serverEntries, leaderEntries);
+          Arrays.equals(serverEntries, leaderEntries);
         }
       });
 
       // Wait for heartbeats from leader to be received by followers
-      Thread.sleep(500);
+      Thread.sleep(1000);
       
RaftServerTestUtil.getLogAppenders(cluster.getLeader()).forEach(logAppender -> {
         // FollowerInfo in the leader state should have updated next and match 
index.
-        final long followerMatchIndex = 
logAppender.getFollower().getMatchIndex();
-        Assert.assertTrue(followerMatchIndex >= index - 1);
-        Assert.assertEquals(followerMatchIndex + 1, 
logAppender.getFollower().getNextIndex());
+        Assert.assertEquals(logAppender.getFollower().getMatchIndex(), index - 
1);
+        Assert.assertEquals(logAppender.getFollower().getNextIndex(), index);
       });
     }
     cluster.shutdown();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java 
b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java
deleted file mode 100644
index 682b3ba..0000000
--- 
a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerRestartWithGrpc.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.ratis.server.ServerRestartTests;
-
-public class TestServerRestartWithGrpc
-    extends ServerRestartTests<MiniRaftClusterWithGrpc>
-    implements MiniRaftClusterWithGrpc.FactoryGet {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java
 
b/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java
deleted file mode 100644
index 15dc688..0000000
--- 
a/ratis-netty/src/test/java/org/apache/ratis/netty/TestServerRestartWithNetty.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.netty;
-
-import org.apache.ratis.server.ServerRestartTests;
-
-public class TestServerRestartWithNetty
-    extends ServerRestartTests<MiniRaftClusterWithNetty>
-    implements MiniRaftClusterWithNetty.FactoryGet {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index c14515c..f79eb6b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -192,7 +192,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
       // submit some messages
       final List<CompletableFuture<RaftClientReply>> futures = new 
ArrayList<>();
       for (int i = 0; i < numMesssages; i++) {
-        final String s = "" + i;
+        final String s = "m" + i;
         LOG.info("sendAsync " + s);
         futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage(s)));
       }
@@ -218,12 +218,12 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
       // test a failure case
       testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index",
           () -> client.sendStaleReadAsync(
-              new RaftTestUtil.SimpleMessage("" + Long.MAX_VALUE),
+              new RaftTestUtil.SimpleMessage("" + (numMesssages + 1)),
               followerCommitInfo.getCommitIndex(), follower),
           StateMachineException.class, IndexOutOfBoundsException.class);
 
       // test sendStaleReadAsync
-      for (int i = 0; i < numMesssages; i++) {
+      for (int i = 1; i < followerCommitInfo.getCommitIndex(); i++) {
         final int query = i;
         LOG.info("sendStaleReadAsync, query=" + query);
         final Message message = new RaftTestUtil.SimpleMessage("" + query);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 5946a47..60629f9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -113,14 +113,11 @@ public interface RaftTestUtil {
     return leader != null ? leader.getId().toString() : null;
   }
 
-  static boolean logEntriesContains(RaftLog log, SimpleMessage... 
expectedMessages) {
-    return logEntriesContains(log, 0L, Long.MAX_VALUE, expectedMessages);
-  }
-
-  static boolean logEntriesContains(RaftLog log, long startIndex, long 
endIndex, SimpleMessage... expectedMessages) {
+  static boolean logEntriesContains(RaftLog log,
+      SimpleMessage... expectedMessages) {
     int idxEntries = 0;
     int idxExpected = 0;
-    TermIndex[] termIndices = log.getEntries(startIndex, endIndex);
+    TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE);
     while (idxEntries < termIndices.length
         && idxExpected < expectedMessages.length) {
       try {
@@ -379,13 +376,4 @@ public interface RaftTestUtil {
       }
     }).start();
   }
-
-  static void assertSameLog(RaftLog expected, RaftLog computed) throws 
Exception {
-    Assert.assertEquals(expected.getLastEntryTermIndex(), 
computed.getLastEntryTermIndex());
-    final long lastIndex = expected.getNextIndex() - 1;
-    Assert.assertEquals(expected.getLastEntryTermIndex().getIndex(), 
lastIndex);
-    for(long i = 0; i < lastIndex; i++) {
-      Assert.assertEquals(expected.get(i), computed.get(i));
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index c962481..9fdb4f7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -29,8 +29,6 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.server.storage.RaftLogIOException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -38,7 +36,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.stream.LongStream;
 
 import static java.util.Arrays.asList;
 
@@ -113,21 +110,10 @@ public abstract class RetryCacheTests extends BaseTest {
       Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server));
       Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, 
callId));
       // make sure there is only one log entry committed
-      Assert.assertEquals(1, count(server.getState().getLog(), oldLastApplied 
+ 1));
+      Assert.assertEquals(oldLastApplied + 1, 
server.getState().getLastAppliedIndex());
     }
   }
 
-  static int count(RaftLog log, long startIndex) throws RaftLogIOException {
-    final long nextIndex = log.getNextIndex();
-    int count = 0;
-    for(long i = startIndex; i < nextIndex; i++) {
-      if (log.get(i).hasStateMachineLogEntry()) {
-        count++;
-      }
-    }
-    return count;
-  }
-
   /**
    * Test retry while the leader changes to another peer
    */
@@ -172,7 +158,8 @@ public abstract class RetryCacheTests extends BaseTest {
     }
 
     // check the new leader and make sure the retry did not get committed
-    Assert.assertEquals(0, count(cluster.getLeader().getState().getLog(), 
oldLastApplied + 1));
+    Assert.assertEquals(oldLastApplied + 3,
+        cluster.getLeader().getState().getLastAppliedIndex());
     client.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java 
b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index d1cb7e0..9ff27ad 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -41,8 +41,6 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     extends BaseTest
@@ -73,6 +71,7 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
   }
 
   static class TestParameters {
+    final long startLogIndex;
     final int numMessages;
     final RaftClient writeClient;
     final RaftClient watchMajorityClient;
@@ -82,10 +81,12 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     final MiniRaftCluster cluster;
     final Logger log;
 
-    TestParameters(int numMessages, RaftClient writeClient,
+    TestParameters(
+        long startLogIndex, int numMessages, RaftClient writeClient,
         RaftClient watchMajorityClient, RaftClient watchAllClient,
         RaftClient watchMajorityCommittedClient, RaftClient 
watchAllCommittedClient,
         MiniRaftCluster cluster, Logger log) {
+      this.startLogIndex = startLogIndex;
       this.numMessages = numMessages;
       this.writeClient = writeClient;
       this.watchMajorityClient = watchMajorityClient;
@@ -96,31 +97,9 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
       this.log = log;
     }
 
-    void sendRequests(List<CompletableFuture<RaftClientReply>> replies,
-        List<CompletableFuture<WatchReplies>> watches) {
-      for(int i = 0; i < numMessages; i++) {
-        final String message = "m" + i;
-        log.info("SEND_REQUEST {}: message={}", i, message);
-        final CompletableFuture<RaftClientReply> replyFuture = 
writeClient.sendAsync(new RaftTestUtil.SimpleMessage(message));
-        replies.add(replyFuture);
-        final CompletableFuture<WatchReplies> watchFuture = new 
CompletableFuture<>();
-        watches.add(watchFuture);
-        replyFuture.thenAccept(reply -> {
-          final long logIndex = reply.getLogIndex();
-          log.info("SEND_WATCH: message={}, logIndex={}", message, logIndex);
-          watchFuture.complete(new WatchReplies(logIndex,
-              watchMajorityClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY),
-              watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL),
-              watchMajorityCommittedClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY_COMMITTED),
-              watchAllCommittedClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL_COMMITTED)
-          ));
-        });
-      }
-    }
-
     @Override
     public String toString() {
-      return "numMessages=" + numMessages;
+      return "startLogIndex=" + startLogIndex + ", numMessages=" + numMessages;
     }
   }
 
@@ -140,9 +119,10 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
         final RaftClient watchAllCommittedClient = 
cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
       final int[] numMessages = {1, 10, 100};
       for(int i = 0; i < 5; i++) {
+        final long logIndex = getLogIndex(writeClient) + 1;
         final int n = 
numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)];
         final TestParameters p = new TestParameters(
-            n, writeClient, watchMajorityClient, watchAllClient,
+            logIndex, n, writeClient, watchMajorityClient, watchAllClient,
             watchMajorityCommittedClient, watchAllCommittedClient, cluster, 
LOG);
         LOG.info("{}) {}, {}", i, p, cluster.printServers());
         testCase.apply(p);
@@ -151,29 +131,18 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     }
   }
 
-  static class WatchReplies {
-    private final long logIndex;
-    private final CompletableFuture<RaftClientReply> majority;
-    private final CompletableFuture<RaftClientReply> all;
-    private final CompletableFuture<RaftClientReply> majorityCommitted;
-    private final CompletableFuture<RaftClientReply> allCommitted;
-
-    WatchReplies(long logIndex,
-        CompletableFuture<RaftClientReply> majority, 
CompletableFuture<RaftClientReply> all,
-        CompletableFuture<RaftClientReply> majorityCommitted, 
CompletableFuture<RaftClientReply> allCommitted) {
-      this.logIndex = logIndex;
-      this.majority = majority;
-      this.all = all;
-      this.majorityCommitted = majorityCommitted;
-      this.allCommitted = allCommitted;
-    }
-  }
-
   static Void runTestWatchRequestAsync(TestParameters p) throws Exception {
-    final Logger LOG = p.log;
-    final MiniRaftCluster cluster = p.cluster;
-    final int numMessages = p.numMessages;
+    runTestWatchRequestAsync(p.startLogIndex, p.numMessages,
+        p.writeClient, p.watchMajorityClient, p.watchAllClient,
+        p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, 
p.log);
+    return null;
+  }
 
+  static void runTestWatchRequestAsync(
+      long startLogIndex, int numMessages,
+      RaftClient writeClient, RaftClient watchMajorityClient, RaftClient 
watchAllClient,
+      RaftClient watchMajorityCommittedClient, RaftClient 
watchAllCommittedClient,
+      MiniRaftCluster cluster, Logger LOG) throws Exception {
     // blockStartTransaction of the leader so that no transaction can be 
committed MAJORITY
     final RaftServerImpl leader = cluster.getLeader();
     LOG.info("block leader {}", leader.getId());
@@ -187,35 +156,52 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
 
     // send a message
     final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
-    final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchMajoritys = new 
ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchAlls = new 
ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = 
new ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new 
ArrayList<>();
 
-    p.sendRequests(replies, watches);
+    for(int i = 0; i < numMessages; i++) {
+      final long logIndex = startLogIndex + i;
+      final String message = "m" + logIndex;
+      LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, 
message);
+      replies.add(writeClient.sendAsync(new 
RaftTestUtil.SimpleMessage(message)));
+      watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY));
+      watchAlls.add(watchAllClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL));
+      watchMajorityCommitteds.add(watchMajorityCommittedClient.sendWatchAsync(
+          logIndex, ReplicationLevel.MAJORITY_COMMITTED));
+      watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL_COMMITTED));
+    }
 
     Assert.assertEquals(numMessages, replies.size());
-    Assert.assertEquals(numMessages, watches.size());
+    Assert.assertEquals(numMessages, watchMajoritys.size());
+    Assert.assertEquals(numMessages, watchAlls.size());
+    Assert.assertEquals(numMessages, watchMajorityCommitteds.size());
+    Assert.assertEquals(numMessages, watchAllCommitteds.size());
 
     // since leader is blocked, nothing can be done.
     TimeUnit.SECONDS.sleep(1);
     assertNotDone(replies);
-    assertNotDone(watches);
+    assertNotDone(watchMajoritys);
+    assertNotDone(watchAlls);
+    assertNotDone(watchMajorityCommitteds);
+    assertNotDone(watchAllCommitteds);
 
     // unblock leader so that the transaction can be committed.
     SimpleStateMachine4Testing.get(leader).unblockStartTransaction();
     LOG.info("unblock leader {}", leader.getId());
     for(int i = 0; i < numMessages; i++) {
+      final long logIndex = startLogIndex + i;
+      LOG.info("UNBLOCK_LEADER {}: logIndex={}", i, logIndex);
       final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      final long logIndex = reply.getLogIndex();
-      LOG.info("{}: receive reply for logIndex={}", i, logIndex);
       Assert.assertTrue(reply.isSuccess());
-
-      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      Assert.assertEquals(logIndex, watchReplies.logIndex);
-      final RaftClientReply watchMajorityReply = 
watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      Assert.assertEquals(logIndex, reply.getLogIndex());
+      final RaftClientReply watchMajorityReply = 
watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
-      Assert.assertTrue(watchMajorityReply.isSuccess());
+      Assert.assertTrue(watchMajoritys.get(i).get().isSuccess());
 
       final RaftClientReply watchMajorityCommittedReply
-          = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
+          = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
       LOG.info("watchMajorityCommittedReply({}) = ", logIndex, 
watchMajorityCommittedReply);
       Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
       { // check commit infos
@@ -233,25 +219,22 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
       }
     }
 
-    Assert.assertEquals(numMessages, watches.size());
-
     // but not replicated/committed to all.
     TimeUnit.SECONDS.sleep(1);
-    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> 
w.all));
-    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> 
w.allCommitted));
+    assertNotDone(watchAlls);
+    assertNotDone(watchAllCommitteds);
 
     // unblock follower so that the transaction can be replicated and 
committed to all.
     LOG.info("unblock follower {}", blockedFollower.getId());
     
SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
     for(int i = 0; i < numMessages; i++) {
-      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      final long logIndex = watchReplies.logIndex;
+      final long logIndex = startLogIndex + i;
       LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex);
-      final RaftClientReply watchAllReply = 
watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final RaftClientReply watchAllReply = 
watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
       Assert.assertTrue(watchAllReply.isSuccess());
 
-      final RaftClientReply watchAllCommittedReply = 
watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final RaftClientReply watchAllCommittedReply = 
watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchAllCommittedReply({}) = {}", logIndex, 
watchAllCommittedReply);
       Assert.assertTrue(watchAllCommittedReply.isSuccess());
       { // check commit infos
@@ -260,14 +243,9 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
         commitInfos.forEach(info -> Assert.assertTrue(logIndex <= 
info.getCommitIndex()));
       }
     }
-    return null;
   }
 
   static <T> void assertNotDone(List<CompletableFuture<T>> futures) {
-    assertNotDone(futures.stream());
-  }
-
-  static <T> void assertNotDone(Stream<CompletableFuture<T>> futures) {
     futures.forEach(f -> {
       if (f.isDone()) {
         try {
@@ -289,44 +267,65 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
   }
 
   static Void runTestWatchRequestAsyncChangeLeader(TestParameters p) throws 
Exception {
-    final Logger LOG = p.log;
-    final MiniRaftCluster cluster = p.cluster;
-    final int numMessages = p.numMessages;
+    runTestWatchRequestAsyncChangeLeader(p.startLogIndex, p.numMessages,
+        p.writeClient, p.watchMajorityClient, p.watchAllClient,
+        p.watchMajorityCommittedClient, p.watchAllCommittedClient, p.cluster, 
p.log);
+    return null;
+  }
 
+  static void runTestWatchRequestAsyncChangeLeader(
+      long startLogIndex, int numMessages,
+      RaftClient writeClient, RaftClient watchMajorityClient, RaftClient 
watchAllClient,
+      RaftClient watchMajorityCommittedClient, RaftClient 
watchAllCommittedClient,
+      MiniRaftCluster cluster, Logger LOG) throws Exception {
     // blockFlushStateMachineData a follower so that no transaction can be 
ALL_COMMITTED
     final List<RaftServerImpl> followers = cluster.getFollowers();
     final RaftServerImpl blockedFollower = 
followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
     LOG.info("block follower {}", blockedFollower.getId());
     
SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
 
+    // send a message
     final List<CompletableFuture<RaftClientReply>> replies = new ArrayList<>();
-    final List<CompletableFuture<WatchReplies>> watches = new ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchMajoritys = new 
ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchAlls = new 
ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchMajorityCommitteds = 
new ArrayList<>();
+    final List<CompletableFuture<RaftClientReply>> watchAllCommitteds = new 
ArrayList<>();
 
-    p.sendRequests(replies, watches);
+    for(int i = 0; i < numMessages; i++) {
+      final long logIndex = startLogIndex + i;
+      final String message = "m" + logIndex;
+      LOG.info("SEND_REQUEST {}: logIndex={}, message={}", i, logIndex, 
message);
+      replies.add(writeClient.sendAsync(new 
RaftTestUtil.SimpleMessage(message)));
+      watchMajoritys.add(watchMajorityClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY));
+      watchAlls.add(watchAllClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL));
+      watchMajorityCommitteds.add(
+          watchMajorityCommittedClient.sendWatchAsync(logIndex, 
ReplicationLevel.MAJORITY_COMMITTED));
+      watchAllCommitteds.add(watchAllCommittedClient.sendWatchAsync(logIndex, 
ReplicationLevel.ALL_COMMITTED));
+    }
 
     Assert.assertEquals(numMessages, replies.size());
-    Assert.assertEquals(numMessages, watches.size());
+    Assert.assertEquals(numMessages, watchMajoritys.size());
+    Assert.assertEquals(numMessages, watchAlls.size());
+    Assert.assertEquals(numMessages, watchMajorityCommitteds.size());
+    Assert.assertEquals(numMessages, watchAllCommitteds.size());
 
     // since only one follower is blocked, requests can be committed MAJORITY 
but neither ALL nor ALL_COMMITTED.
     for(int i = 0; i < numMessages; i++) {
+      final long logIndex = startLogIndex + i;
+      LOG.info("UNBLOCK_F1 {}: logIndex={}", i, logIndex);
       final RaftClientReply reply = replies.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      final long logIndex = reply.getLogIndex();
-      LOG.info("UNBLOCK_F1 {}: reply logIndex={}", i, logIndex);
       Assert.assertTrue(reply.isSuccess());
-
-      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      Assert.assertEquals(logIndex, watchReplies.logIndex);
-      final RaftClientReply watchMajorityReply = 
watchReplies.majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      Assert.assertEquals(logIndex, reply.getLogIndex());
+      final RaftClientReply watchMajorityReply = 
watchMajoritys.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchMajorityReply({}) = {}", logIndex, watchMajorityReply);
-      Assert.assertTrue(watchMajorityReply.isSuccess());
+      Assert.assertTrue(watchMajoritys.get(i).get().isSuccess());
 
       final RaftClientReply watchMajorityCommittedReply
-          = watchReplies.majorityCommitted.get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
+          = watchMajorityCommitteds.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
       LOG.info("watchMajorityCommittedReply({}) = ", logIndex, 
watchMajorityCommittedReply);
       Assert.assertTrue(watchMajorityCommittedReply.isSuccess());
       { // check commit infos
         final Collection<CommitInfoProto> commitInfos = 
watchMajorityCommittedReply.getCommitInfos();
-        LOG.info("commitInfos=" + commitInfos);
         Assert.assertEquals(NUM_SERVERS, commitInfos.size());
 
         // One follower has not committed, so min must be less than logIndex
@@ -340,8 +339,8 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
       }
     }
     TimeUnit.SECONDS.sleep(1);
-    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> 
w.all));
-    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> 
w.allCommitted));
+    assertNotDone(watchAlls);
+    assertNotDone(watchAllCommitteds);
 
     // Now change leader
     RaftTestUtil.changeLeader(cluster, cluster.getLeader().getId());
@@ -350,14 +349,13 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
     
SimpleStateMachine4Testing.get(blockedFollower).unblockFlushStateMachineData();
     LOG.info("unblock follower {}", blockedFollower.getId());
     for(int i = 0; i < numMessages; i++) {
-      final WatchReplies watchReplies = watches.get(i).get(GET_TIMEOUT_SECOND, 
TimeUnit.SECONDS);
-      final long logIndex = watchReplies.logIndex;
+      final long logIndex = startLogIndex + i;
       LOG.info("UNBLOCK_FOLLOWER {}: logIndex={}", i, logIndex);
-      final RaftClientReply watchAllReply = 
watchReplies.all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final RaftClientReply watchAllReply = 
watchAlls.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchAllReply({}) = {}", logIndex, watchAllReply);
       Assert.assertTrue(watchAllReply.isSuccess());
 
-      final RaftClientReply watchAllCommittedReply = 
watchReplies.allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      final RaftClientReply watchAllCommittedReply = 
watchAllCommitteds.get(i).get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
       LOG.info("watchAllCommittedReply({}) = {}", logIndex, 
watchAllCommittedReply);
       Assert.assertTrue(watchAllCommittedReply.isSuccess());
       { // check commit infos
@@ -366,6 +364,6 @@ public abstract class WatchRequestTests<CLUSTER extends 
MiniRaftCluster>
         commitInfos.forEach(info -> Assert.assertTrue(logIndex <= 
info.getCommitIndex()));
       }
     }
-    return null;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java 
b/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
deleted file mode 100644
index 5353caa..0000000
--- a/ratis-server/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.BaseTest;
-import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.RaftTestUtil.SimpleMessage;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.ServerState;
-import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.SizeInBytes;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Test restarting raft peers.
- */
-public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
-    extends BaseTest
-    implements MiniRaftCluster.Factory.Get<CLUSTER> {
-  static {
-    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-    LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
-  }
-
-  static final int NUM_SERVERS = 3;
-
-  @Before
-  public void setup() {
-    final RaftProperties prop = getProperties();
-    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-        SimpleStateMachine4Testing.class, StateMachine.class);
-    RaftServerConfigKeys.Log.setSegmentSizeMax(prop, 
SizeInBytes.valueOf("8KB"));
-  }
-
-  @Test
-  public void testRestartFollower() throws Exception {
-    try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) {
-      runTestRestartFollower(cluster, LOG);
-    }
-  }
-
-  static void runTestRestartFollower(MiniRaftCluster cluster, Logger LOG) 
throws Exception {
-    cluster.start();
-    RaftTestUtil.waitForLeader(cluster);
-    final RaftPeerId leaderId = cluster.getLeader().getId();
-    final RaftClient client = cluster.createClient(leaderId);
-
-    // write some messages
-    final byte[] content = new byte[1024];
-    Arrays.fill(content, (byte)1);
-    final SimpleMessage message = new SimpleMessage(new String(content));
-    for(int i = 0; i < 10; i++) {
-      Assert.assertTrue(client.send(message).isSuccess());
-    }
-
-    // restart a follower
-    RaftPeerId followerId = cluster.getFollowers().get(0).getId();
-    LOG.info("Restart follower {}", followerId);
-    cluster.restartServer(followerId, false);
-
-    // write some more messages
-    for(int i = 0; i < 10; i++) {
-      Assert.assertTrue(client.send(message).isSuccess());
-    }
-    client.close();
-
-    final long leaderLastIndex = 
cluster.getLeader().getState().getLog().getLastEntryTermIndex().getIndex();
-    // make sure the restarted follower can catchup
-    final ServerState followerState = 
cluster.getRaftServerImpl(followerId).getState();
-    JavaUtils.attempt(() -> followerState.getLastAppliedIndex() >= 
leaderLastIndex,
-        10, 500, "follower catchup", LOG);
-
-    // make sure the restarted peer's log segments is correct
-    cluster.restartServer(followerId, false);
-    Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog()
-        .getLastEntryTermIndex().getIndex() >= leaderLastIndex);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java 
b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
index 9cc60a6..978800d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/TestRaftLogMetrics.java
@@ -20,67 +20,61 @@ package org.apache.ratis.server;
 
 import com.codahale.metrics.Timer;
 import org.apache.log4j.Level;
-import org.apache.ratis.BaseTest;
-import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.metrics.RatisMetricsRegistry;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
-import org.apache.ratis.server.storage.RaftStorageTestUtils;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.LogUtils;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import javax.management.ObjectName;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
 
-public class TestRaftLogMetrics extends BaseTest
-    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+public class TestRaftLogMetrics {
 
   {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
   public static final int NUM_SERVERS = 3;
 
-  {
-    getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-        MetricsStateMachine.class, StateMachine.class);
-  }
+  protected static final RaftProperties properties = new RaftProperties();
 
-  static class MetricsStateMachine extends BaseStateMachine {
-    static MetricsStateMachine get(RaftServerImpl s) {
-      return (MetricsStateMachine)s.getStateMachine();
-    }
+  private final MiniRaftClusterWithSimulatedRpc cluster = 
MiniRaftClusterWithSimulatedRpc
+      .FACTORY.newCluster(NUM_SERVERS, getProperties());
 
-    private final AtomicInteger flushCount = new AtomicInteger();
+  public RaftProperties getProperties() {
+    return properties;
+  }
 
-    int getFlushCount() {
-      return flushCount.get();
-    }
+  @Before
+  public void setup() throws IOException {
+    Assert.assertNull(cluster.getLeader());
+    cluster.start();
+  }
 
-    @Override
-    public CompletableFuture<Void> flushStateMachineData(long index) {
-      flushCount.incrementAndGet();
-      return super.flushStateMachineData(index);
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
     }
   }
 
-  @Test
-  public void testFlushMetric() throws Exception {
-    try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) {
-      cluster.start();
-      runTestFlushMetric(cluster);
-    }
+  private String getLogFlushTimeMetric(String serverId) {
+    return new StringBuilder("org.apache.ratis.server.storage.RaftLogWorker.")
+        .append(serverId).append(".flush-time").toString();
   }
 
-  static void runTestFlushMetric(MiniRaftCluster cluster) throws Exception {
+  @Test
+  public void testFlushMetric() throws Exception {
     int numMsg = 2;
     final RaftTestUtil.SimpleMessage[] messages = 
RaftTestUtil.SimpleMessage.create(numMsg);
 
@@ -91,21 +85,22 @@ public class TestRaftLogMetrics extends BaseTest
     }
 
     for (RaftServerProxy rsp: cluster.getServers()) {
-      final String flushTimeMetric = 
RaftStorageTestUtils.getLogFlushTimeMetric(rsp.getId());
+      String flushTimeMetric = getLogFlushTimeMetric(rsp.getId().toString());
       Timer tm = 
RatisMetricsRegistry.getRegistry().getTimers().get(flushTimeMetric);
       Assert.assertNotNull(tm);
 
-      final MetricsStateMachine stateMachine = 
MetricsStateMachine.get(rsp.getImpl(cluster.getGroupId()));
-      final int expectedFlush = stateMachine.getFlushCount();
+      // Number of log entries expected = numMsg + 1 entry for 
start-log-segment
+      int numExpectedLogEntries = numMsg + 1;
 
-      Assert.assertEquals(expectedFlush, tm.getCount());
+      Assert.assertEquals(numExpectedLogEntries, tm.getCount());
       Assert.assertTrue(tm.getMeanRate() > 0);
 
       // Test jmx
       ObjectName oname = new ObjectName("metrics", "name", flushTimeMetric);
-      Assert.assertEquals(expectedFlush,
+      Assert.assertEquals(numExpectedLogEntries,
           ((Long) 
ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count"))
               .intValue());
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 246a9a2..e9651d6 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -61,6 +61,8 @@ public abstract class RaftReconfigurationBaseTest extends 
BaseTest {
 
   protected static final RaftProperties prop = new RaftProperties();
   
+  private static final ClientId clientId = ClientId.randomId();
+
   static final int STAGING_CATCHUP_GAP = 10;
   @BeforeClass
   public static void setup() {
@@ -414,16 +416,17 @@ public abstract class RaftReconfigurationBaseTest extends 
BaseTest {
       cluster.start();
       RaftTestUtil.waitForLeader(cluster);
 
-      final RaftServerImpl leader = cluster.getLeader();
-      final RaftClient client = cluster.createClient(leader.getId());
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+      final RaftClient client = cluster.createClient(leaderId);
       client.send(new SimpleMessage("m"));
 
-      final RaftLog leaderLog = leader.getState().getLog();
-      final long committedIndex = leaderLog.getLastCommittedIndex();
+      final long committedIndex = cluster.getLeader().getState().getLog()
+          .getLastCommittedIndex();
       final RaftConfiguration confBefore = cluster.getLeader().getRaftConf();
 
       // no real configuration change in the request
-      final RaftClientReply reply = 
client.setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray()));
+      RaftClientReply reply = client.setConfiguration(cluster.getPeers()
+          .toArray(new RaftPeer[0]));
       Assert.assertTrue(reply.isSuccess());
       Assert.assertEquals(committedIndex, cluster.getLeader().getState()
           .getLog().getLastCommittedIndex());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 827117e..bcfaf01 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -41,7 +41,8 @@ public class RaftServerTestUtil {
         3, sleepMs, "waitAndCheckNewConf", LOG);
   }
   private static void waitAndCheckNewConf(MiniRaftCluster cluster,
-      RaftPeer[] peers, Collection<String> deadPeers) {
+      RaftPeer[] peers, Collection<String> deadPeers)
+      throws Exception {
     LOG.info(cluster.printServers());
     Assert.assertNotNull(cluster.getLeader());
 
@@ -60,11 +61,9 @@ public class RaftServerTestUtil {
         numIncluded++;
         Assert.assertTrue(server.getRaftConf().isStable());
         Assert.assertTrue(server.getRaftConf().hasNoChange(peers));
-      } else if (server.isAlive()) {
-        // The server is successfully removed from the conf
-        // It may not be shutdown since it may not be able to talk to the new 
leader (who is not in its conf).
-        Assert.assertTrue(server.getRaftConf().isStable());
-        
Assert.assertFalse(server.getRaftConf().containsInConf(server.getId()));
+      } else {
+        Assert.assertFalse(server.getId() + " is still running: " + server,
+            server.isAlive());
       }
     }
     Assert.assertEquals(peers.length, numIncluded + deadIncluded);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index cf3a490..ec635d0 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -21,7 +21,6 @@ import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
@@ -108,8 +107,8 @@ public abstract class 
RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
     final long seqNum = 111;
-    final SimpleMessage message = new SimpleMessage("message");
-    final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), 
leaderId, callId, seqNum, message);
+    RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), 
leaderId,
+        callId, seqNum, new RaftTestUtil.SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertFalse(reply.isSuccess());
     Assert.assertNotNull(reply.getStateMachineException());
@@ -132,8 +131,8 @@ public abstract class 
RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
       }
       Assert.assertNotNull(
           RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
-      final RaftLog log = server.getState().getLog();
-      RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, 
log.getNextIndex(), message);
+      Assert.assertEquals(oldLastApplied + 1,
+          server.getState().getLastAppliedIndex());
     }
 
     client.close();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index e566700..a66cf70 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -97,7 +97,7 @@ public abstract class StateMachineShutdownTests<CLUSTER 
extends MiniRaftCluster>
     RaftClientReply watchReply = client.sendWatch(
         logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
     watchReply.getCommitInfos().forEach(
-        val -> Assert.assertTrue(val.getCommitIndex() >= logIndex));
+        val -> Assert.assertEquals(val.getCommitIndex(), logIndex));
 
     RaftServerImpl secondFollower = cluster.getFollowers().get(1);
     // Second follower is blocked in apply transaction

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
deleted file mode 100644
index 306e5e7..0000000
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server.simulation;
-
-import org.apache.ratis.server.ServerRestartTests;
-
-public class TestServerRestartWithSimulatedRpc
-    extends ServerRestartTests<MiniRaftClusterWithSimulatedRpc>
-    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index e681b66..ad8308e 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -18,8 +18,6 @@
 package org.apache.ratis.server.storage;
 
 import org.apache.log4j.Level;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.util.AutoCloseableLock;
@@ -32,10 +30,6 @@ public interface RaftStorageTestUtils {
     LogUtils.setLogLevel(RaftLogWorker.LOG, level);
   }
 
-  static String getLogFlushTimeMetric(RaftPeerId serverId) {
-    return RaftLogWorker.class.getName() + "." + serverId + ".flush-time";
-  }
-
   static void printLog(RaftLog log, Consumer<String> println) {
     if (log == null) {
       println.accept("log == null");
@@ -56,7 +50,8 @@ public interface RaftStorageTestUtils {
       b.append(i == committed? 'c': ' ');
       b.append(String.format("%3d: ", i));
       try {
-        b.append(ServerProtoUtils.toLogEntryString(log.get(i)));
+        final RaftProtos.LogEntryProto entry = log.get(i);
+        b.append(entry != null? entry.getLogEntryBodyCase(): null);
       } catch (RaftLogIOException e) {
         b.append(e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 0a5e38d..7a326a3 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -33,9 +33,7 @@ import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.server.storage.RaftStorageDirectory;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -47,8 +45,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
 
 public abstract class RaftSnapshotBaseTest extends BaseTest {
   static {
@@ -60,31 +56,25 @@ public abstract class RaftSnapshotBaseTest extends BaseTest 
{
   static final Logger LOG = 
LoggerFactory.getLogger(RaftSnapshotBaseTest.class);
   private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
 
-  static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startIndex, 
long endIndex) {
+  static File getSnapshotFile(MiniRaftCluster cluster, int i) {
     final RaftServerImpl leader = cluster.getLeader();
-    final SimpleStateMachineStorage storage = 
SimpleStateMachine4Testing.get(leader).getStateMachineStorage();
-    final long term = leader.getState().getCurrentTerm();
-    return LongStream.range(startIndex, endIndex)
-        .mapToObj(i -> storage.getSnapshotFile(term, i))
-        .collect(Collectors.toList());
+    final SimpleStateMachine4Testing sm = 
SimpleStateMachine4Testing.get(leader);
+    return sm.getStateMachineStorage().getSnapshotFile(
+        leader.getState().getCurrentTerm(), i);
   }
 
-
-  static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
+  static void assertLeaderContent(MiniRaftCluster cluster)
+      throws InterruptedException {
     final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
-    final RaftLog leaderLog = leader.getState().getLog();
-    final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
-    final LogEntryProto e = leaderLog.get(lastIndex);
-
+    Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2,
+        leader.getState().getLog().getLastCommittedIndex());
     final LogEntryProto[] entries = 
SimpleStateMachine4Testing.get(leader).getContent();
-    long message = 0;
-    for (int i = 0; i < entries.length; i++) {
-      LOG.info("{}) {} {}", i, message, entries[i]);
-      if (entries[i].hasStateMachineLogEntry()) {
-        final SimpleMessage m = new SimpleMessage("m" + message++);
-        Assert.assertArrayEquals(m.getContent().toByteArray(),
-            entries[i].getStateMachineLogEntry().getLogData().toByteArray());
-      }
+
+    for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+      Assert.assertEquals(i+1, entries[i].getIndex());
+      Assert.assertArrayEquals(
+          new SimpleMessage("m" + i).getContent().toByteArray(),
+          entries[i].getStateMachineLogEntry().getLogData().toByteArray());
     }
   }
 
@@ -128,12 +118,15 @@ public abstract class RaftSnapshotBaseTest extends 
BaseTest {
       }
     }
 
-    final long nextIndex = 
cluster.getLeader().getState().getLog().getNextIndex();
-    LOG.info("nextIndex = {}", nextIndex);
     // wait for the snapshot to be done
-    final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - 
SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
-    JavaUtils.attempt(() -> 
snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
-        10, 1000, "snapshotFile.exist", LOG);
+    final File snapshotFile = getSnapshotFile(cluster, i);
+
+    int retries = 0;
+    do {
+      Thread.sleep(1000);
+    } while (!snapshotFile.exists() && retries++ < 10);
+
+    Assert.assertTrue(snapshotFile + " does not exist", snapshotFile.exists());
 
     // restart the peer and check if it can correctly load snapshot
     cluster.restart(false);
@@ -145,14 +138,6 @@ public abstract class RaftSnapshotBaseTest extends 
BaseTest {
     }
   }
 
-  static boolean exists(File f) {
-    if (f.exists()) {
-      LOG.info("File exists: " + f);
-      return true;
-    }
-    return false;
-  }
-
   /**
    * Basic test for install snapshot: start a one node cluster and let it
    * generate a snapshot. Then delete the log and restart the node, and add 
more
@@ -160,7 +145,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest 
{
    */
   @Test
   public void testBasicInstallSnapshot() throws Exception {
-    final List<LogPathAndIndex> logs;
+    List<LogPathAndIndex> logs;
     try {
       RaftTestUtil.waitForLeader(cluster);
       final RaftPeerId leaderId = cluster.getLeader().getId();
@@ -176,13 +161,15 @@ public abstract class RaftSnapshotBaseTest extends 
BaseTest {
       // wait for the snapshot to be done
       RaftStorageDirectory storageDirectory = cluster.getLeader().getState()
           .getStorage().getStorageDir();
-
-      final long nextIndex = 
cluster.getLeader().getState().getLog().getNextIndex();
-      LOG.info("nextIndex = {}", nextIndex);
-      final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - 
SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
-      JavaUtils.attempt(() -> 
snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
-          10, 1000, "snapshotFile.exist", LOG);
+      final File snapshotFile = getSnapshotFile(cluster, i);
       logs = storageDirectory.getLogSegmentFiles();
+
+      int retries = 0;
+      do {
+        Thread.sleep(1000);
+      } while (!snapshotFile.exists() && retries++ < 10);
+
+      Assert.assertTrue(snapshotFile + " does not exist", 
snapshotFile.exists());
     } finally {
       cluster.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0b733708/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 313e713..9a7267b 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -20,15 +20,13 @@ package org.apache.ratis.statemachine;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.io.MD5Hash;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -36,29 +34,22 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.LogInputStream;
 import org.apache.ratis.server.storage.LogOutputStream;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.statemachine.impl.TransactionContextImpl;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.util.Daemon;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.MD5FileUtil;
-import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.Objects;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 /**
  * A {@link StateMachine} implementation example that simply stores all the log
@@ -77,8 +68,8 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     return (SimpleStateMachine4Testing)s.getStateMachine();
   }
 
-  private final SortedMap<Long, LogEntryProto> indexMap = 
Collections.synchronizedSortedMap(new TreeMap<>());
-  private final SortedMap<String, LogEntryProto> dataMap = 
Collections.synchronizedSortedMap(new TreeMap<>());
+  private final List<LogEntryProto> list =
+      Collections.synchronizedList(new ArrayList<>());
   private final Daemon checkpointer;
   private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
   private final RaftProperties properties = new RaftProperties();
@@ -128,14 +119,14 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   public SimpleStateMachine4Testing() {
     checkpointer = new Daemon(() -> {
       while (running) {
-        if (indexMap.lastKey() - endIndexLastCkpt >= SNAPSHOT_THRESHOLD) {
-          endIndexLastCkpt = takeSnapshot();
-        }
-
-        try {
-          TimeUnit.SECONDS.sleep(1);
-        } catch(InterruptedException ignored) {
-        }
+          if (list.get(list.size() - 1).getIndex() - endIndexLastCkpt >=
+              SNAPSHOT_THRESHOLD) {
+            endIndexLastCkpt = takeSnapshot();
+          }
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ignored) {
+          }
       }
     });
   }
@@ -148,12 +139,6 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     return leaderElectionTimeoutInfo;
   }
 
-  private void put(LogEntryProto entry) {
-    final LogEntryProto previous = indexMap.put(entry.getIndex(), entry);
-    Preconditions.assertNull(previous, "previous");
-    dataMap.put(entry.getStateMachineLogEntry().getLogData().toStringUtf8(), 
entry);
-  }
-
   @Override
   public synchronized void initialize(RaftServer server, RaftGroupId groupId,
       RaftStorage raftStorage) throws IOException {
@@ -186,7 +171,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
-    put(entry);
+    list.add(entry);
     updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
     return CompletableFuture.completedFuture(
         new SimpleMessage(entry.getIndex() + " OK"));
@@ -207,7 +192,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
         termIndex.getIndex(), snapshotFile);
     try (LogOutputStream out = new LogOutputStream(snapshotFile, false,
         segmentMaxSize, preallocatedSize, bufferSize)) {
-      for (final LogEntryProto entry : indexMap.values()) {
+      for (final LogEntryProto entry : list) {
         if (entry.getIndex() > endIndex) {
           break;
         } else {
@@ -256,13 +241,13 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
           snapshot.getFile().getPath().toFile(), 0, endIndex, false)) {
         LogEntryProto entry;
         while ((entry = in.nextEntry()) != null) {
-          put(entry);
+          list.add(entry);
           updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
         }
       }
       Preconditions.assertTrue(
-          !indexMap.isEmpty() && endIndex == indexMap.lastKey(),
-          "endIndex=%s, indexMap=%s", endIndex, indexMap);
+          !list.isEmpty() && endIndex == list.get(list.size() - 1).getIndex(),
+          "endIndex=%s, list=%s", endIndex, list);
       this.endIndexLastCkpt = endIndex;
       setLastAppliedTermIndex(snapshot.getTermIndex());
       this.storage.loadLatestSnapshot();
@@ -279,21 +264,18 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
    */
   @Override
   public CompletableFuture<Message> query(Message request) {
-    final String string = request.getContent().toStringUtf8();
-    Exception exception;
+    final ByteString bytes = request.getContent();
     try {
-      LOG.info("query " + string);
-      final LogEntryProto entry = dataMap.get(string);
-      if (entry != null) {
-        return 
CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
-      }
-      exception = new IndexOutOfBoundsException("Log entry not found for query 
" + string);
+      final long index = bytes.isEmpty()? getLastAppliedTermIndex().getIndex()
+          : Long.parseLong(bytes.toStringUtf8());
+      LOG.info("query log index " + index);
+      final LogEntryProto entry = list.get(Math.toIntExact(index - 1));
+      return 
CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
     } catch (Exception e) {
       LOG.warn("Failed request " + request, e);
-      exception = e;
+      return JavaUtils.completeExceptionally(new StateMachineException(
+          "Failed request " + request, e));
     }
-    return JavaUtils.completeExceptionally(new StateMachineException(
-        "Failed request " + request, exception));
   }
 
   static final ByteString STATE_MACHINE_DATA = 
ByteString.copyFromUtf8("StateMachine Data");
@@ -332,7 +314,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   }
 
   public LogEntryProto[] getContent() {
-    return indexMap.values().toArray(new LogEntryProto[0]);
+    return list.toArray(new LogEntryProto[list.size()]);
   }
 
   public void blockStartTransaction() {

Reply via email to