Repository: incubator-ratis
Updated Branches:
  refs/heads/master 06fd2e441 -> 2373a4a08


RATIS-70. Separate term/index/offset and log entry content in LogSegment. 
Contributed by Jing Zhao


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2373a4a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2373a4a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2373a4a0

Branch: refs/heads/master
Commit: 2373a4a08e5805af3291acae64c2efccd9f528d4
Parents: 06fd2e4
Author: Jing Zhao <[email protected]>
Authored: Mon Apr 17 10:43:29 2017 -0700
Committer: Jing Zhao <[email protected]>
Committed: Mon Apr 17 10:43:29 2017 -0700

----------------------------------------------------------------------
 .../org/apache/ratis/TestRestartRaftPeer.java   |   2 +-
 .../org/apache/ratis/grpc/TestRaftStream.java   |  14 +-
 .../ratis/server/impl/LeaderElection.java       |   5 +-
 .../apache/ratis/server/impl/LeaderState.java   |  12 +-
 .../apache/ratis/server/impl/LogAppender.java   |   3 +-
 .../ratis/server/impl/RaftServerImpl.java       |  30 +++-
 .../apache/ratis/server/impl/ServerState.java   |  26 +++-
 .../ratis/server/impl/StateMachineUpdater.java  |  32 +----
 .../apache/ratis/server/protocol/TermIndex.java |   2 +
 .../apache/ratis/server/storage/LogSegment.java | 112 +++++++++------
 .../ratis/server/storage/MemoryRaftLog.java     |  37 +++--
 .../apache/ratis/server/storage/RaftLog.java    |  47 +++---
 .../ratis/server/storage/RaftLogCache.java      | 144 +++++++++++--------
 .../ratis/server/storage/RaftLogWorker.java     |  10 +-
 .../ratis/server/storage/SegmentedRaftLog.java  |  53 ++++---
 .../ratis/statemachine/BaseStateMachine.java    |   4 +-
 .../apache/ratis/statemachine/StateMachine.java |   4 +-
 .../java/org/apache/ratis/RaftBasicTests.java   |   5 +-
 .../java/org/apache/ratis/RaftTestUtil.java     |  21 +--
 .../impl/RaftReconfigurationBaseTest.java       |   7 +-
 .../ratis/server/storage/TestRaftLogCache.java  |   3 +-
 .../server/storage/TestRaftLogSegment.java      |  13 +-
 .../server/storage/TestSegmentedRaftLog.java    |  62 ++++----
 23 files changed, 390 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
index 346d7c0..dc38e82 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
@@ -111,6 +111,6 @@ public class TestRestartRaftPeer {
     // make sure the restarted peer's log segments is correct
     cluster.restartServer(followerId, false);
     Assert.assertTrue(cluster.getServer(followerId).getState().getLog()
-        .getLastEntry().getIndex() >= 20);
+        .getLastEntryTermIndex().getIndex() >= 20);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index 16bc221..83ca5ec 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc;
 import org.apache.log4j.Level;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.grpc.client.AppendStreamer;
@@ -27,7 +28,6 @@ import org.apache.ratis.grpc.client.RaftOutputStream;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -108,9 +108,9 @@ public class TestRaftStream {
     long committedIndex = raftLog.getLastCommittedIndex();
     Assert.assertEquals(expectedCommittedIndex, committedIndex);
     // check the log content
-    LogEntryProto[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1);
-    for (LogEntryProto entry : entries) {
-      byte[] logData = entry.getSmLogEntry().getData().toByteArray();
+    TermIndex[] entries = raftLog.getEntries(1, expectedCommittedIndex + 1);
+    for (TermIndex entry : entries) {
+      byte[] logData = raftLog.get(entry.getIndex()).getSmLogEntry().getData().toByteArray();
       byte[] expected = s.get();
       Assert.assertEquals("log entry: " + entry,
           expected.length, logData.length);
@@ -240,11 +240,11 @@ public class TestRaftStream {
     // 0.5 + 1 + 2.5 + 4 = 8
     Assert.assertEquals(8, leader.getState().getLastAppliedIndex());
     Assert.assertEquals(8, log.getLastCommittedIndex());
-    LogEntryProto[] entries = log.getEntries(1, 9);
+    TermIndex[] entries = log.getEntries(1, 9);
     byte[] actual = new byte[ByteValue.BUFFERSIZE * 8];
     totalSize = 0;
-    for (LogEntryProto e : entries) {
-      byte[] eValue = e.getSmLogEntry().getData().toByteArray();
+    for (TermIndex e : entries) {
+      byte[] eValue = log.get(e.getIndex()).getSmLogEntry().getData().toByteArray();
       Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length);
       System.arraycopy(eValue, 0, actual, totalSize, eValue.length);
       totalSize += eValue.length;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index d026db6..14a3780 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -45,7 +45,7 @@ class LeaderElection extends Daemon {
       List<Exception> exceptions, long newTerm) {
     LOG.info(server.getId() + ": Election " + result + "; received "
         + responses.size() + " response(s) "
-        + responses.stream().map(r -> ProtoUtils.toString(r)).collect(Collectors.toList())
+        + responses.stream().map(ProtoUtils::toString).collect(Collectors.toList())
         + " and " + exceptions.size() + " exception(s); " + server.getState());
     int i = 0;
     for(Exception e : exceptions) {
@@ -127,8 +127,7 @@ class LeaderElection extends Daemon {
       LOG.info(state.getSelfId() + ": begin an election in Term "
           + electionTerm);
 
-      TermIndex lastEntry = ServerProtoUtils.toTermIndex(
-          state.getLog().getLastEntry());
+      TermIndex lastEntry = state.getLog().getLastEntryTermIndex();
       if (lastEntry == null) {
         // lastEntry may need to be derived from snapshot
         SnapshotInfo snapshot = state.getLatestSnapshot();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 2d1ab52..96bd28b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
@@ -401,7 +402,7 @@ public class LeaderState {
     long majorityInNewConf = computeLastCommitted(voterLists.get(0),
         conf.containsInConf(selfId));
     final long oldLastCommitted = raftLog.getLastCommittedIndex();
-    final LogEntryProto[] entriesToCommit;
+    final TermIndex[] entriesToCommit;
     if (!conf.isTransitional()) {
       // copy the entries that may get committed out of the raftlog, to prevent
       // the possible race that the log gets purged after the statemachine does
@@ -420,18 +421,17 @@ public class LeaderState {
     checkAndUpdateConfiguration(entriesToCommit);
   }
 
-  private boolean committedConf(LogEntryProto[] entries) {
+  private boolean committedConf(TermIndex[] entries) {
     final long currentCommitted = raftLog.getLastCommittedIndex();
-    for (LogEntryProto entry : entries) {
-      if (entry.getIndex() <= currentCommitted &&
-          ProtoUtils.isConfigurationLogEntry(entry)) {
+    for (TermIndex entry : entries) {
+      if (entry.getIndex() <= currentCommitted && raftLog.isConfigEntry(entry)) {
         return true;
       }
     }
     return false;
   }
 
-  private void checkAndUpdateConfiguration(LogEntryProto[] entriesToCheck) {
+  private void checkAndUpdateConfiguration(TermIndex[] entriesToCheck) {
     final RaftConfiguration conf = server.getRaftConf();
     if (committedConf(entriesToCheck)) {
       if (conf.isTransitional()) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 5417e92..ff12f4e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -137,8 +137,7 @@ public class LogAppender extends Daemon {
   }
 
   private TermIndex getPrevious() {
-    TermIndex previous = ServerProtoUtils.toTermIndex(
-        raftLog.get(follower.getNextIndex() - 1));
+    TermIndex previous = raftLog.getTermIndex(follower.getNextIndex() - 1);
     if (previous == null) {
       // if previous is null, nextIndex must be equal to the log start
       // index (otherwise we will install snapshot).

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 0a6a68d..3c704b0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,9 +17,12 @@
  */
 package org.apache.ratis.server.impl;
 
+import static org.apache.ratis.server.impl.ServerProtoUtils.toRaftConfiguration;
 import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.INCONSISTENCY;
 import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.NOT_LEADER;
 import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
+import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
+import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY;
 import static org.apache.ratis.util.LifeCycle.State.CLOSED;
 import static org.apache.ratis.util.LifeCycle.State.CLOSING;
 import static org.apache.ratis.util.LifeCycle.State.RUNNING;
@@ -873,7 +876,7 @@ public class RaftServerImpl implements RaftServer {
    * @param stateMachineFuture the future returned by the state machine
    *                           from which we will get transaction result later
    */
-  void replyPendingRequest(LogEntryProto logEntry,
+  private void replyPendingRequest(LogEntryProto logEntry,
       CompletableFuture<Message> stateMachineFuture) {
     // update the retry cache
     final ClientId clientId = new ClientId(logEntry.getClientId().toByteArray());
@@ -903,13 +906,36 @@ public class RaftServerImpl implements RaftServer {
     });
   }
 
-  TransactionContext getTransactionContext(long index) {
+  private TransactionContext getTransactionContext(long index) {
     if (leaderState != null) { // is leader and is running
       return leaderState.getTransactionContext(index);
     }
     return null;
   }
 
+  public void applyLogToStateMachine(LogEntryProto next) {
+    if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
+      // the reply should have already been set. only need to record
+      // the new conf in the state machine.
+      stateMachine.setRaftConfiguration(toRaftConfiguration(next.getIndex(),
+          next.getConfigurationEntry()));
+    } else if (next.getLogEntryBodyCase() == SMLOGENTRY) {
+      // check whether there is a TransactionContext because we are the leader.
+      TransactionContext trx = getTransactionContext(next.getIndex());
+      if (trx == null) {
+        trx = new TransactionContext(stateMachine, next);
+      }
+
+      // Let the StateMachine inject logic for committed transactions in sequential order.
+      trx = stateMachine.applyTransactionSerial(trx);
+
+      // TODO: This step can be parallelized
+      CompletableFuture<Message> stateMachineFuture =
+          stateMachine.applyTransaction(trx);
+      replyPendingRequest(next, stateMachineFuture);
+    }
+  }
+
   @Override
   public RaftProperties getProperties() {
     return this.properties;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e9d8ef8..da1aa3c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -34,6 +34,9 @@ import org.apache.ratis.util.ProtoUtils;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
 
 /**
  * Common states of a raft peer. Protected by RaftServer's lock.
@@ -85,7 +88,16 @@ public class ServerState implements Closeable {
     long lastApplied = initStatemachine(stateMachine, prop);
 
     leaderId = null;
-    log = initLog(id, prop, server, lastApplied);
+    // we cannot apply log entries to the state machine in this step, since we
+    // do not know whether the local log entries have been committed.
+    log = initLog(id, prop, lastApplied, entry -> {
+      if (entry.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
+        configurationManager.addConfiguration(entry.getIndex(),
+            ServerProtoUtils.toRaftConfiguration(entry.getIndex(),
+                entry.getConfigurationEntry()));
+      }
+    });
+
     RaftLog.Metadata metadata = log.loadMetadata();
     currentTerm = metadata.getTerm();
     votedFor = metadata.getVotedFor();
@@ -122,7 +134,8 @@ public class ServerState implements Closeable {
    * know whether they have been committed.
    */
   private RaftLog initLog(RaftPeerId id, RaftProperties prop,
-      RaftServerImpl server, long lastIndexInSnapshot) throws IOException {
+      long lastIndexInSnapshot, Consumer<LogEntryProto> logConsumer)
+      throws IOException {
     final RaftLog log;
     if (RaftServerConfigKeys.Log.useMemory(prop)) {
       log = new MemoryRaftLog(id);
@@ -130,7 +143,7 @@ public class ServerState implements Closeable {
       log = new SegmentedRaftLog(id, server, this.storage,
           lastIndexInSnapshot, prop);
     }
-    log.open(configurationManager, lastIndexInSnapshot);
+    log.open(lastIndexInSnapshot, logConsumer);
     return log;
   }
 
@@ -236,16 +249,15 @@ public class ServerState implements Closeable {
   }
 
   boolean isLogUpToDate(TermIndex candidateLastEntry) {
-    LogEntryProto lastEntry = log.getLastEntry();
+    TermIndex local = log.getLastEntryTermIndex();
     // need to take into account snapshot
     SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
-     if (lastEntry == null && snapshot == null) {
+     if (local == null && snapshot == null) {
       return true;
     } else if (candidateLastEntry == null) {
       return false;
     }
-    TermIndex local = ServerProtoUtils.toTermIndex(lastEntry);
-    if (local == null || (snapshot != null && snapshot.getIndex() > lastEntry.getIndex())) {
+    if (local == null || (snapshot != null && snapshot.getIndex() > local.getIndex())) {
       local = snapshot.getTermIndex();
     }
     return local.compareTo(candidateLastEntry) <= 0;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 95af956..b9ed6a5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -17,21 +17,13 @@
  */
 package org.apache.ratis.server.impl;
 
-import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
-import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.Message;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.LifeCycle;
@@ -39,6 +31,8 @@ import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 /**
  * This class tracks the log entries that have been committed in a quorum and
  * applies them to the state machine. We let a separate thread do this work
@@ -145,27 +139,7 @@ class StateMachineUpdater implements Runnable {
         while (lastAppliedIndex < committedIndex) {
           final LogEntryProto next = raftLog.get(lastAppliedIndex + 1);
           if (next != null) {
-            if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
-              // the reply should have already been set. only need to record
-              // the new conf in the state machine.
-              stateMachine.setRaftConfiguration(
-                  ServerProtoUtils.toRaftConfiguration(next.getIndex(),
-                      next.getConfigurationEntry()));
-            } else if (next.getLogEntryBodyCase() == SMLOGENTRY) {
-              // check whether there is a TransactionContext because we are the leader.
-              TransactionContext trx = server.getTransactionContext(next.getIndex());
-              if (trx == null) {
-                trx = new TransactionContext(stateMachine, next);
-              }
-
-              // Let the StateMachine inject logic for committed transactions in sequential order.
-              trx = stateMachine.applyTransactionSerial(trx);
-
-              // TODO: This step can be parallelized
-              CompletableFuture<Message> stateMachineFuture =
-                  stateMachine.applyTransaction(trx);
-              server.replyPendingRequest(next, stateMachineFuture);
-            }
+            server.applyLogToStateMachine(next);
             lastAppliedIndex++;
           } else {
             LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
index 477b70c..a16110f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -21,6 +21,8 @@ import org.apache.ratis.server.impl.ServerImplUtils;
 
 /** The term and the log index defined in the Raft consensus algorithm. */
 public interface TermIndex extends Comparable<TermIndex> {
+  TermIndex[] EMPTY_TERMINDEX_ARRAY = {};
+
   /** @return the term. */
   long getTerm();
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
index 46f9f4f..3856585 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogSegment.java
@@ -17,19 +17,23 @@
  */
 package org.apache.ratis.server.storage;
 
-import org.apache.ratis.server.impl.ConfigurationManager;
-import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.shaded.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-
-import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
 
 /**
  * In-memory cache for a log segment file. All the updates will be first written
@@ -38,44 +42,45 @@ import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBod
  * This class will be protected by the RaftServer's lock.
  */
 class LogSegment implements Comparable<Long> {
+  static long getEntrySize(LogEntryProto entry) {
+    final int serialized = entry.getSerializedSize();
+    return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4;
+  }
+
+  @VisibleForTesting
   static class LogRecord {
     /** starting offset in the file */
-    final long offset;
-    final LogEntryProto entry;
+    private final long offset;
+    private final TermIndex termIndex;
 
     LogRecord(long offset, LogEntryProto entry) {
       this.offset = offset;
-      this.entry = entry;
+      termIndex = TermIndex.newTermIndex(entry.getTerm(), entry.getIndex());
     }
-  }
 
-  static class SegmentFileInfo {
-    final long startIndex; // start index of the
-    final long endIndex; // original end index
-    final boolean isOpen;
-    final long targetLength; // position for truncation
-    final long newEndIndex; // new end index after the truncation
-
-    SegmentFileInfo(long start, long end, boolean isOpen, long targetLength,
-        long newEndIndex) {
-      this.startIndex = start;
-      this.endIndex = end;
-      this.isOpen = isOpen;
-      this.targetLength = targetLength;
-      this.newEndIndex = newEndIndex;
+    TermIndex getTermIndex() {
+      return termIndex;
     }
-  }
 
-  static long getEntrySize(LogEntryProto entry) {
-    final int serialized = entry.getSerializedSize();
-    return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4;
+    long getOffset() {
+      return offset;
+    }
   }
 
   private boolean isOpen;
-  private final List<LogRecord> records = new ArrayList<>();
   private long totalSize;
   private final long startIndex;
   private long endIndex;
+  /**
+   * the list of records is more like the index of a segment
+   */
+  private final List<LogRecord> records = new ArrayList<>();
+  /**
+   * the entryCache caches the content of log entries.
+   * TODO: currently we cache all the log entries. will fix it soon.
+   */
+  private final Map<TermIndex, LogEntryProto> entryCache = new HashMap<>();
+  private final Set<TermIndex> configEntries = new HashSet<>();
 
   private LogSegment(boolean isOpen, long start, long end) {
     this.isOpen = isOpen;
@@ -95,7 +100,7 @@ class LogSegment implements Comparable<Long> {
   }
 
   static LogSegment loadSegment(File file, long start, long end, boolean isOpen,
-      ConfigurationManager confManager) throws IOException {
+      Consumer<LogEntryProto> logConsumer) throws IOException {
     final LogSegment segment;
     try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) {
       segment = isOpen ? LogSegment.newOpenSegment(start) :
@@ -108,11 +113,9 @@ class LogSegment implements Comparable<Long> {
               "gap between entry %s and entry %s", prev, next);
         }
         segment.append(next);
-        if (confManager != null &&
-            next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
-          confManager.addConfiguration(next.getIndex(),
-              ServerProtoUtils.toRaftConfiguration(next.getIndex(),
-                  next.getConfigurationEntry()));
+
+        if (logConsumer != null) {
+          logConsumer.accept(next);
         }
         prev = next;
       }
@@ -123,7 +126,7 @@ class LogSegment implements Comparable<Long> {
       FileUtils.truncateFile(file, segment.getTotalSize());
     }
 
-    Preconditions.assertTrue(start == segment.records.get(0).entry.getIndex());
+    Preconditions.assertTrue(start == segment.records.get(0).getTermIndex().getIndex());
     if (!isOpen) {
       Preconditions.assertTrue(segment.getEndIndex() == end);
     }
@@ -143,7 +146,7 @@ class LogSegment implements Comparable<Long> {
   }
 
   int numOfEntries() {
-    return (int) (endIndex - startIndex + 1);
+    return Math.toIntExact(endIndex - startIndex + 1);
   }
 
   void appendToOpenSegment(LogEntryProto... entries) {
@@ -167,29 +170,52 @@ class LogSegment implements Comparable<Long> {
       final LogRecord currentLast = getLastRecord();
       if (currentLast != null) {
         Preconditions.assertTrue(
-            entry.getIndex() == currentLast.entry.getIndex() + 1,
+            entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
             "gap between entries %s and %s", entry.getIndex(),
-            currentLast.entry.getIndex());
+            currentLast.getTermIndex().getIndex());
       }
 
       final LogRecord record = new LogRecord(totalSize, entry);
       records.add(record);
+      entryCache.put(record.getTermIndex(), entry);
+      if (ProtoUtils.isConfigurationLogEntry(entry)) {
+        configEntries.add(record.getTermIndex());
+      }
       totalSize += getEntrySize(entry);
       endIndex = entry.getIndex();
     }
   }
 
+  LogEntryProto getLogEntry(long index) {
+    LogRecord record = getLogRecord(index);
+    return record == null ? null : entryCache.get(record.getTermIndex());
+  }
+
+  TermIndex getTermIndex(long index) {
+    LogRecord record = getLogRecord(index);
+    return record == null ? null : record.getTermIndex();
+  }
+
   LogRecord getLogRecord(long index) {
     if (index >= startIndex && index <= endIndex) {
-      return records.get((int) (index - startIndex));
+      return records.get(Math.toIntExact(index - startIndex));
     }
     return null;
   }
 
-  LogRecord getLastRecord() {
+  private LogRecord getLastRecord() {
     return records.isEmpty() ? null : records.get(records.size() - 1);
   }
 
+  TermIndex getLastTermIndex() {
+    LogRecord last = getLastRecord();
+    return last == null ? null : last.getTermIndex();
+  }
+
+  boolean isConfigEntry(TermIndex ti) {
+    return configEntries.contains(ti);
+  }
+
   long getTotalSize() {
     return totalSize;
   }
@@ -199,9 +225,11 @@ class LogSegment implements Comparable<Long> {
    */
   void truncate(long fromIndex) {
     Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
-    LogRecord record = records.get((int) (fromIndex - startIndex));
+    LogRecord record = records.get(Math.toIntExact(fromIndex - startIndex));
     for (long index = endIndex; index >= fromIndex; index--) {
-      records.remove((int)(index - startIndex));
+      LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
+      entryCache.remove(removed.getTermIndex());
+      configEntries.remove(removed.getTermIndex());
     }
     totalSize = record.offset;
     isOpen = false;
@@ -227,6 +255,8 @@ class LogSegment implements Comparable<Long> {
 
   void clear() {
     records.clear();
+    entryCache.clear();
+    configEntries.clear();
     endIndex = startIndex - 1;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index a49db9a..df71d08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -24,10 +24,12 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.AutoCloseableLock;
 import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
 
 /**
  * A simple RaftLog implementation in memory. Used only for testing.
@@ -49,15 +51,30 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public LogEntryProto[] getEntries(long startIndex, long endIndex) {
+  public TermIndex getTermIndex(long index) {
     checkLogState();
     try(AutoCloseableLock readLock = readLock()) {
-      final int i = (int) startIndex;
+      final int i = (int) index;
+      return i >= 0 && i < entries.size() ?
+          ServerProtoUtils.toTermIndex(entries.get(i)) : null;
+    }
+  }
+
+  @Override
+  public TermIndex[] getEntries(long startIndex, long endIndex) {
+    checkLogState();
+    try(AutoCloseableLock readLock = readLock()) {
+      final int from = (int) startIndex;
       if (startIndex >= entries.size()) {
         return null;
       }
-      final int toIndex = (int) Math.min(entries.size(), endIndex);
-      return entries.subList(i, toIndex).toArray(EMPTY_LOGENTRY_ARRAY);
+      final int to = (int) Math.min(entries.size(), endIndex);
+      TermIndex[] ti = new TermIndex[to - from];
+      for (int i = 0; i < ti.length; i++) {
+        // Slot i of the result is the entry at log position from + i.
+        ti[i] = ServerProtoUtils.toTermIndex(entries.get(from + i));
+      }
+      return ti;
     }
   }
 
@@ -74,11 +91,11 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public LogEntryProto getLastEntry() {
+  public TermIndex getLastEntryTermIndex() {
     checkLogState();
     try(AutoCloseableLock readLock = readLock()) {
       final int size = entries.size();
-      return size == 0 ? null : entries.get(size - 1);
+      return size == 0 ? null : ServerProtoUtils.toTermIndex(entries.get(size 
- 1));
     }
   }
 
@@ -146,8 +163,7 @@ public class MemoryRaftLog extends RaftLog {
 
   @Override
   public String toString() {
-    return "last=" + ServerProtoUtils.toString(getLastEntry())
-        + ", committed="
+    return "last=" + getLastEntryTermIndex() + ", committed="
         + ServerProtoUtils.toString(get(getLastCommittedIndex()));
   }
 
@@ -180,4 +196,9 @@ public class MemoryRaftLog extends RaftLog {
   public void syncWithSnapshot(long lastSnapshotIndex) {
     // do nothing
   }
+
+  @Override
+  public boolean isConfigEntry(TermIndex ti) {
+    return ProtoUtils.isConfigurationLogEntry(get(ti.getIndex()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index e72249a..5002f83 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -17,15 +17,9 @@
  */
 package org.apache.ratis.server.storage;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.StateMachineException;
-import org.apache.ratis.server.impl.ConfigurationManager;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -38,6 +32,13 @@ import org.apache.ratis.util.ProtoUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
 /**
  * Base class of RaftLog. Currently we provide two types of RaftLog
  * implementation:
@@ -48,7 +49,6 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class RaftLog implements Closeable {
   public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
-  public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new 
LogEntryProto[0];
   public static final String LOG_SYNC = RaftLog.class.getSimpleName() + 
".logSync";
 
   /**
@@ -85,7 +85,7 @@ public abstract class RaftLog implements Closeable {
       if (lastCommitted.get() < majorityIndex) {
         // Only update last committed index for current term. See §5.4.2 in
         // paper for details.
-        final LogEntryProto entry = get(majorityIndex);
+        final TermIndex entry = getTermIndex(majorityIndex);
         if (entry != null && entry.getTerm() == currentTerm) {
           LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex);
           lastCommitted.set(majorityIndex);
@@ -103,8 +103,7 @@ public abstract class RaftLog implements Closeable {
     if (ti == null) {
       return false;
     }
-    LogEntryProto entry = get(ti.getIndex());
-    TermIndex local = ServerProtoUtils.toTermIndex(entry);
+    TermIndex local = getTermIndex(ti.getIndex());
     return ti.equals(local);
   }
 
@@ -112,7 +111,7 @@ public abstract class RaftLog implements Closeable {
    * @return the index of the next log entry to append.
    */
   public long getNextIndex() {
-    final LogEntryProto last = getLastEntry();
+    final TermIndex last = getLastEntryTermIndex();
     if (last == null) {
       // if the log is empty, the last committed index should be consistent 
with
       // the last index included in the latest snapshot.
@@ -166,7 +165,7 @@ public abstract class RaftLog implements Closeable {
     }
   }
 
-  public void open(ConfigurationManager confManager, long lastIndexInSnapshot)
+  public void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer)
       throws IOException {
     isOpen = true;
   }
@@ -183,17 +182,26 @@ public abstract class RaftLog implements Closeable {
   public abstract LogEntryProto get(long index);
 
   /**
+   * Get the TermIndex information of the given index.
+   *
+   * @param index The given index.
+   * @return The TermIndex of the log entry associated with the given index.
+   *         Null if there is no log entry with the index.
+   */
+  public abstract TermIndex getTermIndex(long index);
+
+  /**
    * @param startIndex the starting log index (inclusive)
    * @param endIndex the ending log index (exclusive)
-   * @return all log entries within the given index range. Null if startIndex
-   *         is greater than the smallest available index.
+   * @return TermIndex of all log entries within the given index range. May be
+   *         null if startIndex is beyond the last available index.
    */
-  public abstract LogEntryProto[] getEntries(long startIndex, long endIndex);
+  public abstract TermIndex[] getEntries(long startIndex, long endIndex);
 
   /**
-   * @return the last log entry.
+   * @return the last log entry's term and index.
    */
-  public abstract LogEntryProto getLastEntry();
+  public abstract TermIndex getLastEntryTermIndex();
 
   /**
    * Truncate the log entries till the given index. The log with the given 
index
@@ -250,9 +258,12 @@ public abstract class RaftLog implements Closeable {
 
   public abstract void syncWithSnapshot(long lastSnapshotIndex);
 
+  public abstract boolean isConfigEntry(TermIndex ti);
+
   @Override
   public String toString() {
-    return ServerProtoUtils.toString(getLastEntry());
+    TermIndex last = getLastEntryTermIndex();
+    return last == null ? "null" : Collections.singletonList(last).toString();
   }
 
   public static class Metadata {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
index 0a21846..0142bee 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogCache.java
@@ -26,8 +26,7 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 import org.apache.ratis.server.impl.RaftServerConstants;
-import org.apache.ratis.server.storage.LogSegment.LogRecord;
-import org.apache.ratis.server.storage.LogSegment.SegmentFileInfo;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 
 import org.apache.ratis.util.Preconditions;
@@ -38,6 +37,35 @@ import org.apache.ratis.util.Preconditions;
  * requires external lock protection.
  */
 class RaftLogCache {
+  static class SegmentFileInfo {
+    final long startIndex; // start index of the segment
+    final long endIndex; // original end index
+    final boolean isOpen;
+    final long targetLength; // position for truncation
+    final long newEndIndex; // new end index after the truncation
+
+    SegmentFileInfo(long start, long end, boolean isOpen, long targetLength,
+        long newEndIndex) {
+      this.startIndex = start;
+      this.endIndex = end;
+      this.isOpen = isOpen;
+      this.targetLength = targetLength;
+      this.newEndIndex = newEndIndex;
+    }
+  }
+
+  static class TruncationSegments {
+    final SegmentFileInfo toTruncate; // name of the file to be truncated
+    final SegmentFileInfo[] toDelete; // names of the files to be deleted
+
+    TruncationSegments(SegmentFileInfo toTruncate,
+        List<SegmentFileInfo> toDelete) {
+      this.toDelete = toDelete == null ? null :
+          toDelete.toArray(new SegmentFileInfo[toDelete.size()]);
+      this.toTruncate = toTruncate;
+    }
+  }
+
   private LogSegment openSegment;
   private final List<LogSegment> closedSegments;
 
@@ -74,25 +102,50 @@ class RaftLogCache {
     }
   }
 
-  LogEntryProto getEntry(long index) {
+  LogSegment getOpenSegment() {
+    return openSegment;
+  }
+
+  /**
+   * finalize the current open segment, and start a new open segment
+   */
+  void rollOpenSegment(boolean createNewOpen) {
+    Preconditions.assertTrue(openSegment != null
+        && openSegment.numOfEntries() > 0);
+    final long nextIndex = openSegment.getEndIndex() + 1;
+    openSegment.close();
+    closedSegments.add(openSegment);
+    if (createNewOpen) {
+      openSegment = LogSegment.newOpenSegment(nextIndex);
+    } else {
+      openSegment = null;
+    }
+  }
+
+  private LogSegment getSegment(long index) {
     if (openSegment != null && index >= openSegment.getStartIndex()) {
-      final LogRecord record = openSegment.getLogRecord(index);
-      return record == null ? null : record.entry;
+      return openSegment;
     } else {
       int segmentIndex = Collections.binarySearch(closedSegments, index);
-      if (segmentIndex < 0) {
-        return null;
-      } else {
-        return closedSegments.get(segmentIndex).getLogRecord(index).entry;
-      }
+      return segmentIndex < 0 ? null : closedSegments.get(segmentIndex);
     }
   }
 
+  LogEntryProto getEntry(long index) {
+    LogSegment segment = getSegment(index);
+    return segment == null ? null : segment.getLogEntry(index);
+  }
+
+  TermIndex getTermIndex(long index) {
+    LogSegment segment = getSegment(index);
+    return segment == null ? null : segment.getTermIndex(index);
+  }
+
   /**
    * @param startIndex inclusive
    * @param endIndex exclusive
    */
-  LogEntryProto[] getEntries(final long startIndex, final long endIndex) {
+  TermIndex[] getEntries(final long startIndex, final long endIndex) {
     if (startIndex < 0 || startIndex < getStartIndex()) {
       throw new IndexOutOfBoundsException("startIndex = " + startIndex
           + ", log cache starts from index " + getStartIndex());
@@ -103,10 +156,10 @@ class RaftLogCache {
     }
     final long realEnd = Math.min(getEndIndex() + 1, endIndex);
     if (startIndex >= realEnd) {
-      return RaftLog.EMPTY_LOGENTRY_ARRAY;
+      return TermIndex.EMPTY_TERMINDEX_ARRAY;
     }
 
-    LogEntryProto[] entries = new LogEntryProto[(int) (realEnd - startIndex)];
+    TermIndex[] entries = new TermIndex[Math.toIntExact(realEnd - startIndex)];
     int segmentIndex = Collections.binarySearch(closedSegments, startIndex);
     if (segmentIndex < 0) {
       getEntriesFromSegment(openSegment, startIndex, entries, 0, 
entries.length);
@@ -114,30 +167,37 @@ class RaftLogCache {
       long index = startIndex;
       for (int i = segmentIndex; i < closedSegments.size() && index < realEnd; 
i++) {
         LogSegment s = closedSegments.get(i);
-        int numberFromSegment = (int) Math.min(realEnd - index,
-            s.getEndIndex() - index + 1);
-        getEntriesFromSegment(s, index, entries, (int) (index - startIndex),
-            numberFromSegment);
+        int numberFromSegment = Math.toIntExact(
+            Math.min(realEnd - index, s.getEndIndex() - index + 1));
+        getEntriesFromSegment(s, index, entries,
+            Math.toIntExact(index - startIndex), numberFromSegment);
         index += numberFromSegment;
       }
       if (index < realEnd) {
         getEntriesFromSegment(openSegment, index, entries,
-            (int) (index - startIndex), (int) (realEnd - index));
+            Math.toIntExact(index - startIndex),
+            Math.toIntExact(realEnd - index));
       }
     }
     return entries;
   }
 
   private void getEntriesFromSegment(LogSegment segment, long startIndex,
-      LogEntryProto[] entries, int offset, int size) {
+      TermIndex[] entries, int offset, int size) {
     long endIndex = segment.getEndIndex();
     endIndex = Math.min(endIndex, startIndex + size - 1);
     int index = offset;
     for (long i = startIndex; i <= endIndex; i++) {
-      entries[index++] = segment.getLogRecord(i).entry;
+      LogSegment.LogRecord r = segment.getLogRecord(i);
+      entries[index++] = r == null ? null : r.getTermIndex();
     }
   }
 
+  boolean isConfigEntry(TermIndex ti) {
+    LogSegment segment = getSegment(ti.getIndex());
+    return segment != null && segment.isConfigEntry(ti);
+  }
+
   long getStartIndex() {
     if (closedSegments.isEmpty()) {
       return openSegment != null ? openSegment.getStartIndex() :
@@ -154,15 +214,11 @@ class RaftLogCache {
             closedSegments.get(closedSegments.size() - 1).getEndIndex());
   }
 
-  LogEntryProto getLastEntry() {
+  TermIndex getLastTermIndex() {
     return (openSegment != null && openSegment.numOfEntries() > 0) ?
-        openSegment.getLastRecord().entry :
+        openSegment.getLastTermIndex() :
         (closedSegments.isEmpty() ? null :
-            closedSegments.get(closedSegments.size() - 
1).getLastRecord().entry);
-  }
-
-  LogSegment getOpenSegment() {
-    return openSegment;
+            closedSegments.get(closedSegments.size() - 1).getLastTermIndex());
   }
 
   void appendEntry(LogEntryProto entry) {
@@ -172,22 +228,6 @@ class RaftLogCache {
     openSegment.appendToOpenSegment(entry);
   }
 
-  /**
-   * finalize the current open segment, and start a new open segment
-   */
-  void rollOpenSegment(boolean createNewOpen) {
-    Preconditions.assertTrue(openSegment != null
-        && openSegment.numOfEntries() > 0);
-    final long nextIndex = openSegment.getEndIndex() + 1;
-    openSegment.close();
-    closedSegments.add(openSegment);
-    if (createNewOpen) {
-      openSegment = LogSegment.newOpenSegment(nextIndex);
-    } else {
-      openSegment = null;
-    }
-  }
-
   private SegmentFileInfo deleteOpenSegment() {
     final long oldEnd = openSegment.getEndIndex();
     openSegment.clear();
@@ -246,18 +286,6 @@ class RaftLogCache {
     return null;
   }
 
-  static class TruncationSegments {
-    final SegmentFileInfo toTruncate; // name of the file to be truncated
-    final SegmentFileInfo[] toDelete; // names of the files to be deleted
-
-    TruncationSegments(SegmentFileInfo toTruncate,
-        List<SegmentFileInfo> toDelete) {
-      this.toDelete = toDelete == null ? null :
-          toDelete.toArray(new SegmentFileInfo[toDelete.size()]);
-      this.toTruncate = toTruncate;
-    }
-  }
-
   Iterator<LogEntryProto> iterator(long startIndex) {
     return new EntryIterator(startIndex);
   }
@@ -294,9 +322,9 @@ class RaftLogCache {
 
     @Override
     public LogEntryProto next() {
-      LogRecord record;
+      LogEntryProto entry;
       if (currentSegment == null ||
-          (record = currentSegment.getLogRecord(nextIndex)) == null) {
+          (entry = currentSegment.getLogEntry(nextIndex)) == null) {
         throw new NoSuchElementException();
       }
       if (++nextIndex > currentSegment.getEndIndex()) {
@@ -306,7 +334,7 @@ class RaftLogCache {
               openSegment : closedSegments.get(segmentIndex);
         }
       }
-      return record.entry;
+      return entry;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 5add8ae..ee21d8b 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -28,7 +28,7 @@ import org.apache.ratis.io.nativeio.NativeIO;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.storage.LogSegment.SegmentFileInfo;
+import org.apache.ratis.server.storage.RaftLogCache.SegmentFileInfo;
 import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
 import org.apache.ratis.server.storage.SegmentedRaftLog.Task;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
@@ -200,13 +200,13 @@ class RaftLogWorker implements Runnable {
    *
    * Thus all the tasks are created and added sequentially.
    */
-  Task startLogSegment(long startIndex) {
-    return addIOTask(new StartLogSegment(startIndex));
+  void startLogSegment(long startIndex) {
+    addIOTask(new StartLogSegment(startIndex));
   }
 
-  Task rollLogSegment(LogSegment segmentToClose) {
+  void rollLogSegment(LogSegment segmentToClose) {
     addIOTask(new FinalizeLogSegment(segmentToClose));
-    return addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
+    addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
   }
 
   Task writeLogEntry(LogEntryProto entry) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 2fd4dd2..c956f84 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -20,9 +20,9 @@ package org.apache.ratis.server.storage;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.ConfigurationManager;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorageDirectory.LogPathAndIndex;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.AutoCloseableLock;
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Consumer;
 
 /**
  * The RaftLog implementation that writes log entries into segmented files in
@@ -110,17 +111,18 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public void open(ConfigurationManager confManager, long lastIndexInSnapshot)
+  public void open(long lastIndexInSnapshot, Consumer<LogEntryProto> consumer)
       throws IOException {
-    loadLogSegments(confManager, lastIndexInSnapshot);
+    loadLogSegments(lastIndexInSnapshot, consumer);
     File openSegmentFile = null;
-    if (cache.getOpenSegment() != null) {
+    LogSegment openSegment = cache.getOpenSegment();
+    if (openSegment != null) {
       openSegmentFile = storage.getStorageDir()
-          .getOpenLogFile(cache.getOpenSegment().getStartIndex());
+          .getOpenLogFile(openSegment.getStartIndex());
     }
     fileLogWorker.start(Math.max(cache.getEndIndex(), lastIndexInSnapshot),
         openSegmentFile);
-    super.open(confManager, lastIndexInSnapshot);
+    super.open(lastIndexInSnapshot, consumer);
   }
 
   @Override
@@ -128,12 +130,14 @@ public class SegmentedRaftLog extends RaftLog {
     return cache.getStartIndex();
   }
 
-  private void loadLogSegments(ConfigurationManager confManager,
-      long lastIndexInSnapshot) throws IOException {
+  private void loadLogSegments(long lastIndexInSnapshot,
+      Consumer<LogEntryProto> logConsumer) throws IOException {
     try(AutoCloseableLock writeLock = writeLock()) {
       List<LogPathAndIndex> paths = 
storage.getStorageDir().getLogSegmentFiles();
       for (LogPathAndIndex pi : paths) {
-        LogSegment logSegment = parseLogSegment(pi, confManager);
+        boolean isOpen = pi.endIndex == RaftServerConstants.INVALID_LOG_INDEX;
+        LogSegment logSegment = LogSegment.loadSegment(pi.path.toFile(),
+            pi.startIndex, pi.endIndex, isOpen, logConsumer);
         cache.addSegment(logSegment);
       }
 
@@ -150,13 +154,6 @@ public class SegmentedRaftLog extends RaftLog {
     }
   }
 
-  private LogSegment parseLogSegment(LogPathAndIndex pi,
-      ConfigurationManager confManager) throws IOException {
-    final boolean isOpen = pi.endIndex == 
RaftServerConstants.INVALID_LOG_INDEX;
-    return LogSegment.loadSegment(pi.path.toFile(), pi.startIndex, pi.endIndex,
-        isOpen, confManager);
-  }
-
   @Override
   public LogEntryProto get(long index) {
     checkLogState();
@@ -166,7 +163,15 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public LogEntryProto[] getEntries(long startIndex, long endIndex) {
+  public TermIndex getTermIndex(long index) {
+    checkLogState();
+    try(AutoCloseableLock readLock = readLock()) {
+      return cache.getTermIndex(index);
+    }
+  }
+
+  @Override
+  public TermIndex[] getEntries(long startIndex, long endIndex) {
     checkLogState();
     try(AutoCloseableLock readLock = readLock()) {
       return cache.getEntries(startIndex, endIndex);
@@ -174,10 +179,10 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public LogEntryProto getLastEntry() {
+  public TermIndex getLastEntryTermIndex() {
     checkLogState();
     try(AutoCloseableLock readLock = readLock()) {
-      return cache.getLastEntry();
+      return cache.getLastTermIndex();
     }
   }
 
@@ -209,10 +214,9 @@ public class SegmentedRaftLog extends RaftLog {
         cache.rollOpenSegment(true);
         fileLogWorker.rollLogSegment(currentOpenSegment);
       } else if (currentOpenSegment.numOfEntries() > 0 &&
-          currentOpenSegment.getLastRecord().entry.getTerm() != 
entry.getTerm()) {
+          currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
         // the term changes
-        final long currentTerm = currentOpenSegment.getLastRecord().entry
-            .getTerm();
+        final long currentTerm = 
currentOpenSegment.getLastTermIndex().getTerm();
         Preconditions.assertTrue(currentTerm < entry.getTerm(),
             "open segment's term %s is larger than the new entry's term %s",
             currentTerm, entry.getTerm());
@@ -313,6 +317,11 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
+  public boolean isConfigEntry(TermIndex ti) {
+    return cache.isConfigEntry(ti);
+  }
+
+  @Override
   public void close() throws IOException {
     super.close();
     fileLogWorker.close();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
index 2a845c1..9a0e999 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/BaseStateMachine.java
@@ -86,12 +86,12 @@ public class BaseStateMachine implements StateMachine {
   }
 
   @Override
-  public TransactionContext applyTransactionSerial(TransactionContext trx) 
throws IOException {
+  public TransactionContext applyTransactionSerial(TransactionContext trx) {
     return trx;
   }
 
   @Override
-  public CompletableFuture<Message> applyTransaction(TransactionContext trx) 
throws IOException {
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     // return the same message contained in the entry
     Message msg = () -> trx.getLogEntry().get().getSmLogEntry().getData();
     return CompletableFuture.completedFuture(msg);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 2649dcb..4dcac72 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -150,7 +150,7 @@ public interface StateMachine extends Closeable {
    *            of the raft peers
    * @return The Transaction context.
    */
-  TransactionContext applyTransactionSerial(TransactionContext trx) throws 
IOException;
+  TransactionContext applyTransactionSerial(TransactionContext trx);
 
   /**
    * Apply a committed log entry to the state machine. This method can be 
called concurrently with
@@ -160,7 +160,7 @@ public interface StateMachine extends Closeable {
    *            of the raft peers
    */
   // TODO: We do not need to return CompletableFuture
-  CompletableFuture<Message> applyTransaction(TransactionContext trx) throws 
IOException;
+  CompletableFuture<Message> applyTransaction(TransactionContext trx);
 
   /**
    * Notify the state machine that the raft peer is no longer leader.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 9e3897d..1e41944 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -98,8 +98,9 @@ public abstract class RaftBasicTests {
     LOG.info(cluster.printAllLogs());
 
     cluster.getServers().stream().filter(RaftServerImpl::isAlive)
-        .map(s -> s.getState().getLog().getEntries(1, Long.MAX_VALUE))
-        .forEach(e -> RaftTestUtil.assertLogEntries(e, 1, term, messages));
+        .map(s -> s.getState().getLog())
+        .forEach(log -> RaftTestUtil.assertLogEntries(log,
+            log.getEntries(1, Long.MAX_VALUE), 1, term, messages));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 202c618..76258e5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -23,6 +23,8 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
@@ -45,6 +47,7 @@ import java.util.function.IntSupplier;
 import static org.apache.ratis.util.ProtoUtils.toByteString;
 
 public class RaftTestUtil {
+  public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new 
LogEntryProto[0];
   static final Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class);
 
   public static RaftServerImpl waitForLeader(MiniRaftCluster cluster)
@@ -91,14 +94,16 @@ public class RaftTestUtil {
     return leader != null ? leader.getId().toString() : null;
   }
 
-  public static boolean logEntriesContains(LogEntryProto[] entries,
+  public static boolean logEntriesContains(RaftLog log,
       SimpleMessage... expectedMessages) {
     int idxEntries = 0;
     int idxExpected = 0;
-    while (idxEntries < entries.length
+    TermIndex[] termIndices = log.getEntries(0, Long.MAX_VALUE);
+    while (idxEntries < termIndices.length
         && idxExpected < expectedMessages.length) {
       if 
(Arrays.equals(expectedMessages[idxExpected].getContent().toByteArray(),
-          entries[idxEntries].getSmLogEntry().getData().toByteArray())) {
+          log.get(termIndices[idxEntries].getIndex()).getSmLogEntry()
+              .getData().toByteArray())) {
         ++idxExpected;
       }
       ++idxEntries;
@@ -111,8 +116,8 @@ public class RaftTestUtil {
     final int size = servers.size();
     final long count = servers.stream()
         .filter(RaftServerImpl::isAlive)
-        .map(s -> s.getState().getLog().getEntries(0, Long.MAX_VALUE))
-        .filter(e -> logEntriesContains(e, expectedMessages))
+        .map(s -> s.getState().getLog())
+        .filter(log -> logEntriesContains(log, expectedMessages))
         .count();
     if (2*count <= size) {
       throw new AssertionError("Not in majority: size=" + size
@@ -120,11 +125,11 @@ public class RaftTestUtil {
     }
   }
 
-  public static void assertLogEntries(LogEntryProto[] entries, long startIndex,
-      long expertedTerm, SimpleMessage... expectedMessages) {
+  public static void assertLogEntries(RaftLog log, TermIndex[] entries,
+      long startIndex, long expertedTerm, SimpleMessage... expectedMessages) {
     Assert.assertEquals(expectedMessages.length, entries.length);
     for(int i = 0; i < entries.length; i++) {
-      final LogEntryProto e = entries[i];
+      final LogEntryProto e = log.get(entries[i].getIndex());
       Assert.assertEquals(expertedTerm, e.getTerm());
       Assert.assertEquals(startIndex + i, e.getIndex());
       Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 6884e40..524405a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -50,6 +50,7 @@ import org.apache.ratis.protocol.ReconfigurationInProgressException;
 import org.apache.ratis.protocol.ReconfigurationTimeoutException;
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.simulation.RequestHandler;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.util.LogUtils;
@@ -558,8 +559,9 @@ public abstract class RaftReconfigurationBaseTest {
         Thread.sleep(500);
       }
       Assert.assertEquals(1, log.getLatestFlushedIndex());
+      TermIndex last = log.getLastEntryTermIndex();
       Assert.assertEquals(CONFIGURATIONENTRY,
-          log.getLastEntry().getLogEntryBodyCase());
+          log.get(last.getIndex()).getLogEntryBodyCase());
 
       // unblock the old leader
       BlockRequestHandlingInjection.getInstance().unblockReplier(leaderId.toString());
@@ -575,8 +577,9 @@ public abstract class RaftReconfigurationBaseTest {
       boolean newState = false;
       for (int i = 0; i < 10 && !newState; i++) {
         Thread.sleep(500);
+        TermIndex lastTermIndex = log.getLastEntryTermIndex();
         newState = log.getLastCommittedIndex() == 1 &&
-            log.getLastEntry().getLogEntryBodyCase() != CONFIGURATIONENTRY;
+            log.get(lastTermIndex.getIndex()).getLogEntryBodyCase() != CONFIGURATIONENTRY;
       }
       Assert.assertTrue(newState);
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
index 388c269..d9f7c10 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogCache.java
@@ -21,6 +21,7 @@ import java.util.Iterator;
 
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftLogCache.TruncationSegments;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.ProtoUtils;
@@ -74,7 +75,7 @@ public class TestRaftLogCache {
   }
 
   private void checkCacheEntries(long offset, int size, long end) {
-    LogEntryProto[] entries = cache.getEntries(offset, offset + size);
+    TermIndex[] entries = cache.getEntries(offset, offset + size);
     long realEnd = offset + size > end + 1 ? end + 1 : offset + size;
     Assert.assertEquals(realEnd - offset, entries.length);
     for (long i = offset; i < realEnd; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
index 89f4c08..4e90d75 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestRaftLogSegment.java
@@ -95,11 +95,12 @@ public class TestRaftLogSegment {
     long offset = SegmentedRaftLog.HEADER_BYTES.length;
     for (long i = start; i <= end; i++) {
       LogSegment.LogRecord record = segment.getLogRecord(i);
-      Assert.assertEquals(i, record.entry.getIndex());
-      Assert.assertEquals(term, record.entry.getTerm());
-      Assert.assertEquals(offset, record.offset);
+      LogEntryProto entry = segment.getLogEntry(i);
+      Assert.assertEquals(i, entry.getIndex());
+      Assert.assertEquals(term, entry.getTerm());
+      Assert.assertEquals(offset, record.getOffset());
 
-      offset += getEntrySize(record.entry);
+      offset += getEntrySize(entry);
     }
   }
 
@@ -192,13 +193,13 @@ public class TestRaftLogSegment {
     }
 
     // truncate an open segment (remove 1080~1099)
-    long newSize = segment.getLogRecord(start + 80).offset;
+    long newSize = segment.getLogRecord(start + 80).getOffset();
     segment.truncate(start + 80);
     Assert.assertEquals(80, segment.numOfEntries());
     checkLogSegment(segment, start, start + 79, false, newSize, term);
 
     // truncate a closed segment (remove 1050~1079)
-    newSize = segment.getLogRecord(start + 50).offset;
+    newSize = segment.getLogRecord(start + 50).getOffset();
     segment.truncate(start + 50);
     Assert.assertEquals(50, segment.numOfEntries());
     checkLogSegment(segment, start, start + 49, false, newSize, term);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2373a4a0/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
index ccf7690..afd7d29 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/TestSegmentedRaftLog.java
@@ -18,21 +18,19 @@
 package org.apache.ratis.server.storage;
 
 import org.apache.log4j.Level;
-import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.util.LogUtils;
-import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.ConfigurationManager;
 import org.apache.ratis.server.impl.RaftServerConstants;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.FileUtils;
+import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.SizeInBytes;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -45,6 +43,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 public class TestSegmentedRaftLog {
   static {
@@ -72,8 +71,6 @@ public class TestSegmentedRaftLog {
   private File storageDir;
   private RaftProperties properties;
   private RaftStorage storage;
-  private final ConfigurationManager cm = RaftServerTestUtil.newConfigurationManager(
-      MiniRaftCluster.initConfiguration(3));
 
   @Before
   public void setup() throws Exception {
@@ -123,6 +120,10 @@ public class TestSegmentedRaftLog {
     return list;
   }
 
+  private LogEntryProto getLastEntry(SegmentedRaftLog raftLog) {
+    return raftLog.get(raftLog.getLastEntryTermIndex().getIndex());
+  }
+
   @Test
   public void testLoadLogSegments() throws Exception {
     // first generate log files
@@ -132,15 +133,20 @@ public class TestSegmentedRaftLog {
     // create RaftLog object and load log file
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // check if log entries are loaded correctly
       for (LogEntryProto e : entries) {
         LogEntryProto entry = raftLog.get(e.getIndex());
         Assert.assertEquals(e, entry);
       }
 
-      Assert.assertArrayEquals(entries, raftLog.getEntries(0, 500));
-      Assert.assertEquals(entries[entries.length - 1], raftLog.getLastEntry());
+      TermIndex[] termIndices = raftLog.getEntries(0, 500);
+      LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
+          .map(ti -> raftLog.get(ti.getIndex()))
+          .collect(Collectors.toList())
+          .toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY);
+      Assert.assertArrayEquals(entries, entriesFromLog);
+      Assert.assertEquals(entries[entries.length - 1], getLastEntry(raftLog));
     }
   }
 
@@ -169,7 +175,7 @@ public class TestSegmentedRaftLog {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
       entries.forEach(raftLog::appendEntry);
       raftLog.logSync();
@@ -177,7 +183,7 @@ public class TestSegmentedRaftLog {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       checkEntries(raftLog, entries, 0, entries.size());
     }
@@ -198,7 +204,7 @@ public class TestSegmentedRaftLog {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
       entries.forEach(raftLog::appendEntry);
       raftLog.logSync();
@@ -206,7 +212,7 @@ public class TestSegmentedRaftLog {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       checkEntries(raftLog, entries, 0, entries.size());
       Assert.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
@@ -221,7 +227,7 @@ public class TestSegmentedRaftLog {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
       entries.forEach(raftLog::appendEntry);
       raftLog.logSync();
@@ -236,7 +242,7 @@ public class TestSegmentedRaftLog {
       throws Exception {
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // truncate the log
       raftLog.truncate(fromIndex);
       raftLog.logSync();
@@ -246,13 +252,13 @@ public class TestSegmentedRaftLog {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       if (fromIndex > 0) {
         Assert.assertEquals(entries.get((int) (fromIndex - 1)),
-            raftLog.getLastEntry());
+            getLastEntry(raftLog));
       } else {
-        Assert.assertNull(raftLog.getLastEntry());
+        Assert.assertNull(raftLog.getLastEntryTermIndex());
       }
       checkEntries(raftLog, entries, 0, (int) fromIndex);
     }
@@ -265,11 +271,15 @@ public class TestSegmentedRaftLog {
         LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
         Assert.assertEquals(expected.get(i), entry);
       }
-      LogEntryProto[] entriesFromLog = raftLog.getEntries(
+      TermIndex[] termIndices = raftLog.getEntries(
           expected.get(offset).getIndex(),
           expected.get(offset + size - 1).getIndex() + 1);
+      LogEntryProto[] entriesFromLog = Arrays.stream(termIndices)
+          .map(ti -> raftLog.get(ti.getIndex()))
+          .collect(Collectors.toList())
+          .toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY);
       LogEntryProto[] expectedArray = expected.subList(offset, offset + size)
-          .toArray(SegmentedRaftLog.EMPTY_LOGENTRY_ARRAY);
+          .toArray(RaftTestUtil.EMPTY_LOGENTRY_ARRAY);
       Assert.assertArrayEquals(expectedArray, entriesFromLog);
     }
   }
@@ -285,7 +295,7 @@ public class TestSegmentedRaftLog {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       // append entries to the raftlog
       entries.forEach(raftLog::appendEntry);
       raftLog.logSync();
@@ -301,14 +311,14 @@ public class TestSegmentedRaftLog {
 
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       raftLog.append(newEntries.toArray(new LogEntryProto[newEntries.size()]));
       raftLog.logSync();
 
       checkEntries(raftLog, entries, 0, 650);
       checkEntries(raftLog, newEntries, 100, 100);
       Assert.assertEquals(newEntries.get(newEntries.size() - 1),
-          raftLog.getLastEntry());
+          getLastEntry(raftLog));
       Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
           raftLog.getLatestFlushedIndex());
     }
@@ -316,11 +326,11 @@ public class TestSegmentedRaftLog {
     // load the raftlog again and check
     try (SegmentedRaftLog raftLog =
              new SegmentedRaftLog(peerId, null, storage, -1, properties)) {
-      raftLog.open(cm, RaftServerConstants.INVALID_LOG_INDEX);
+      raftLog.open(RaftServerConstants.INVALID_LOG_INDEX, null);
       checkEntries(raftLog, entries, 0, 650);
       checkEntries(raftLog, newEntries, 100, 100);
       Assert.assertEquals(newEntries.get(newEntries.size() - 1),
-          raftLog.getLastEntry());
+          getLastEntry(raftLog));
       Assert.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
           raftLog.getLatestFlushedIndex());
 

Reply via email to