http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java
new file mode 100644
index 0000000..a088578
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestRetryCacheWithSimulatedRpc.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import java.io.IOException;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RetryCacheTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.LogUtils;
+
+public class TestRetryCacheWithSimulatedRpc extends RetryCacheTests {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  private final MiniRaftClusterWithSimulatedRpc cluster;
+
+  public TestRetryCacheWithSimulatedRpc() throws IOException {
+    cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(
+        NUM_SERVERS, getProperties());
+  }
+
+  @Override
+  public MiniRaftClusterWithSimulatedRpc getCluster() {
+    return cluster;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerInformationWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerInformationWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerInformationWithSimulatedRpc.java
new file mode 100644
index 0000000..f7025a5
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerInformationWithSimulatedRpc.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.server.impl.ServerInformationBaseTest;
+
+public class TestServerInformationWithSimulatedRpc
+    extends ServerInformationBaseTest<MiniRaftClusterWithSimulatedRpc>
+    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
new file mode 100644
index 0000000..306e5e7
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/simulation/TestServerRestartWithSimulatedRpc.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.simulation;
+
+import org.apache.ratis.server.ServerRestartTests;
+
+public class TestServerRestartWithSimulatedRpc
+    extends ServerRestartTests<MiniRaftClusterWithSimulatedRpc>
+    implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
new file mode 100644
index 0000000..1cd41a5
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestCacheEviction.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.impl.ServerState;
+import org.apache.ratis.server.storage.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
+import org.apache.ratis.server.storage.TestSegmentedRaftLog.SegmentRange;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.SizeInBytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public class TestCacheEviction extends BaseTest {
+  private static final CacheInvalidationPolicy policy = new CacheInvalidationPolicyDefault();
+
+  private List<LogSegment> prepareSegments(int numSegments, boolean[] cached, long start, long size) {
+    Assert.assertEquals(numSegments, cached.length);
+    List<LogSegment> segments = new ArrayList<>(numSegments);
+    for (int i = 0; i < numSegments; i++) {
+      LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1);
+      if (cached[i]) {
+        s = Mockito.spy(s);
+        Mockito.when(s.hasCache()).thenReturn(true);
+      }
+      segments.add(s);
+      start += size;
+    }
+    return segments;
+  }
+
+  @Test
+  public void testBasicEviction() throws Exception {
+    final int maxCached = 5;
+    List<LogSegment> segments = prepareSegments(5,
+        new boolean[]{true, true, true, true, true}, 0, 10);
+
+    // case 1, make sure we do not evict cache for segments behind local flushed index
+    List<LogSegment> evicted = policy.evict(null, 5, 15, segments, maxCached);
+    Assert.assertEquals(0, evicted.size());
+
+    // case 2, suppose the local flushed index is in the 3rd segment, then we
+    // can evict the first two segment
+    evicted = policy.evict(null, 25, 30, segments, maxCached);
+    Assert.assertEquals(2, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(0));
+    Assert.assertSame(evicted.get(1), segments.get(1));
+
+    // case 3, similar with case 2, but the local applied index is less than
+    // the local flushed index.
+    evicted = policy.evict(null, 25, 15, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(0));
+
+    // case 4, the local applied index is very small, then evict cache behind it
+    // first and let the state machine load the segments later
+    evicted = policy.evict(null, 35, 5, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(2));
+
+    Mockito.when(segments.get(2).hasCache()).thenReturn(false);
+    evicted = policy.evict(null, 35, 5, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(1));
+
+    Mockito.when(segments.get(1).hasCache()).thenReturn(false);
+    evicted = policy.evict(null, 35, 5, segments, maxCached);
+    Assert.assertEquals(0, evicted.size());
+  }
+
+  @Test
+  public void testEvictionWithFollowerIndices() throws Exception {
+    final int maxCached = 6;
+    List<LogSegment> segments = prepareSegments(6,
+        new boolean[]{true, true, true, true, true, true}, 0, 10);
+
+    // case 1, no matter where the followers are, we do not evict segments behind local
+    // flushed index
+    List<LogSegment> evicted = policy.evict(new long[]{20, 40, 40}, 5, 15, segments,
+        maxCached);
+    Assert.assertEquals(0, evicted.size());
+
+    // case 2, the follower indices are behind the local flushed index
+    evicted = policy.evict(new long[]{30, 40, 45}, 25, 30, segments, maxCached);
+    Assert.assertEquals(2, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(0));
+    Assert.assertSame(evicted.get(1), segments.get(1));
+
+    // case 3, similar with case 3 in basic eviction test
+    evicted = policy.evict(new long[]{30, 40, 45}, 25, 15, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(0));
+
+    // case 4, the followers are slower than local flush
+    evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(0));
+
+    Mockito.when(segments.get(0).hasCache()).thenReturn(false);
+    evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(2));
+
+    Mockito.when(segments.get(2).hasCache()).thenReturn(false);
+    evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+    Assert.assertEquals(1, evicted.size());
+    Assert.assertSame(evicted.get(0), segments.get(3));
+
+    Mockito.when(segments.get(3).hasCache()).thenReturn(false);
+    evicted = policy.evict(new long[]{15, 45, 45}, 55, 50, segments, maxCached);
+    Assert.assertEquals(0, evicted.size());
+  }
+
+  @Test
+  public void testEvictionInSegmentedLog() throws Exception {
+    final RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+    RaftServerConfigKeys.Log.setSegmentSizeMax(prop, SizeInBytes.valueOf("8KB"));
+    RaftServerConfigKeys.Log.setPreallocatedSize(prop, SizeInBytes.valueOf("8KB"));
+    final RaftPeerId peerId = RaftPeerId.valueOf("s0");
+    final int maxCachedNum = RaftServerConfigKeys.Log.maxCachedSegmentNum(prop);
+
+    File storageDir = getTestDir();
+    RaftServerConfigKeys.setStorageDirs(prop,  Collections.singletonList(storageDir));
+    RaftStorage storage = new RaftStorage(storageDir, RaftServerConstants.StartupOption.REGULAR);
+
+    RaftServerImpl server = Mockito.mock(RaftServerImpl.class);
+    ServerState state = Mockito.mock(ServerState.class);
+    Mockito.when(server.getState()).thenReturn(state);
+    Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{});
+    Mockito.when(state.getLastAppliedIndex()).thenReturn(0L);
+
+    SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, server, storage, -1, prop);
+    raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+    List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(0, maxCachedNum, 7, 0);
+    LogEntryProto[] entries = generateEntries(slist);
+    raftLog.append(entries).forEach(CompletableFuture::join);
+
+    // check the current cached segment number: the last segment is still open
+    Assert.assertEquals(maxCachedNum - 1,
+        raftLog.getRaftLogCache().getCachedSegmentNum());
+
+    Mockito.when(server.getFollowerNextIndices()).thenReturn(new long[]{21, 40, 40});
+    Mockito.when(state.getLastAppliedIndex()).thenReturn(35L);
+    slist = TestSegmentedRaftLog.prepareRanges(maxCachedNum, maxCachedNum + 2, 7, 7 * maxCachedNum);
+    entries = generateEntries(slist);
+    raftLog.append(entries).forEach(CompletableFuture::join);
+
+    // check the cached segment number again. since the slowest follower is on
+    // index 21, the eviction should happen and evict 3 segments
+    Assert.assertEquals(maxCachedNum + 1 - 3,
+        raftLog.getRaftLogCache().getCachedSegmentNum());
+  }
+
+  private LogEntryProto[] generateEntries(List<SegmentRange> slist) {
+    List<LogEntryProto> eList = new ArrayList<>();
+    for (SegmentRange range : slist) {
+      for (long index = range.start; index <= range.end; index++) {
+        SimpleOperation m = new SimpleOperation(new String(new byte[1024]));
+        eList.add(ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, index));
+      }
+    }
+    return eList.toArray(new LogEntryProto[eList.size()]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
new file mode 100644
index 0000000..d3c216d
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRaftLogCache {
+  private static final RaftProperties prop = new RaftProperties();
+
+  private RaftLogCache cache;
+
+  @Before
+  public void setup() {
+    cache = new RaftLogCache(null, null, prop);
+  }
+
+  private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
+    LogSegment s = LogSegment.newOpenSegment(null, start);
+    for (long i = start; i <= end; i++) {
+      SimpleOperation m = new SimpleOperation("m" + i);
+      LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+      s.appendToOpenSegment(entry);
+    }
+    if (!isOpen) {
+      s.close();
+    }
+    return s;
+  }
+
+  private void checkCache(long start, long end, int segmentSize) throws IOException {
+    Assert.assertEquals(start, cache.getStartIndex());
+    Assert.assertEquals(end, cache.getEndIndex());
+
+    for (long index = start; index <= end; index++) {
+      LogEntryProto entry = cache.getSegment(index).getEntryWithoutLoading(index).getEntry();
+      Assert.assertEquals(index, entry.getIndex());
+    }
+
+    long[] offsets = new long[]{start, start + 1, start + (end - start) / 2,
+        end - 1, end};
+    for (long offset : offsets) {
+      checkCacheEntries(offset, (int) (end - offset + 1), end);
+      checkCacheEntries(offset, 1, end);
+      checkCacheEntries(offset, 20, end);
+      checkCacheEntries(offset, segmentSize, end);
+      checkCacheEntries(offset, segmentSize - 1, end);
+    }
+  }
+
+  private void checkCacheEntries(long offset, int size, long end) {
+    TermIndex[] entries = cache.getTermIndices(offset, offset + size);
+    long realEnd = offset + size > end + 1 ? end + 1 : offset + size;
+    Assert.assertEquals(realEnd - offset, entries.length);
+    for (long i = offset; i < realEnd; i++) {
+      Assert.assertEquals(i, entries[(int) (i - offset)].getIndex());
+    }
+  }
+
+  @Test
+  public void testAddSegments() throws Exception {
+    LogSegment s1 = prepareLogSegment(1, 100, false);
+    cache.addSegment(s1);
+    checkCache(1, 100, 100);
+
+    try {
+      LogSegment s = prepareLogSegment(102, 103, true);
+      cache.addSegment(s);
+      Assert.fail("should fail since there is gap between two segments");
+    } catch (IllegalStateException ignored) {
+    }
+
+    LogSegment s2 = prepareLogSegment(101, 200, true);
+    cache.addSegment(s2);
+    checkCache(1, 200, 100);
+
+    try {
+      LogSegment s = prepareLogSegment(201, 202, true);
+      cache.addSegment(s);
+      Assert.fail("should fail since there is still an open segment in cache");
+    } catch (IllegalStateException ignored) {
+    }
+
+    cache.rollOpenSegment(false);
+    checkCache(1, 200, 100);
+
+    try {
+      LogSegment s = prepareLogSegment(202, 203, true);
+      cache.addSegment(s);
+      Assert.fail("should fail since there is gap between two segments");
+    } catch (IllegalStateException ignored) {
+    }
+
+    LogSegment s3 = prepareLogSegment(201, 300, true);
+    cache.addSegment(s3);
+    Assert.assertNotNull(cache.getOpenSegment());
+    checkCache(1, 300, 100);
+
+    cache.rollOpenSegment(true);
+    Assert.assertNotNull(cache.getOpenSegment());
+    checkCache(1, 300, 100);
+  }
+
+  @Test
+  public void testAppendEntry() throws Exception {
+    LogSegment closedSegment = prepareLogSegment(0, 99, false);
+    cache.addSegment(closedSegment);
+
+    final SimpleOperation m = new SimpleOperation("m");
+    try {
+      LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 0);
+      cache.appendEntry(entry);
+      Assert.fail("the open segment is null");
+    } catch (IllegalStateException ignored) {
+    }
+
+    LogSegment openSegment = prepareLogSegment(100, 100, true);
+    cache.addSegment(openSegment);
+    for (long index = 101; index < 200; index++) {
+      LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, index);
+      cache.appendEntry(entry);
+    }
+
+    Assert.assertNotNull(cache.getOpenSegment());
+    checkCache(0, 199, 100);
+  }
+
+  @Test
+  public void testTruncate() throws Exception {
+    long start = 0;
+    for (int i = 0; i < 5; i++) { // 5 closed segments
+      LogSegment s = prepareLogSegment(start, start + 99, false);
+      cache.addSegment(s);
+      start += 100;
+    }
+    // add another open segment
+    LogSegment s = prepareLogSegment(start, start + 99, true);
+    cache.addSegment(s);
+
+    long end = cache.getEndIndex();
+    Assert.assertEquals(599, end);
+    int numOfSegments = 6;
+    // start truncation
+    for (int i = 0; i < 10; i++) { // truncate 10 times
+      // each time truncate 37 entries
+      end -= 37;
+      TruncationSegments ts = cache.truncate(end + 1);
+      checkCache(0, end, 100);
+
+      // check TruncationSegments
+      int currentNum= (int) (end / 100 + 1);
+      if (currentNum < numOfSegments) {
+        Assert.assertEquals(1, ts.toDelete.length);
+        numOfSegments = currentNum;
+      } else {
+        Assert.assertEquals(0, ts.toDelete.length);
+      }
+    }
+
+    // 230 entries remaining. truncate at the segment boundary
+    TruncationSegments ts = cache.truncate(200);
+    checkCache(0, 199, 100);
+    Assert.assertEquals(1, ts.toDelete.length);
+    Assert.assertEquals(200, ts.toDelete[0].startIndex);
+    Assert.assertEquals(229, ts.toDelete[0].endIndex);
+    Assert.assertEquals(0, ts.toDelete[0].targetLength);
+    Assert.assertFalse(ts.toDelete[0].isOpen);
+    Assert.assertNull(ts.toTruncate);
+
+    // add another open segment and truncate it as a whole
+    LogSegment newOpen = prepareLogSegment(200, 249, true);
+    cache.addSegment(newOpen);
+    ts = cache.truncate(200);
+    checkCache(0, 199, 100);
+    Assert.assertEquals(1, ts.toDelete.length);
+    Assert.assertEquals(200, ts.toDelete[0].startIndex);
+    Assert.assertEquals(249, ts.toDelete[0].endIndex);
+    Assert.assertEquals(0, ts.toDelete[0].targetLength);
+    Assert.assertTrue(ts.toDelete[0].isOpen);
+    Assert.assertNull(ts.toTruncate);
+
+    // add another open segment and truncate part of it
+    newOpen = prepareLogSegment(200, 249, true);
+    cache.addSegment(newOpen);
+    ts = cache.truncate(220);
+    checkCache(0, 219, 100);
+    Assert.assertNull(cache.getOpenSegment());
+    Assert.assertEquals(0, ts.toDelete.length);
+    Assert.assertTrue(ts.toTruncate.isOpen);
+    Assert.assertEquals(219, ts.toTruncate.newEndIndex);
+    Assert.assertEquals(200, ts.toTruncate.startIndex);
+    Assert.assertEquals(249, ts.toTruncate.endIndex);
+  }
+
+  private void testIterator(long startIndex) throws IOException {
+    Iterator<TermIndex> iterator = cache.iterator(startIndex);
+    TermIndex prev = null;
+    while (iterator.hasNext()) {
+      TermIndex termIndex = iterator.next();
+      Assert.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex);
+      if (prev != null) {
+        Assert.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
+      }
+      prev = termIndex;
+    }
+    if (startIndex <= cache.getEndIndex()) {
+      Assert.assertNotNull(prev);
+      Assert.assertEquals(cache.getEndIndex(), prev.getIndex());
+    }
+  }
+
+  @Test
+  public void testIterator() throws Exception {
+    long start = 0;
+    for (int i = 0; i < 2; i++) { // 2 closed segments
+      LogSegment s = prepareLogSegment(start, start + 99, false);
+      cache.addSegment(s);
+      start += 100;
+    }
+    // add another open segment
+    LogSegment s = prepareLogSegment(start, start + 99, true);
+    cache.addSegment(s);
+
+    for (long startIndex = 0; startIndex < 300; startIndex += 50) {
+      testIterator(startIndex);
+    }
+    testIterator(299);
+
+    Iterator<TermIndex> iterator = cache.iterator(300);
+    Assert.assertFalse(iterator.hasNext());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
----------------------------------------------------------------------
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
new file mode 100644
index 0000000..7d9fdf5
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogReadWrite.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ChecksumException;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test basic functionality of LogReader, LogInputStream, and LogOutputStream.
+ */
+public class TestRaftLogReadWrite extends BaseTest {
+  private File storageDir;
+  private long segmentMaxSize;
+  private long preallocatedSize;
+  private int bufferSize;
+
+  @Before
+  public void setup() throws Exception {
+    storageDir = getTestDir();
+    RaftProperties properties = new RaftProperties();
+    RaftServerConfigKeys.setStorageDirs(properties,  Collections.singletonList(storageDir));
+    this.segmentMaxSize =
+        RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+    this.preallocatedSize =
+        RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
+    this.bufferSize =
+        RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (storageDir != null) {
+      FileUtils.deleteFully(storageDir.getParentFile());
+    }
+  }
+
+  private LogEntryProto[] readLog(File file, long startIndex, long endIndex,
+      boolean isOpen) throws IOException {
+    List<LogEntryProto> list = new ArrayList<>();
+    try (LogInputStream in =
+             new LogInputStream(file, startIndex, endIndex, isOpen)) {
+      LogEntryProto entry;
+      while ((entry = in.nextEntry()) != null) {
+        list.add(entry);
+      }
+    }
+    return list.toArray(new LogEntryProto[list.size()]);
+  }
+
+  private long writeMessages(LogEntryProto[] entries, LogOutputStream out)
+      throws IOException {
+    long size = 0;
+    for (int i = 0; i < entries.length; i++) {
+      SimpleOperation m = new SimpleOperation("m" + i);
+      entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+      final int s = entries[i].getSerializedSize();
+      size += CodedOutputStream.computeUInt32SizeNoTag(s) + s + 4;
+      out.write(entries[i]);
+    }
+    return size;
+  }
+
+  /**
+   * Test basic functionality: write several log entries, then read
+   */
+  @Test
+  public void testReadWriteLog() throws IOException {
+    final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    File openSegment = storage.getStorageDir().getOpenLogFile(0);
+    long size = SegmentedRaftLogFormat.getHeaderLength();
+
+    final LogEntryProto[] entries = new LogEntryProto[100];
+    try (LogOutputStream out =
+             new LogOutputStream(openSegment, false, segmentMaxSize,
+                 preallocatedSize, bufferSize)) {
+      size += writeMessages(entries, out);
+    } finally {
+      storage.close();
+    }
+
+    Assert.assertEquals(size, openSegment.length());
+
+    LogEntryProto[] readEntries = readLog(openSegment, 0,
+        RaftServerConstants.INVALID_LOG_INDEX, true);
+    Assert.assertArrayEquals(entries, readEntries);
+  }
+
+  @Test
+  public void testAppendLog() throws IOException {
+    final RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    File openSegment = storage.getStorageDir().getOpenLogFile(0);
+    LogEntryProto[] entries = new LogEntryProto[200];
+    try (LogOutputStream out =
+             new LogOutputStream(openSegment, false, segmentMaxSize,
+                 preallocatedSize, bufferSize)) {
+      for (int i = 0; i < 100; i++) {
+        SimpleOperation m = new SimpleOperation("m" + i);
+        entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+        out.write(entries[i]);
+      }
+    }
+
+    try (LogOutputStream out =
+             new LogOutputStream(openSegment, true, segmentMaxSize,
+                 preallocatedSize, bufferSize)) {
+      for (int i = 100; i < 200; i++) {
+        SimpleOperation m = new SimpleOperation("m" + i);
+        entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
+        out.write(entries[i]);
+      }
+    }
+
+    LogEntryProto[] readEntries = readLog(openSegment, 0,
+        RaftServerConstants.INVALID_LOG_INDEX, true);
+    Assert.assertArrayEquals(entries, readEntries);
+
+    storage.close();
+  }
+
+  /**
+   * Simulate the scenario that the peer is shutdown without truncating
+   * log segment file padding. Make sure the reader can correctly handle this.
+   */
+  @Test
+  public void testReadWithPadding() throws IOException {
+    final RaftStorage storage = new RaftStorage(storageDir, 
StartupOption.REGULAR);
+    File openSegment = storage.getStorageDir().getOpenLogFile(0);
+    long size = SegmentedRaftLogFormat.getHeaderLength();
+
+    LogEntryProto[] entries = new LogEntryProto[100];
+    LogOutputStream out = new LogOutputStream(openSegment, false,
+        segmentMaxSize, preallocatedSize, bufferSize);
+    size += writeMessages(entries, out);
+    out.flush();
+
+    // make sure the file contains padding
+    Assert.assertEquals(
+        RaftServerConfigKeys.Log.PREALLOCATED_SIZE_DEFAULT.getSize(),
+        openSegment.length());
+
+    // check if the reader can correctly read the log file
+    LogEntryProto[] readEntries = readLog(openSegment, 0,
+        RaftServerConstants.INVALID_LOG_INDEX, true);
+    Assert.assertArrayEquals(entries, readEntries);
+
+    out.close();
+    Assert.assertEquals(size, openSegment.length());
+  }
+
+  /**
+   * corrupt the padding by inserting non-zero bytes. Make sure the reader
+   * throws exception.
+   */
+  @Test
+  public void testReadWithCorruptPadding() throws IOException {
+    final RaftStorage storage = new RaftStorage(storageDir, 
StartupOption.REGULAR);
+    File openSegment = storage.getStorageDir().getOpenLogFile(0);
+
+    LogEntryProto[] entries = new LogEntryProto[10];
+    LogOutputStream out = new LogOutputStream(openSegment, false,
+        16 * 1024 * 1024, 4 * 1024 * 1024, bufferSize);
+    for (int i = 0; i < 10; i++) {
+      SimpleOperation m = new SimpleOperation("m" + i);
+      entries[i] = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, 
i);
+      out.write(entries[i]);
+    }
+    out.flush();
+
+    // make sure the file contains padding
+    Assert.assertEquals(4 * 1024 * 1024, openSegment.length());
+
+    try (FileOutputStream fout = new FileOutputStream(openSegment, true)) {
+      ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[]{-1, 1});
+      fout.getChannel()
+          .write(byteBuffer, 16 * 1024 * 1024 - 10);
+    }
+
+    List<LogEntryProto> list = new ArrayList<>();
+    try (LogInputStream in = new LogInputStream(openSegment, 0,
+        RaftServerConstants.INVALID_LOG_INDEX, true)) {
+      LogEntryProto entry;
+      while ((entry = in.nextEntry()) != null) {
+        list.add(entry);
+      }
+      Assert.fail("should fail since we corrupt the padding");
+    } catch (IOException e) {
+      boolean findVerifyTerminator = false;
+      for (StackTraceElement s : e.getStackTrace()) {
+        if (s.getMethodName().equals("verifyTerminator")) {
+          findVerifyTerminator = true;
+          break;
+        }
+      }
+      Assert.assertTrue(findVerifyTerminator);
+    }
+    Assert.assertArrayEquals(entries,
+        list.toArray(new LogEntryProto[list.size()]));
+  }
+
+  /**
+   * Test the log reader to make sure it can detect the checksum mismatch.
+   */
+  @Test
+  public void testReadWithEntryCorruption() throws IOException {
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    File openSegment = storage.getStorageDir().getOpenLogFile(0);
+    try (LogOutputStream out =
+             new LogOutputStream(openSegment, false, segmentMaxSize,
+                 preallocatedSize, bufferSize)) {
+      for (int i = 0; i < 100; i++) {
+        LogEntryProto entry = ServerProtoUtils.toLogEntryProto(
+            new SimpleOperation("m" + i).getLogEntryContent(), 0, i);
+        out.write(entry);
+      }
+    } finally {
+      storage.close();
+    }
+
+    // corrupt the log file
+    try (RandomAccessFile raf = new 
RandomAccessFile(openSegment.getCanonicalFile(),
+        "rw")) {
+      raf.seek(100);
+      int correctValue = raf.read();
+      raf.seek(100);
+      raf.write(correctValue + 1);
+    }
+
+    try {
+      readLog(openSegment, 0, RaftServerConstants.INVALID_LOG_INDEX, true);
+      Assert.fail("The read of corrupted log file should fail");
+    } catch (ChecksumException e) {
+      LOG.info("Caught ChecksumException as expected", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
new file mode 100644
index 0000000..270e279
--- /dev/null
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.storage.LogSegment.LogRecordWithEntry;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TraditionalBinaryPrefix;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static 
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+import static org.apache.ratis.server.storage.LogSegment.getEntrySize;
+
+/**
+ * Test basic functionality of {@link LogSegment}
+ */
+public class TestRaftLogSegment extends BaseTest {
+  private File storageDir;
+  private long segmentMaxSize;
+  private long preallocatedSize;
+  private int bufferSize;
+
+  @Before
+  public void setup() throws Exception {
+    RaftProperties properties = new RaftProperties();
+    storageDir = getTestDir();
+    RaftServerConfigKeys.setStorageDirs(properties,  
Collections.singletonList(storageDir));
+    this.segmentMaxSize =
+        RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+    this.preallocatedSize =
+        RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
+    this.bufferSize =
+        RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (storageDir != null) {
+      FileUtils.deleteFully(storageDir.getParentFile());
+    }
+  }
+
+  File prepareLog(boolean isOpen, long startIndex, int numEntries, long term, 
boolean isLastEntryPartiallyWritten)
+      throws IOException {
+    if (!isOpen) {
+      Preconditions.assertTrue(!isLastEntryPartiallyWritten, "For closed log, 
the last entry cannot be partially written.");
+    }
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    final File file = isOpen ?
+        storage.getStorageDir().getOpenLogFile(startIndex) :
+        storage.getStorageDir().getClosedLogFile(startIndex, startIndex + 
numEntries - 1);
+
+    final LogEntryProto[] entries = new LogEntryProto[numEntries];
+    try (LogOutputStream out = new LogOutputStream(file, false,
+        segmentMaxSize, preallocatedSize, bufferSize)) {
+      for (int i = 0; i < entries.length; i++) {
+        SimpleOperation op = new SimpleOperation("m" + i);
+        entries[i] = ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 
term, i + startIndex);
+        out.write(entries[i]);
+      }
+    }
+
+    if (isLastEntryPartiallyWritten) {
+      final int entrySize = size(entries[entries.length - 1]);
+      final int truncatedEntrySize = 
ThreadLocalRandom.current().nextInt(entrySize - 1) + 1;
+      // 0 < truncatedEntrySize < entrySize
+      final long fileLength = file.length();
+      final long truncatedFileLength = fileLength - (entrySize - 
truncatedEntrySize);
+      LOG.info("truncate last entry: entry(size={}, truncated={}), 
file(length={}, truncated={})",
+          entrySize, truncatedEntrySize, fileLength, truncatedFileLength);
+      FileUtils.truncateFile(file, truncatedFileLength);
+    }
+
+    storage.close();
+    return file;
+  }
+
+  static int size(LogEntryProto entry) {
+    final int n = entry.getSerializedSize();
+    return CodedOutputStream.computeUInt32SizeNoTag(n) + n + 4;
+  }
+
+  static void checkLogSegment(LogSegment segment, long start, long end,
+      boolean isOpen, long totalSize, long term) throws Exception {
+    Assert.assertEquals(start, segment.getStartIndex());
+    Assert.assertEquals(end, segment.getEndIndex());
+    Assert.assertEquals(isOpen, segment.isOpen());
+    Assert.assertEquals(totalSize, segment.getTotalSize());
+
+    long offset = SegmentedRaftLogFormat.getHeaderLength();
+    for (long i = start; i <= end; i++) {
+      LogSegment.LogRecord record = segment.getLogRecord(i);
+      LogRecordWithEntry lre = segment.getEntryWithoutLoading(i);
+      Assert.assertEquals(i, lre.getRecord().getTermIndex().getIndex());
+      Assert.assertEquals(term, lre.getRecord().getTermIndex().getTerm());
+      Assert.assertEquals(offset, record.getOffset());
+
+      LogEntryProto entry = lre.hasEntry() ?
+          lre.getEntry() : segment.loadCache(lre.getRecord());
+      offset += getEntrySize(entry);
+    }
+  }
+
+  @Test
+  public void testLoadLogSegment() throws Exception {
+    testLoadSegment(true, false);
+  }
+
+  @Test
+  public void testLoadLogSegmentLastEntryPartiallyWritten() throws Exception {
+    testLoadSegment(true, true);
+  }
+
+  @Test
+  public void testLoadCache() throws Exception {
+    testLoadSegment(false, false);
+  }
+
+  @Test
+  public void testLoadCacheLastEntryPartiallyWritten() throws Exception {
+    testLoadSegment(false, true);
+  }
+
+  private void testLoadSegment(boolean loadInitial, boolean 
isLastEntryPartiallyWritten) throws Exception {
+    // load an open segment
+    final File openSegmentFile = prepareLog(true, 0, 100, 0, 
isLastEntryPartiallyWritten);
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 
0,
+        INVALID_LOG_INDEX, true, loadInitial, null);
+    final int delta = isLastEntryPartiallyWritten? 1: 0;
+    checkLogSegment(openSegment, 0, 99 - delta, true, 
openSegmentFile.length(), 0);
+    storage.close();
+    // for open segment we currently always keep log entries in the memory
+    Assert.assertEquals(0, openSegment.getLoadingTimes());
+
+    // load a closed segment (1000-1099)
+    final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false);
+    LogSegment closedSegment = LogSegment.loadSegment(storage, 
closedSegmentFile,
+        1000, 1099, false, loadInitial, null);
+    checkLogSegment(closedSegment, 1000, 1099, false,
+        closedSegment.getTotalSize(), 1);
+    Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
+  }
+
+  @Test
+  public void testAppendEntries() throws Exception {
+    final long start = 1000;
+    LogSegment segment = LogSegment.newOpenSegment(null, start);
+    long size = SegmentedRaftLogFormat.getHeaderLength();
+    final long max = 8 * 1024 * 1024;
+    checkLogSegment(segment, start, start - 1, true, size, 0);
+
+    // append till full
+    long term = 0;
+    int i = 0;
+    List<LogEntryProto> list = new ArrayList<>();
+    while (size < max) {
+      SimpleOperation op = new SimpleOperation("m" + i);
+      LogEntryProto entry = 
ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start);
+      size += getEntrySize(entry);
+      list.add(entry);
+    }
+
+    segment.appendToOpenSegment(list.toArray(new LogEntryProto[list.size()]));
+    Assert.assertTrue(segment.getTotalSize() >= max);
+    checkLogSegment(segment, start, i - 1 + start, true, size, term);
+  }
+
+  @Test
+  public void testAppendWithGap() throws Exception {
+    LogSegment segment = LogSegment.newOpenSegment(null, 1000);
+    SimpleOperation op = new SimpleOperation("m");
+    final StateMachineLogEntryProto m = op.getLogEntryContent();
+    try {
+      LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1001);
+      segment.appendToOpenSegment(entry);
+      Assert.fail("should fail since the entry's index needs to be 1000");
+    } catch (IllegalStateException e) {
+      // the exception is expected.
+    }
+
+    LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m, 0, 1000);
+    segment.appendToOpenSegment(entry);
+
+    try {
+      entry = ServerProtoUtils.toLogEntryProto(m, 0, 1002);
+      segment.appendToOpenSegment(entry);
+      Assert.fail("should fail since the entry's index needs to be 1001");
+    } catch (IllegalStateException e) {
+      // the exception is expected.
+    }
+
+    LogEntryProto[] entries = new LogEntryProto[2];
+    for (int i = 0; i < 2; i++) {
+      entries[i] = ServerProtoUtils.toLogEntryProto(m, 0, 1001 + i * 2);
+    }
+    try {
+      segment.appendToOpenSegment(entries);
+      Assert.fail("should fail since there is gap between entries");
+    } catch (IllegalStateException e) {
+      // the exception is expected.
+    }
+  }
+
+  @Test
+  public void testTruncate() throws Exception {
+    final long term = 1;
+    final long start = 1000;
+    LogSegment segment = LogSegment.newOpenSegment(null, start);
+    for (int i = 0; i < 100; i++) {
+      LogEntryProto entry = ServerProtoUtils.toLogEntryProto(
+          new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
+      segment.appendToOpenSegment(entry);
+    }
+
+    // truncate an open segment (remove 1080~1099)
+    long newSize = segment.getLogRecord(start + 80).getOffset();
+    segment.truncate(start + 80);
+    Assert.assertEquals(80, segment.numOfEntries());
+    checkLogSegment(segment, start, start + 79, false, newSize, term);
+
+    // truncate a closed segment (remove 1050~1079)
+    newSize = segment.getLogRecord(start + 50).getOffset();
+    segment.truncate(start + 50);
+    Assert.assertEquals(50, segment.numOfEntries());
+    checkLogSegment(segment, start, start + 49, false, newSize, term);
+
+    // truncate all the remaining entries
+    segment.truncate(start);
+    Assert.assertEquals(0, segment.numOfEntries());
+    checkLogSegment(segment, start, start - 1, false,
+        SegmentedRaftLogFormat.getHeaderLength(), term);
+  }
+
+  @Test
+  public void testPreallocateSegment() throws Exception {
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    final File file = storage.getStorageDir().getOpenLogFile(0);
+    final int[] maxSizes = new int[]{1024, 1025, 1024 * 1024 - 1, 1024 * 1024,
+        1024 * 1024 + 1, 2 * 1024 * 1024 - 1, 2 * 1024 * 1024,
+        2 * 1024 * 1024 + 1, 8 * 1024 * 1024};
+    final int[] preallocated = new int[]{512, 1024, 1025, 1024 * 1024,
+        1024 * 1024 + 1, 2 * 1024 * 1024};
+
+    // make sure preallocation is correct with different max/pre-allocated size
+    for (int max : maxSizes) {
+      for (int a : preallocated) {
+        try (LogOutputStream ignored =
+                 new LogOutputStream(file, false, max, a, bufferSize)) {
+          Assert.assertEquals("max=" + max + ", a=" + a, file.length(), 
Math.min(max, a));
+        }
+        try (LogInputStream in =
+                 new LogInputStream(file, 0, INVALID_LOG_INDEX, true)) {
+          LogEntryProto entry = in.nextEntry();
+          Assert.assertNull(entry);
+        }
+      }
+    }
+
+    // test the scenario where an entry's size is larger than the max size
+    final byte[] content = new byte[1024 * 2];
+    Arrays.fill(content, (byte) 1);
+    final long size;
+    try (LogOutputStream out = new LogOutputStream(file, false,
+        1024, 1024, bufferSize)) {
+      SimpleOperation op = new SimpleOperation(new String(content));
+      LogEntryProto entry = 
ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
+      size = LogSegment.getEntrySize(entry);
+      out.write(entry);
+    }
+    Assert.assertEquals(file.length(),
+        size + SegmentedRaftLogFormat.getHeaderLength());
+    try (LogInputStream in = new LogInputStream(file, 0,
+        INVALID_LOG_INDEX, true)) {
+      LogEntryProto entry = in.nextEntry();
+      Assert.assertArrayEquals(content,
+          entry.getStateMachineLogEntry().getLogData().toByteArray());
+      Assert.assertNull(in.nextEntry());
+    }
+  }
+
+  /**
+   * Keep appending and check if pre-allocation is correct
+   */
+  @Test
+  public void testPreallocationAndAppend() throws Exception {
+    final SizeInBytes max = SizeInBytes.valueOf(2, 
TraditionalBinaryPrefix.MEGA);
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    final File file = storage.getStorageDir().getOpenLogFile(0);
+
+    final byte[] content = new byte[1024];
+    Arrays.fill(content, (byte) 1);
+    SimpleOperation op = new SimpleOperation(new String(content));
+    LogEntryProto entry = 
ServerProtoUtils.toLogEntryProto(op.getLogEntryContent(), 0, 0);
+    final long entrySize = LogSegment.getEntrySize(entry);
+
+    long totalSize = SegmentedRaftLogFormat.getHeaderLength();
+    long preallocated = 16 * 1024;
+    try (LogOutputStream out = new LogOutputStream(file, false,
+        max.getSize(), 16 * 1024, 10 * 1024)) {
+      Assert.assertEquals(preallocated, file.length());
+      while (totalSize + entrySize < max.getSize()) {
+        totalSize += entrySize;
+        out.write(entry);
+        if (totalSize > preallocated) {
+          Assert.assertEquals("totalSize==" + totalSize,
+              preallocated + 16 * 1024, file.length());
+          preallocated += 16 * 1024;
+        }
+      }
+    }
+
+    Assert.assertEquals(totalSize, file.length());
+  }
+
+  @Test
+  public void testZeroSizeInProgressFile() throws Exception {
+    final RaftStorage storage = new RaftStorage(storageDir, 
StartupOption.REGULAR);
+    final File file = storage.getStorageDir().getOpenLogFile(0);
+    storage.close();
+
+    // create zero size in-progress file
+    LOG.info("file: " + file);
+    Assert.assertTrue(file.createNewFile());
+    final Path path = file.toPath();
+    Assert.assertTrue(Files.exists(path));
+    Assert.assertEquals(0, Files.size(path));
+
+    // getLogSegmentFiles should remove it.
+    final List<RaftStorageDirectory.LogPathAndIndex> logs = 
storage.getStorageDir().getLogSegmentFiles();
+    Assert.assertEquals(0, logs.size());
+    Assert.assertFalse(Files.exists(path));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
----------------------------------------------------------------------
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
new file mode 100644
index 0000000..4a26f8c
--- /dev/null
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.util.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Test RaftStorage and RaftStorageDirectory
+ */
+public class TestRaftStorage extends BaseTest {
+  private File storageDir;
+
+  @Before
+  public void setup() throws Exception {
+    storageDir = getTestDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (storageDir != null) {
+      FileUtils.deleteFully(storageDir.getParentFile());
+    }
+  }
+
+  @Test
+  public void testNotExistent() throws IOException {
+    FileUtils.deleteFully(storageDir);
+
+    // we will format the empty directory
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    Assert.assertEquals(StorageState.NORMAL, storage.getState());
+
+    try {
+      new RaftStorage(storageDir, StartupOption.FORMAT).close();
+      Assert.fail("the format should fail since the storage is still locked");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("directory is already 
locked"));
+    }
+
+    storage.close();
+    FileUtils.deleteFully(storageDir);
+    Assert.assertTrue(storageDir.createNewFile());
+    try {
+      new RaftStorage(storageDir, StartupOption.REGULAR);
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertTrue(
+          e.getMessage().contains(StorageState.NON_EXISTENT.name()));
+    }
+  }
+
+  /**
+   * make sure the RaftStorage format works
+   */
+  @Test
+  public void testStorage() throws Exception {
+    RaftStorageDirectory sd = new RaftStorageDirectory(storageDir);
+    try {
+      StorageState state = sd.analyzeStorage(true);
+      Assert.assertEquals(StorageState.NOT_FORMATTED, state);
+      Assert.assertTrue(sd.isCurrentEmpty());
+    } finally {
+      sd.unlock();
+    }
+
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    Assert.assertEquals(StorageState.NORMAL, storage.getState());
+    storage.close();
+
+    Assert.assertEquals(StorageState.NORMAL, sd.analyzeStorage(false));
+    File m = sd.getMetaFile();
+    Assert.assertTrue(m.exists());
+    MetaFile metaFile = new MetaFile(m);
+    Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
+    Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
+
+    metaFile.set(123, "peer1");
+    metaFile.readFile();
+    Assert.assertEquals(123, metaFile.getTerm());
+    Assert.assertEquals("peer1", metaFile.getVotedFor());
+
+    MetaFile metaFile2 = new MetaFile(m);
+    Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, 
"loaded"));
+    Assert.assertEquals(123, metaFile.getTerm());
+    Assert.assertEquals("peer1", metaFile.getVotedFor());
+
+    // test format
+    storage = new RaftStorage(storageDir, StartupOption.FORMAT);
+    Assert.assertEquals(StorageState.NORMAL, storage.getState());
+    metaFile = new MetaFile(sd.getMetaFile());
+    Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
+    Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
+    storage.close();
+  }
+
+  @Test
+  public void testMetaFile() throws Exception {
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.FORMAT);
+    File m = storage.getStorageDir().getMetaFile();
+    Assert.assertTrue(m.exists());
+    MetaFile metaFile = new MetaFile(m);
+    Assert.assertEquals(MetaFile.DEFAULT_TERM, metaFile.getTerm());
+    Assert.assertEquals(MetaFile.EMPTY_VOTEFOR, metaFile.getVotedFor());
+
+    metaFile.set(123, "peer1");
+    metaFile.readFile();
+    Assert.assertEquals(123, metaFile.getTerm());
+    Assert.assertEquals("peer1", metaFile.getVotedFor());
+
+    MetaFile metaFile2 = new MetaFile(m);
+    Assert.assertFalse((Boolean) Whitebox.getInternalState(metaFile2, 
"loaded"));
+    Assert.assertEquals(123, metaFile.getTerm());
+    Assert.assertEquals("peer1", metaFile.getVotedFor());
+
+    storage.close();
+  }
+
+  /**
+   * check if RaftStorage deletes tmp metafile when startup
+   */
+  @Test
+  public void testCleanMetaTmpFile() throws Exception {
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    Assert.assertEquals(StorageState.NORMAL, storage.getState());
+    storage.close();
+
+    RaftStorageDirectory sd = new RaftStorageDirectory(storageDir);
+    File metaFile = sd.getMetaFile();
+    FileUtils.move(metaFile, sd.getMetaTmpFile());
+
+    Assert.assertEquals(StorageState.NOT_FORMATTED, sd.analyzeStorage(false));
+
+    try {
+      new RaftStorage(storageDir, StartupOption.REGULAR);
+      Assert.fail("should throw IOException since storage dir is not 
formatted");
+    } catch (IOException e) {
+      Assert.assertTrue(
+          e.getMessage().contains(StorageState.NOT_FORMATTED.name()));
+    }
+
+    // let the storage dir contain both raft-meta and raft-meta.tmp
+    new RaftStorage(storageDir, StartupOption.FORMAT).close();
+    Assert.assertTrue(sd.getMetaFile().exists());
+    Assert.assertTrue(sd.getMetaTmpFile().createNewFile());
+    Assert.assertTrue(sd.getMetaTmpFile().exists());
+    try {
+      storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+      Assert.assertEquals(StorageState.NORMAL, storage.getState());
+      Assert.assertFalse(sd.getMetaTmpFile().exists());
+      Assert.assertTrue(sd.getMetaFile().exists());
+    } finally {
+      storage.close();
+    }
+  }
+
+  @Test
+  public void testSnapshotFileName() throws Exception {
+    final long term = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    final long index = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+    final String name = SimpleStateMachineStorage.getSnapshotFileName(term, 
index);
+    System.out.println("name = " + name);
+    final File file = new File(storageDir, name);
+    final TermIndex ti = 
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(file);
+    System.out.println("file = " + file);
+    Assert.assertEquals(term, ti.getTerm());
+    Assert.assertEquals(index, ti.getIndex());
+    System.out.println("ti = " + ti);
+
+    final File foo = new File(storageDir, "foo");
+    try {
+      SimpleStateMachineStorage.getTermIndexFromSnapshotFile(foo);
+      Assert.fail();
+    } catch(IllegalArgumentException iae) {
+      System.out.println("Good " + iae);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5c37675f/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
new file mode 100644
index 0000000..bcbfa73
--- /dev/null
+++ 
b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -0,0 +1,524 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil.SimpleOperation;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.TimeoutIOException;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RetryCacheTestUtil;
+import org.apache.ratis.server.impl.RetryCache;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestSegmentedRaftLog extends BaseTest {
+  static {
+    LogUtils.setLogLevel(RaftLogWorker.LOG, Level.DEBUG);
+  }
+
+  private static final RaftPeerId peerId = RaftPeerId.valueOf("s0");
+
+  static class SegmentRange {
+    final long start;
+    final long end;
+    final long term;
+    final boolean isOpen;
+
+    SegmentRange(long s, long e, long term, boolean isOpen) {
+      this.start = s;
+      this.end = e;
+      this.term = term;
+      this.isOpen = isOpen;
+    }
+  }
+
+  private File storageDir;
+  private RaftProperties properties;
+  private RaftStorage storage;
+  private long segmentMaxSize;
+  private long preallocatedSize;
+  private int bufferSize;
+
+  @Before
+  public void setup() throws Exception {
+    storageDir = getTestDir();
+    properties = new RaftProperties();
+    RaftServerConfigKeys.setStorageDirs(properties,  
Collections.singletonList(storageDir));
+    storage = new RaftStorage(storageDir, 
RaftServerConstants.StartupOption.REGULAR);
+    this.segmentMaxSize =
+        RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
+    this.preallocatedSize =
+        RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
+    this.bufferSize =
+        RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (storageDir != null) {
+      FileUtils.deleteFully(storageDir.getParentFile());
+    }
+  }
+
+  private LogEntryProto[] prepareLog(List<SegmentRange> list) throws 
IOException {
+    List<LogEntryProto> entryList = new ArrayList<>();
+    for (SegmentRange range : list) {
+      File file = range.isOpen ?
+          storage.getStorageDir().getOpenLogFile(range.start) :
+          storage.getStorageDir().getClosedLogFile(range.start, range.end);
+
+      final int size = (int) (range.end - range.start + 1);
+      LogEntryProto[] entries = new LogEntryProto[size];
+      try (LogOutputStream out = new LogOutputStream(file, false,
+          segmentMaxSize, preallocatedSize, bufferSize)) {
+        for (int i = 0; i < size; i++) {
+          SimpleOperation m = new SimpleOperation("m" + (i + range.start));
+          entries[i] = 
ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), range.term, i + 
range.start);
+          out.write(entries[i]);
+        }
+      }
+      Collections.addAll(entryList, entries);
+    }
+    return entryList.toArray(new LogEntryProto[entryList.size()]);
+  }
+
+  static List<SegmentRange> prepareRanges(int startTerm, int endTerm, int 
segmentSize,
+      long startIndex) {
+    List<SegmentRange> list = new ArrayList<>(endTerm - startTerm);
+    for (int i = startTerm; i < endTerm; i++) {
+      list.add(new SegmentRange(startIndex, startIndex + segmentSize - 1, i,
+          i == endTerm - 1));
+      startIndex += segmentSize;
+    }
+    return list;
+  }
+
+  private LogEntryProto getLastEntry(SegmentedRaftLog raftLog)
+      throws IOException {
+    return raftLog.get(raftLog.getLastEntryTermIndex().getIndex());
+  }
+
+  @Test
+  public void testLoadLogSegments() throws Exception {
+    // first generate log files
+    List<SegmentRange> ranges = prepareRanges(0, 5, 100, 0);
+    LogEntryProto[] entries = prepareLog(ranges);
+
+    // create RaftLog object and load log file
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      // check if log entries are loaded correctly
+      for (LogEntryProto e : entries) {
+        LogEntryProto entry = raftLog.get(e.getIndex());
+        Assert.assertEquals(e, entry);
+      }
+
+      TermIndex[] termIndices = raftLog.getEntries(0, 500);
+      LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
+          .map(ti -> {
+            try {
+              return raftLog.get(ti.getIndex());
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          })
+          .toArray(LogEntryProto[]::new);
+      Assert.assertArrayEquals(entries, entriesFromLog);
+      Assert.assertEquals(entries[entries.length - 1], getLastEntry(raftLog));
+    }
+  }
+
+  static List<LogEntryProto> prepareLogEntries(List<SegmentRange> slist,
+      Supplier<String> stringSupplier) {
+    List<LogEntryProto> eList = new ArrayList<>();
+    for (SegmentRange range : slist) {
+      prepareLogEntries(range, stringSupplier, false, eList);
+    }
+    return eList;
+  }
+
+  static List<LogEntryProto> prepareLogEntries(SegmentRange range,
+      Supplier<String> stringSupplier, boolean hasStataMachineData, 
List<LogEntryProto> eList) {
+    for(long index = range.start; index <= range.end; index++) {
+      eList.add(prepareLogEntry(range.term, index, stringSupplier, 
hasStataMachineData));
+    }
+    return eList;
+  }
+
+  static LogEntryProto prepareLogEntry(long term, long index, Supplier<String> 
stringSupplier, boolean hasStataMachineData) {
+    final SimpleOperation m = stringSupplier == null?
+        new SimpleOperation("m" + index, hasStataMachineData):
+        new SimpleOperation(stringSupplier.get(), hasStataMachineData);
+    return ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), term, 
index);
+  }
+
+  /**
+   * Append entry one by one and check if log state is correct.
+   */
+  @Test
+  public void testAppendEntry() throws Exception {
+    List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
+    List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      // append entries to the raftlog
+      
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
+    }
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      // check if the raft log is correct
+      checkEntries(raftLog, entries, 0, entries.size());
+    }
+
+    try (SegmentedRaftLog raftLog =
+        new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      TermIndex lastTermIndex  = raftLog.getLastEntryTermIndex();
+      IllegalStateException ex = null;
+      try {
+        // append entry fails if append entry term is lower than log's last 
entry term
+        raftLog.appendEntry(LogEntryProto.newBuilder(entries.get(0))
+            .setTerm(lastTermIndex.getTerm() - 1)
+            .setIndex(lastTermIndex.getIndex() + 1).build());
+      } catch (IllegalStateException e) {
+        ex = e;
+      }
+      Assert.assertTrue(ex.getMessage().contains("term less than RaftLog's 
last term"));
+      try {
+        // append entry fails if difference between append entry index and 
log's last entry index is greater than 1
+        raftLog.appendEntry(LogEntryProto.newBuilder(entries.get(0))
+            .setTerm(lastTermIndex.getTerm())
+            .setIndex(lastTermIndex.getIndex() + 2).build());
+      } catch (IllegalStateException e) {
+        ex = e;
+      }
+      Assert.assertTrue(ex.getMessage().contains("and RaftLog's last index " + 
lastTermIndex.getIndex() + " greater than 1"));
+    }
+  }
+
+  /**
+   * Keep appending entries, make sure the rolling is correct.
+   */
+  @Test
+  public void testAppendAndRoll() throws Exception {
+    RaftServerConfigKeys.Log.setPreallocatedSize(properties, 
SizeInBytes.valueOf("16KB"));
+    RaftServerConfigKeys.Log.setSegmentSizeMax(properties, 
SizeInBytes.valueOf("128KB"));
+
+    List<SegmentRange> ranges = prepareRanges(0, 1, 1024, 0);
+    final byte[] content = new byte[1024];
+    List<LogEntryProto> entries = prepareLogEntries(ranges,
+        () -> new String(content));
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      // append entries to the raftlog
+      
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
+    }
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      // check if the raft log is correct
+      checkEntries(raftLog, entries, 0, entries.size());
+      Assert.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
+    }
+  }
+
+  @Test
+  public void testTruncate() throws Exception {
+    // prepare the log for truncation
+    List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
+    List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      // append entries to the raftlog
+      
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
+    }
+
+    for (long fromIndex = 900; fromIndex >= 0; fromIndex -= 150) {
+      testTruncate(entries, fromIndex);
+    }
+  }
+
+  private void testTruncate(List<LogEntryProto> entries, long fromIndex)
+      throws Exception {
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      // truncate the log
+      raftLog.truncate(fromIndex).join();
+
+
+      checkEntries(raftLog, entries, 0, (int) fromIndex);
+    }
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      // check if the raft log is correct
+      if (fromIndex > 0) {
+        Assert.assertEquals(entries.get((int) (fromIndex - 1)),
+            getLastEntry(raftLog));
+      } else {
+        Assert.assertNull(raftLog.getLastEntryTermIndex());
+      }
+      checkEntries(raftLog, entries, 0, (int) fromIndex);
+    }
+  }
+
+  private void checkEntries(RaftLog raftLog, List<LogEntryProto> expected,
+      int offset, int size) throws IOException {
+    if (size > 0) {
+      for (int i = offset; i < size + offset; i++) {
+        LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
+        Assert.assertEquals(expected.get(i), entry);
+      }
+      TermIndex[] termIndices = raftLog.getEntries(
+          expected.get(offset).getIndex(),
+          expected.get(offset + size - 1).getIndex() + 1);
+      LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
+          .map(ti -> {
+            try {
+              return raftLog.get(ti.getIndex());
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          })
+          .toArray(LogEntryProto[]::new);
+      LogEntryProto[] expectedArray = expected.subList(offset, offset + size)
+          .stream().toArray(LogEntryProto[]::new);
+      Assert.assertArrayEquals(expectedArray, entriesFromLog);
+    }
+  }
+
+  private void checkFailedEntries(List<LogEntryProto> entries, long fromIndex, 
RetryCache retryCache) {
+    for (int i = 0; i < entries.size(); i++) {
+      if (i < fromIndex) {
+        RetryCacheTestUtil.assertFailure(retryCache, entries.get(i), false);
+      } else {
+        RetryCacheTestUtil.assertFailure(retryCache, entries.get(i), true);
+      }
+    }
+  }
+
+  /**
+   * Test append with inconsistent entries
+   */
+  @Test
+  public void testAppendEntriesWithInconsistency() throws Exception {
+    // prepare the log for truncation
+    List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
+    List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+    RaftServerImpl server = mock(RaftServerImpl.class);
+    RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
+    when(server.getRetryCache()).thenReturn(retryCache);
+    
doCallRealMethod().when(server).failClientRequest(any(LogEntryProto.class));
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      entries.stream().forEach(entry -> 
RetryCacheTestUtil.createEntry(retryCache, entry));
+      // append entries to the raftlog
+      
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
+    }
+
+    // append entries whose first 100 entries are the same with existing log,
+    // and the next 100 are with different term
+    SegmentRange r1 = new SegmentRange(550, 599, 2, false);
+    SegmentRange r2 = new SegmentRange(600, 649, 3, false);
+    SegmentRange r3 = new SegmentRange(650, 749, 10, false);
+    List<LogEntryProto> newEntries = prepareLogEntries(
+        Arrays.asList(r1, r2, r3), null);
+
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.append(newEntries.toArray(new 
LogEntryProto[newEntries.size()])).forEach(CompletableFuture::join);
+
+      checkFailedEntries(entries, 650, retryCache);
+      checkEntries(raftLog, entries, 0, 650);
+      checkEntries(raftLog, newEntries, 100, 100);
+      Assert.assertEquals(newEntries.get(newEntries.size() - 1),
+          getLastEntry(raftLog));
+      Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
+          raftLog.getLatestFlushedIndex());
+    }
+
+    // load the raftlog again and check
+    try (SegmentedRaftLog raftLog =
+             new SegmentedRaftLog(peerId, server, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      checkEntries(raftLog, entries, 0, 650);
+      checkEntries(raftLog, newEntries, 100, 100);
+      Assert.assertEquals(newEntries.get(newEntries.size() - 1),
+          getLastEntry(raftLog));
+      Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
+          raftLog.getLatestFlushedIndex());
+
+      RaftLogCache cache = raftLog.getRaftLogCache();
+      Assert.assertEquals(5, cache.getNumOfSegments());
+    }
+  }
+
+  @Test
+  public void testSegmentedRaftLogStateMachineData() throws Exception {
+    final SegmentRange range = new SegmentRange(0, 10, 1, true);
+    final List<LogEntryProto> entries = prepareLogEntries(range, null, true, 
new ArrayList<>());
+
+    final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, 
null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+
+      int next = 0;
+      long flush = -1;
+      assertIndices(raftLog, flush, next);
+      raftLog.appendEntry(entries.get(next++));
+      assertIndices(raftLog, flush, next);
+      raftLog.appendEntry(entries.get(next++));
+      assertIndices(raftLog, flush, next);
+      raftLog.appendEntry(entries.get(next++));
+      assertIndicesMultipleAttempts(raftLog, flush += 3, next);
+
+      sm.blockFlushStateMachineData();
+      raftLog.appendEntry(entries.get(next++));
+      {
+        sm.blockWriteStateMachineData();
+        final Thread t = startAppendEntryThread(raftLog, entries.get(next++));
+        TimeUnit.SECONDS.sleep(1);
+        Assert.assertTrue(t.isAlive());
+        sm.unblockWriteStateMachineData();
+        t.join();
+      }
+      assertIndices(raftLog, flush, next);
+      TimeUnit.SECONDS.sleep(1);
+      assertIndices(raftLog, flush, next);
+      sm.unblockFlushStateMachineData();
+      assertIndicesMultipleAttempts(raftLog, flush + 2, next);
+    }
+  }
+
+  @Test
+  public void testSegmentedRaftLogStateMachineDataTimeoutIOException() throws 
Exception {
+    RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true);
+    final TimeDuration syncTimeout = TimeDuration.valueOf(100, 
TimeUnit.MILLISECONDS);
+    RaftServerConfigKeys.Log.StateMachineData.setSyncTimeout(properties, 
syncTimeout);
+    final int numRetries = 2;
+    RaftServerConfigKeys.Log.StateMachineData.setSyncTimeoutRetry(properties, 
numRetries);
+    ExitUtils.disableSystemExit();
+
+    final LogEntryProto entry = prepareLogEntry(0, 0, null, true);
+    final StateMachine sm = new BaseStateMachine() {
+      @Override
+      public CompletableFuture<?> writeStateMachineData(LogEntryProto entry) {
+        return new CompletableFuture<>(); // the future never completes
+      }
+    };
+
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(peerId, null, sm, 
null, storage, -1, properties)) {
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
+      raftLog.appendEntry(entry);  // RaftLogWorker should catch 
TimeoutIOException
+
+      JavaUtils.attempt(() -> {
+        final ExitUtils.ExitException exitException = 
ExitUtils.getFirstExitException();
+        Objects.requireNonNull(exitException, "exitException == null");
+        Assert.assertEquals(TimeoutIOException.class, 
exitException.getCause().getClass());
+      }, 3*numRetries, syncTimeout, "RaftLogWorker should catch 
TimeoutIOException and exit", LOG);
+    }
+  }
+
+  static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) {
+    final Thread t = new Thread(() -> raftLog.appendEntry(entry));
+    t.start();
+    return t;
+  }
+
+  void assertIndices(RaftLog raftLog, long expectedFlushIndex, long 
expectedNextIndex) {
+    LOG.info("assert expectedFlushIndex={}", expectedFlushIndex);
+    Assert.assertEquals(expectedFlushIndex, raftLog.getLatestFlushedIndex());
+    LOG.info("assert expectedNextIndex={}", expectedNextIndex);
+    Assert.assertEquals(expectedNextIndex, raftLog.getNextIndex());
+  }
+
+  void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex, 
long expectedNextIndex) throws Exception {
+    JavaUtils.attempt(() -> assertIndices(raftLog, expectedFlushIndex, 
expectedNextIndex),
+        10, 100, "assertIndices", LOG);
+  }
+
+  @Test
+  public void testSegmentedRaftLogFormatInternalHeader() throws Exception {
+    testFailureCase("testSegmentedRaftLogFormatInternalHeader",
+        () -> SegmentedRaftLogFormat.applyHeaderTo(header -> {
+          LOG.info("header  = " + new String(header, StandardCharsets.UTF_8));
+          header[0] += 1; // try changing the internal header
+          LOG.info("header' = " + new String(header, StandardCharsets.UTF_8));
+          return null;
+        }), IllegalStateException.class);
+
+    // reset the header
+    SegmentedRaftLogFormat.applyHeaderTo(header -> {
+      LOG.info("header'  = " + new String(header, StandardCharsets.UTF_8));
+      header[0] -= 1; // try changing the internal header
+      LOG.info("header'' = " + new String(header, StandardCharsets.UTF_8));
+      return null;
+    });
+  }
+}

Reply via email to