http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
new file mode 100644
index 0000000..d079abf
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -0,0 +1,832 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.util.LifeCycle.State.CLOSED;
+import static org.apache.ratis.util.LifeCycle.State.CLOSING;
+import static org.apache.ratis.util.LifeCycle.State.RUNNING;
+import static org.apache.ratis.util.LifeCycle.State.STARTING;
+
+import static 
org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftException;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.ReconfigurationInProgressException;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+public class RaftServerImpl implements RaftServer {
+  public static final Logger LOG = 
LoggerFactory.getLogger(RaftServerImpl.class);
+
+  private static final String CLASS_NAME = 
RaftServerImpl.class.getSimpleName();
+  static final String REQUEST_VOTE = CLASS_NAME + ".requestVote";
+  static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
+  static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
+
+
+  /** The role this peer currently plays in the raft group. */
+  enum Role {
+    LEADER,
+    CANDIDATE,
+    FOLLOWER
+  }
+
+  private final int minTimeoutMs;
+  private final int maxTimeoutMs;
+
+  private final LifeCycle lifeCycle;
+  private final ServerState state;
+  private final StateMachine stateMachine;
+  private final RaftProperties properties;
+  private volatile Role role;
+
+  /** used when the peer is follower, to monitor election timeout */
+  private volatile FollowerState heartbeatMonitor;
+
+  /** used when the peer is candidate, to request votes from other peers */
+  private volatile LeaderElection electionDaemon;
+
+  /** used when the peer is leader */
+  private volatile LeaderState leaderState;
+
+  private RaftServerRpc serverRpc;
+
+  private final LogAppenderFactory appenderFactory;
+
+  public RaftServerImpl(String id, RaftConfiguration raftConf,
+                        RaftProperties properties, StateMachine stateMachine) 
throws IOException {
+    this.lifeCycle = new LifeCycle(id);
+    minTimeoutMs = properties.getInt(
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT);
+    maxTimeoutMs = properties.getInt(
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_DEFAULT);
+    Preconditions.checkArgument(maxTimeoutMs > minTimeoutMs,
+        "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
+    this.properties = properties;
+    this.stateMachine = stateMachine;
+    this.state = new ServerState(id, raftConf, properties, this, stateMachine);
+    appenderFactory = initAppenderFactory();
+  }
+
+  /** @return the configured minimum rpc timeout in milliseconds. */
+  int getMinTimeoutMs() {
+    return this.minTimeoutMs;
+  }
+
+  /** @return the configured maximum rpc timeout in milliseconds. */
+  int getMaxTimeoutMs() {
+    return this.maxTimeoutMs;
+  }
+
+  int getRandomTimeoutMs() {
+    return RaftUtils.getRandomBetween(minTimeoutMs, maxTimeoutMs);
+  }
+
+  /** @return the state machine this server was constructed with. */
+  @Override
+  public StateMachine getStateMachine() {
+    return stateMachine;
+  }
+
+  /** @return the log appender factory created at construction time. */
+  public LogAppenderFactory getLogAppenderFactory() {
+    return this.appenderFactory;
+  }
+
+  private LogAppenderFactory initAppenderFactory() {
+    Class<? extends LogAppenderFactory> factoryClass = properties.getClass(
+        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
+        LogAppenderFactory.class);
+    return RaftUtils.newInstance(factoryClass);
+  }
+
+  /**
+   * Used by tests to set the initial raft configuration with correct port
+   * bindings. Delegates to {@link ServerState#setInitialConf}.
+   */
+  @VisibleForTesting
+  public void setInitialConf(RaftConfiguration conf) {
+    state.setInitialConf(conf);
+  }
+
+  /**
+   * Installs the rpc service and, when a configuration is available,
+   * registers that configuration's peers with it.
+   */
+  @Override
+  public void setServerRpc(RaftServerRpc serverRpc) {
+    this.serverRpc = serverRpc;
+
+    final RaftConfiguration current = getRaftConf();
+    if (current == null) {
+      // no configuration yet, so there are no peers to register
+      return;
+    }
+    serverRpc.addPeers(current.getPeers());
+  }
+
+  /** @return the rpc service set via {@link #setServerRpc}, possibly null. */
+  public RaftServerRpc getServerRpc() {
+    return this.serverRpc;
+  }
+
+  /**
+   * Moves the life cycle to STARTING, starts the server state, and then
+   * starts either as a follower (this peer is in the current configuration)
+   * or in the initializing state (it is not, or there is no configuration).
+   */
+  @Override
+  public void start() {
+    lifeCycle.transition(STARTING);
+    state.start();
+
+    final RaftConfiguration current = getRaftConf();
+    final boolean inConf = current != null && current.contains(getId());
+    if (!inConf) {
+      LOG.debug("{} starts with initializing state", getId());
+      startInitializing();
+      return;
+    }
+    LOG.debug("{} starts as a follower", getId());
+    startAsFollower();
+  }
+
+  /**
+   * The peer belongs to the current configuration, should start as a follower
+   */
+  private void startAsFollower() {
+    role = Role.FOLLOWER;
+    heartbeatMonitor = new FollowerState(this);
+    heartbeatMonitor.start();
+
+    serverRpc.start();
+    lifeCycle.transition(RUNNING);
+  }
+
+  /**
+   * Start without belonging to any configuration (the peer may be included
+   * in one later). The role is still follower, but no heartbeat monitor is
+   * started, so this peer will not vote or start an election. The life cycle
+   * is left in its current state here.
+   */
+  private void startInitializing() {
+    this.role = Role.FOLLOWER;
+    // intentionally no heartbeat monitoring in this state
+    this.serverRpc.start();
+  }
+
+  /** @return the {@link ServerState} owned by this server. */
+  public ServerState getState() {
+    return state;
+  }
+
+  /** @return this server's own id, as reported by the server state. */
+  @Override
+  public String getId() {
+    final ServerState serverState = getState();
+    return serverState.getSelfId();
+  }
+
+  RaftConfiguration getRaftConf() {
+    return getState().getRaftConf();
+  }
+
+  /**
+   * Closes the server through the life cycle: stops whichever role-specific
+   * workers exist, then closes the rpc service and the server state. Any
+   * exception raised on the way is logged as a warning and not propagated.
+   */
+  @Override
+  public void close() {
+    lifeCycle.checkStateAndClose(() -> {
+      try {
+        // role-specific workers first; each call is a no-op when not running
+        shutdownHeartbeatMonitor();
+        shutdownElectionDaemon();
+        shutdownLeaderState();
+
+        // then the transport and the persistent state
+        serverRpc.close();
+        state.close();
+      } catch (Exception e) {
+        LOG.warn("Failed to kill " + state.getSelfId(), e);
+      }
+    });
+  }
+
+  /** @return false once the life cycle is CLOSING or CLOSED, true otherwise. */
+  @VisibleForTesting
+  public boolean isAlive() {
+    if (lifeCycle.getCurrentState().isOneOf(CLOSING, CLOSED)) {
+      return false;
+    }
+    return true;
+  }
+
+  /** @return true if the current role is {@link Role#FOLLOWER}. */
+  public boolean isFollower() {
+    return Role.FOLLOWER == role;
+  }
+
+  /** @return true if the current role is {@link Role#CANDIDATE}. */
+  public boolean isCandidate() {
+    return Role.CANDIDATE == role;
+  }
+
+  /** @return true if the current role is {@link Role#LEADER}. */
+  public boolean isLeader() {
+    return Role.LEADER == role;
+  }
+
+  /**
+   * Change the server role to Follower, adopting {@code newTerm} when it is
+   * greater than the current term, and stopping the leader/candidate worker
+   * of the previous role.
+   *
+   * @param newTerm The new term.
+   * @param sync We will call {@link ServerState#persistMetadata()} if this is
+   *             set to true and term/votedFor get updated.
+   * @return true if the current term was advanced to {@code newTerm} (and
+   *         leader/votedFor were reset); false if the term was left untouched.
+   * @throws IOException if term/votedFor persistence failed.
+   */
+  synchronized boolean changeToFollower(long newTerm, boolean sync)
+      throws IOException {
+    // remember the previous role: it decides which worker must be stopped
+    final Role old = role;
+    role = Role.FOLLOWER;
+
+    boolean metadataUpdated = false;
+    if (newTerm > state.getCurrentTerm()) {
+      // entering a newer term: forget the old leader and the old vote
+      state.setCurrentTerm(newTerm);
+      state.resetLeaderAndVotedFor();
+      metadataUpdated = true;
+    }
+
+    if (old == Role.LEADER) {
+      assert leaderState != null;
+      shutdownLeaderState();
+    } else if (old == Role.CANDIDATE) {
+      shutdownElectionDaemon();
+    }
+
+    // an existing follower keeps its running monitor; only a real role
+    // change needs a fresh one
+    if (old != Role.FOLLOWER) {
+      heartbeatMonitor = new FollowerState(this);
+      heartbeatMonitor.start();
+    }
+
+    // when sync is false the caller is responsible for persisting
+    if (metadataUpdated && sync) {
+      state.persistMetadata();
+    }
+    return metadataUpdated;
+  }
+
+  /** Stops the leader state, if any, and clears the field. */
+  private synchronized void shutdownLeaderState() {
+    final LeaderState current = this.leaderState;
+    if (current != null) {
+      current.stop();
+    }
+    this.leaderState = null;
+    // TODO: make sure that StateMachineUpdater has applied all transactions
+    // that have context
+  }
+
+  /**
+   * Asks the election daemon, if any, to stop running and clears the field.
+   * The election thread is deliberately not interrupted.
+   */
+  private void shutdownElectionDaemon() {
+    final LeaderElection daemon = this.electionDaemon;
+    if (daemon != null) {
+      daemon.stopRunning();
+    }
+    this.electionDaemon = null;
+  }
+
+  synchronized void changeToLeader() {
+    Preconditions.checkState(isCandidate());
+    shutdownElectionDaemon();
+    role = Role.LEADER;
+    state.becomeLeader();
+    // start sending AppendEntries RPC to followers
+    leaderState = new LeaderState(this, properties);
+    leaderState.start();
+  }
+
+  /**
+   * Stops and interrupts the heartbeat monitor, if any, and clears the field.
+   */
+  private void shutdownHeartbeatMonitor() {
+    final FollowerState monitor = this.heartbeatMonitor;
+    if (monitor != null) {
+      monitor.stopRunning();
+      monitor.interrupt();
+    }
+    this.heartbeatMonitor = null;
+  }
+
+  synchronized void changeToCandidate() {
+    Preconditions.checkState(isFollower());
+    shutdownHeartbeatMonitor();
+    role = Role.CANDIDATE;
+    // start election
+    electionDaemon = new LeaderElection(this);
+    electionDaemon.start();
+  }
+
+  /** @return "role state lifeCycleState", separated by single spaces. */
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append(role)
+        .append(' ')
+        .append(state)
+        .append(' ')
+        .append(lifeCycle.getCurrentState())
+        .toString();
+  }
+
+  /**
+   * @return null if the server is in leader state.
+   */
+  private CompletableFuture<RaftClientReply> checkLeaderState(
+      RaftClientRequest request) {
+    if (!isLeader()) {
+      NotLeaderException exception = generateNotLeaderException();
+      CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
+      future.complete(new RaftClientReply(request, exception));
+      return future;
+    }
+    return null;
+  }
+
+  NotLeaderException generateNotLeaderException() {
+    if (lifeCycle.getCurrentState() != RUNNING) {
+      return new NotLeaderException(getId(), null, null);
+    }
+    String leaderId = state.getLeaderId();
+    if (leaderId == null || leaderId.equals(state.getSelfId())) {
+      // No idea about who is the current leader. Or the peer is the current
+      // leader, but it is about to step down
+      RaftPeer suggestedLeader = state.getRaftConf()
+          .getRandomPeer(state.getSelfId());
+      leaderId = suggestedLeader == null ? null : suggestedLeader.getId();
+    }
+    RaftConfiguration conf = getRaftConf();
+    Collection<RaftPeer> peers = conf.getPeers();
+    return new NotLeaderException(getId(), conf.getPeer(leaderId),
+        peers.toArray(new RaftPeer[peers.size()]));
+  }
+
+  /**
+   * Handle a normal update request from client: under the server lock,
+   * re-check leadership, append the transaction to the local log, register
+   * the request as pending and notify the senders.
+   *
+   * @return a not-leader reply future, or the pending request's future.
+   * @throws RaftException wrapping an IOException from applying the log.
+   */
+  private CompletableFuture<RaftClientReply> appendTransaction(
+      RaftClientRequest request, TransactionContext entry)
+      throws RaftException {
+    LOG.debug("{}: receive client request({})", getId(), request);
+    lifeCycle.assertCurrentState(RUNNING);
+
+    final PendingRequest pendingRequest;
+    synchronized (this) {
+      final CompletableFuture<RaftClientReply> notLeaderReply =
+          checkLeaderState(request);
+      if (notLeaderReply != null) {
+        return notLeaderReply;
+      }
+
+      // append the message to its local log
+      final long index;
+      try {
+        index = state.applyLog(entry);
+      } catch (IOException ioe) {
+        throw new RaftException(ioe);
+      }
+
+      // put the request into the pending queue
+      pendingRequest = leaderState.addPendingRequest(index, request, entry);
+      leaderState.notifySenders();
+    }
+    return pendingRequest.getFuture();
+  }
+
+  /**
+   * Entry point for asynchronous client requests. Non-leaders answer with a
+   * not-leader reply; read-only requests are passed to the state machine's
+   * query; all other requests start a transaction that is then appended to
+   * the log.
+   *
+   * @throws IOException if starting the transaction reported an exception.
+   */
+  @Override
+  public CompletableFuture<RaftClientReply> submitClientRequestAsync(
+      RaftClientRequest request) throws IOException {
+    // first check the server's leader state
+    final CompletableFuture<RaftClientReply> notLeaderReply =
+        checkLeaderState(request);
+    if (notLeaderReply != null) {
+      return notLeaderReply;
+    }
+
+    if (request.isReadOnly()) {
+      // let the state machine handle read-only request from client
+      // TODO: We might not be the leader anymore by the time this completes.
+      // See the RAFT paper, section 8 (last part)
+      return stateMachine.query(request);
+    }
+
+    // TODO: this client request will not be added to pending requests until
+    // later which means that any failure in between will leave partial state
+    // in the state machine. We should call cancelTransaction() for failed
+    // requests
+    final TransactionContext context = stateMachine.startTransaction(request);
+    if (context.getException().isPresent()) {
+      throw RaftUtils.asIOException(context.getException().get());
+    }
+
+    return appendTransaction(request, context);
+  }
+
+  /**
+   * Blocking variant of {@link #submitClientRequestAsync}: submits the
+   * request and waits for its reply.
+   */
+  @Override
+  public RaftClientReply submitClientRequest(RaftClientRequest request)
+      throws IOException {
+    final String selfId = getId();
+    final CompletableFuture<RaftClientReply> future =
+        submitClientRequestAsync(request);
+    return waitForReply(selfId, request, future);
+  }
+
+  /**
+   * Blocks until the future completes and returns its reply.
+   *
+   * A {@link NotLeaderException} failure is turned into a reply carrying that
+   * exception. Any other failure is rethrown as an IOException, and an
+   * interruption is logged and rethrown via
+   * {@link RaftUtils#toInterruptedIOException}.
+   *
+   * @param id the server id, used only in the interruption message.
+   */
+  private static RaftClientReply waitForReply(String id,
+      RaftClientRequest request, CompletableFuture<RaftClientReply> future)
+      throws IOException {
+    try {
+      return future.get();
+    } catch (InterruptedException ie) {
+      final String message =
+          id + ": Interrupted when waiting for reply, request=" + request;
+      LOG.info(message, ie);
+      throw RaftUtils.toInterruptedIOException(message, ie);
+    } catch (ExecutionException ee) {
+      final Throwable cause = ee.getCause();
+      if (cause instanceof NotLeaderException) {
+        return new RaftClientReply(request, (NotLeaderException) cause);
+      }
+      if (cause == null) {
+        throw new IOException(ee);
+      }
+      throw RaftUtils.asIOException(cause);
+    }
+  }
+
+  /**
+   * Blocking variant of {@link #setConfigurationAsync}: submits the
+   * reconfiguration request and waits for its reply.
+   */
+  @Override
+  public RaftClientReply setConfiguration(SetConfigurationRequest request)
+      throws IOException {
+    final String selfId = getId();
+    final CompletableFuture<RaftClientReply> future =
+        setConfigurationAsync(request);
+    return waitForReply(selfId, request, future);
+  }
+
+  /**
+   * Handle a raft configuration change request from client.
+   *
+   * The server must be RUNNING and the leader; otherwise a not-leader reply
+   * is returned. Only one reconfiguration may be in progress at a time.
+   *
+   * @param request the request carrying the peers of the new configuration.
+   * @return a future for the reply to the request.
+   * @throws ReconfigurationInProgressException if the current configuration
+   *         is not stable, the leader is in staging state, or the current
+   *         configuration has not been committed.
+   */
+  @Override
+  public CompletableFuture<RaftClientReply> setConfigurationAsync(
+      SetConfigurationRequest request) throws IOException {
+    LOG.debug("{}: receive setConfiguration({})", getId(), request);
+    lifeCycle.assertCurrentState(RUNNING);
+    // cheap leadership check before taking the server lock
+    CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
+    if (reply != null) {
+      return reply;
+    }
+
+    final RaftPeer[] peersInNewConf = request.getPeersInNewConf();
+    final PendingRequest pending;
+    synchronized (this) {
+      // check again under the lock: the role may have changed in between
+      reply = checkLeaderState(request);
+      if (reply != null) {
+        return reply;
+      }
+
+      final RaftConfiguration current = getRaftConf();
+      // make sure there is no other raft reconfiguration in progress
+      if (!current.isStable() || leaderState.inStagingState() ||
+          !state.isCurrentConfCommitted()) {
+        throw new ReconfigurationInProgressException(
+            "Reconfiguration is already in progress: " + current);
+      }
+
+      // the requested peers equal the current configuration: nothing to
+      // change, so the reply comes from leaderState.returnNoConfChange
+      if (current.hasNoChange(peersInNewConf)) {
+        pending = leaderState.returnNoConfChange(request);
+        return pending.getFuture();
+      }
+
+      // add new peers into the rpc service
+      getServerRpc().addPeers(Arrays.asList(peersInNewConf));
+      // add staging state into the leaderState
+      pending = leaderState.startSetConfiguration(request);
+    }
+    return pending.getFuture();
+  }
+
+  /**
+   * @return true if this server is the leader, or if it is a follower that
+   *         has a leader and whose heartbeat monitor says to withhold votes.
+   */
+  private boolean shouldWithholdVotes() {
+    if (isLeader()) {
+      return true;
+    }
+    return isFollower()
+        && state.hasLeader()
+        && heartbeatMonitor.shouldWithholdVotes();
+  }
+
+  /**
+   * Check if the remote peer is not included in the current conf and should
+   * shutdown. It should shutdown if all of the following hold:
+   * 1. this is a leader
+   * 2. current conf is stable and has been committed
+   * 3. candidate id is not included in conf
+   * 4. candidate's last entry's index < conf's index
+   * 5. the candidate is not a peer being bootstrapped by the leader state
+   */
+  private boolean shouldSendShutdown(String candidateId,
+      TermIndex candidateLastEntry) {
+    if (!isLeader()) {
+      return false;
+    }
+    if (!getRaftConf().isStable() || !getState().isConfCommitted()) {
+      return false;
+    }
+    if (getRaftConf().containsInConf(candidateId)) {
+      return false;
+    }
+    if (candidateLastEntry.getIndex() >= getRaftConf().getLogEntryIndex()) {
+      return false;
+    }
+    return !leaderState.isBootStrappingPeer(candidateId);
+  }
+
+  /**
+   * Unpacks a vote request proto (requestor id, candidate term and candidate
+   * last entry) and handles it.
+   */
+  @Override
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
+      throws IOException {
+    final String candidateId = r.getServerRequest().getRequestorId();
+    final long candidateTerm = r.getCandidateTerm();
+    final TermIndex candidateLastEntry =
+        ServerProtoUtils.toTermIndex(r.getCandidateLastEntry());
+    return requestVote(candidateId, candidateTerm, candidateLastEntry);
+  }
+
+  /**
+   * Decide whether to grant a vote to the given candidate and build the reply.
+   *
+   * The vote is withheld when {@link #shouldWithholdVotes()} says so. If the
+   * candidate is recognized, this server becomes a follower, and the vote is
+   * granted only when the candidate's log is up to date. The reply may also
+   * tell the candidate to shut down, see {@link #shouldSendShutdown}.
+   *
+   * @param candidateId the id of the server asking for the vote.
+   * @param candidateTerm the term the candidate is campaigning in.
+   * @param candidateLastEntry the term/index of the candidate's last log entry.
+   * @return the reply carrying the decision and this server's current term.
+   * @throws IOException if persisting term/votedFor failed.
+   */
+  private RequestVoteReplyProto requestVote(String candidateId,
+      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
+    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
+        candidateId, candidateTerm, candidateLastEntry);
+    LOG.debug("{}: receive requestVote({}, {}, {})",
+        getId(), candidateId, candidateTerm, candidateLastEntry);
+    lifeCycle.assertCurrentState(RUNNING);
+
+    boolean voteGranted = false;
+    boolean shouldShutdown = false;
+    final RequestVoteReplyProto reply;
+    synchronized (this) {
+      if (shouldWithholdVotes()) {
+        LOG.info("{} Withhold vote from server {} with term {}. " +
+            "This server:{}, last rpc time from leader {} is {}", getId(),
+            candidateId, candidateTerm, this, this.getState().getLeaderId(),
+            (isFollower() ? heartbeatMonitor.getLastRpcTime() : -1));
+      } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
+        // sync=false: metadata is persisted once below, after the vote
+        // decision, instead of twice
+        boolean termUpdated = changeToFollower(candidateTerm, false);
+        // see Section 5.4.1 Election restriction
+        if (state.isLogUpToDate(candidateLastEntry)) {
+          heartbeatMonitor.updateLastRpcTime(false);
+          state.grantVote(candidateId);
+          voteGranted = true;
+        }
+        if (termUpdated || voteGranted) {
+          state.persistMetadata(); // sync metafile
+        }
+      }
+      // a rejected candidate may be a peer that was removed from the conf
+      if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry)) 
{
+        shouldShutdown = true;
+      }
+      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
+          voteGranted, state.getCurrentTerm(), shouldShutdown);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{} replies to vote request: {}. Peer's state: {}",
+            getId(), ProtoUtils.toString(reply), state);
+      }
+    }
+    return reply;
+  }
+
+  /**
+   * Sanity-checks the entries of an append request. Nothing is checked when
+   * there are no entries. Otherwise:
+   * - the first index must be 0 when previous is null or has term 0, and
+   *   previous.index + 1 otherwise;
+   * - no entry may have a term greater than expectedTerm;
+   * - the indices must be consecutive.
+   *
+   * @throws IllegalArgumentException if any of the checks fails.
+   */
+  private void validateEntries(long expectedTerm, TermIndex previous,
+      LogEntryProto... entries) {
+    if (entries == null || entries.length == 0) {
+      return;
+    }
+
+    final long firstIndex = entries[0].getIndex();
+    final boolean noPrevious = previous == null || previous.getTerm() == 0;
+    if (noPrevious) {
+      Preconditions.checkArgument(firstIndex == 0,
+          "Unexpected Index: previous is null but entries[%s].getIndex()=%s",
+          0, firstIndex);
+    } else {
+      Preconditions.checkArgument(previous.getIndex() == firstIndex - 1,
+          "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
+          previous, 0, firstIndex);
+    }
+
+    int position = 0;
+    for (LogEntryProto entry : entries) {
+      final long term = entry.getTerm();
+      Preconditions.checkArgument(expectedTerm >= term,
+          "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
+          position, term, expectedTerm);
+
+      final long index = entry.getIndex();
+      Preconditions.checkArgument(index == firstIndex + position,
+          "Unexpected Index: entries[%s].getIndex()=%s but entries[0].getIndex()=%s",
+          position, index, firstIndex);
+      position++;
+    }
+  }
+
+  /**
+   * Unpacks an append-entries request proto and handles it. The previous
+   * term/index is null when the request carries no previous log.
+   */
+  @Override
+  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
+      throws IOException {
+    // TODO avoid converting list to array
+    final LogEntryProto[] entries = r.getEntriesList()
+        .toArray(new LogEntryProto[r.getEntriesCount()]);
+
+    final TermIndex previous;
+    if (r.hasPreviousLog()) {
+      previous = ServerProtoUtils.toTermIndex(r.getPreviousLog());
+    } else {
+      previous = null;
+    }
+
+    final String leaderId = r.getServerRequest().getRequestorId();
+    return appendEntries(leaderId, r.getLeaderTerm(), previous,
+        r.getLeaderCommit(), r.getInitializing(), entries);
+  }
+
+  /**
+   * Handle an append-entries request (or heartbeat) from a leader.
+   *
+   * After validating the entries, and under the server lock, the leader is
+   * checked against the current term, this server becomes a follower of it,
+   * and the entries are appended if the log contains {@code previous}. The
+   * log is then synced outside the lock.
+   *
+   * @param leaderId the id of the requesting leader.
+   * @param leaderTerm the leader's term.
+   * @param previous the term/index preceding the entries, or null.
+   * @param leaderCommit the leader's commit index.
+   * @param initializing if true, a server in STARTING state is not moved to
+   *                     RUNNING by this request.
+   * @param entries the entries to append; may be empty.
+   * @return a reply with result NOT_LEADER, INCONSISTENCY or SUCCESS.
+   * @throws IOException if the entries are invalid or persistence fails.
+   * @throws InterruptedIOException if syncing the log was interrupted; the
+   *         InterruptedException is attached as the cause.
+   */
+  private AppendEntriesReplyProto appendEntries(String leaderId,
+      long leaderTerm, TermIndex previous, long leaderCommit,
+      boolean initializing, LogEntryProto... entries) throws IOException {
+    CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
+        leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: receive appendEntries({}, {}, {}, {}, {}, {})", getId(),
+          leaderId, leaderTerm, previous, leaderCommit, initializing,
+          ServerProtoUtils.toString(entries));
+    }
+    lifeCycle.assertCurrentState(STARTING, RUNNING);
+
+    try {
+      validateEntries(leaderTerm, previous, entries);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    }
+
+    final long currentTerm;
+    long nextIndex = state.getLog().getNextIndex();
+    synchronized (this) {
+      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      currentTerm = state.getCurrentTerm();
+      if (!recognized) {
+        final AppendEntriesReplyProto reply =
+            ServerProtoUtils.toAppendEntriesReplyProto(
+                leaderId, getId(), currentTerm, nextIndex, NOT_LEADER);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: do not recognize leader. Reply: {}",
+              getId(), ProtoUtils.toString(reply));
+        }
+        return reply;
+      }
+      changeToFollower(leaderTerm, true);
+      state.setLeader(leaderId);
+
+      // a server started in initializing state has no heartbeat monitor yet;
+      // create it on the first non-initializing request
+      if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
+        heartbeatMonitor = new FollowerState(this);
+        heartbeatMonitor.start();
+      }
+      if (lifeCycle.getCurrentState() == RUNNING) {
+        heartbeatMonitor.updateLastRpcTime(true);
+      }
+
+      // We need to check if "previous" is in the local peer. Note that it is
+      // possible that "previous" is covered by the latest snapshot: e.g.,
+      // it's possible there's no log entries outside of the latest snapshot.
+      // However, it is not possible that "previous" index is smaller than the
+      // last index included in snapshot. This is because indices <= snapshot's
+      // last index should have been committed.
+      if (previous != null && !containPrevious(previous)) {
+        final AppendEntriesReplyProto reply =
+            ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
+                currentTerm, Math.min(nextIndex, previous.getIndex()),
+                INCONSISTENCY);
+        // guard: building the reply string is wasted work unless debugging
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{}: inconsistency entries. Leader previous:{}, Reply:{}",
+              getId(), previous, ServerProtoUtils.toString(reply));
+        }
+        return reply;
+      }
+
+      state.getLog().append(entries);
+      state.updateConfiguration(entries);
+      state.updateStatemachine(leaderCommit, currentTerm);
+    }
+    if (entries != null && entries.length > 0) {
+      try {
+        state.getLog().logSync();
+      } catch (InterruptedException e) {
+        // keep the interruption as the cause so that it is not lost
+        final InterruptedIOException iioe =
+            new InterruptedIOException("logSync got interrupted");
+        iioe.initCause(e);
+        throw iioe;
+      }
+      nextIndex = entries[entries.length - 1].getIndex() + 1;
+    }
+    synchronized (this) {
+      if (lifeCycle.getCurrentState() == RUNNING && isFollower()
+          && getState().getCurrentTerm() == currentTerm) {
+        // reset election timer to avoid punishing the leader for our own
+        // long disk writes
+        heartbeatMonitor.updateLastRpcTime(false);
+      }
+    }
+    final AppendEntriesReplyProto reply =
+        ServerProtoUtils.toAppendEntriesReplyProto(
+            leaderId, getId(), currentTerm, nextIndex, SUCCESS);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{}: succeeded to handle AppendEntries. Reply: {}", getId(),
+          ServerProtoUtils.toString(reply));
+    }
+    return reply;
+  }
+
+  /**
+   * @return true if {@code previous} is contained in the log, or equals the
+   *         term/index of the latest snapshot, or equals the latest
+   *         installed snapshot.
+   */
+  private boolean containPrevious(TermIndex previous) {
+    LOG.debug("{}: prev:{}, latestSnapshot:{}, getLatestInstalledSnapshot:{}",
+        getId(), previous, state.getLatestSnapshot(),
+        state.getLatestInstalledSnapshot());
+
+    if (state.getLog().contains(previous)) {
+      return true;
+    }
+    if (state.getLatestSnapshot() != null
+        && state.getLatestSnapshot().getTermIndex().equals(previous)) {
+      return true;
+    }
+    return state.getLatestInstalledSnapshot() != null
+        && state.getLatestInstalledSnapshot().equals(previous);
+  }
+
+  /**
+   * Handle one chunk of a snapshot sent by the leader.
+   *
+   * Under the server lock the leader is checked against the current term,
+   * this server becomes a follower of it, and the chunk is handed to the
+   * server state. When the request is marked done, the state machine is
+   * reloaded.
+   *
+   * @param request the snapshot chunk request.
+   * @return a reply with result NOT_LEADER or SUCCESS.
+   * @throws IOException if handling the chunk or persistence fails.
+   * @throws IllegalStateException if the log's next index is greater than
+   *         the last index included in the snapshot.
+   */
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(
+      InstallSnapshotRequestProto request) throws IOException {
+    final String leaderId = request.getServerRequest().getRequestorId();
+    CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(), leaderId, 
request);
+    LOG.debug("{}: receive installSnapshot({})", getId(), request);
+
+    lifeCycle.assertCurrentState(STARTING, RUNNING);
+
+    final long currentTerm;
+    final long leaderTerm = request.getLeaderTerm();
+    final TermIndex lastTermIndex = ServerProtoUtils.toTermIndex(
+        request.getTermIndex());
+    final long lastIncludedIndex = lastTermIndex.getIndex();
+    synchronized (this) {
+      final boolean recognized = state.recognizeLeader(leaderId, leaderTerm);
+      currentTerm = state.getCurrentTerm();
+      if (!recognized) {
+        final InstallSnapshotReplyProto reply = ServerProtoUtils
+            .toInstallSnapshotReplyProto(leaderId, getId(), currentTerm,
+                request.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
+        LOG.debug("{}: do not recognize leader for installing snapshot." +
+            " Reply: {}", getId(), reply);
+        return reply;
+      }
+      changeToFollower(leaderTerm, true);
+      state.setLeader(leaderId);
+
+      // the heartbeat monitor is only touched while RUNNING
+      if (lifeCycle.getCurrentState() == RUNNING) {
+        heartbeatMonitor.updateLastRpcTime(true);
+      }
+
+      // Check and append the snapshot chunk. We simply put this in lock
+      // considering a follower peer requiring a snapshot installation does not
+      // have a lot of requests
+      Preconditions.checkState(
+          state.getLog().getNextIndex() <= lastIncludedIndex,
+          "%s log's next id is %s, last included index in snapshot is %s",
+          getId(),  state.getLog().getNextIndex(), lastIncludedIndex);
+
+      // TODO: We should only update State with installed snapshot once the
+      // request is done.
+      state.installSnapshot(request);
+
+      // update the committed index
+      // re-load the state machine if this is the last chunk
+      if (request.getDone()) {
+        state.reloadStateMachine(lastIncludedIndex, leaderTerm);
+      }
+      // refresh the rpc time again now that the chunk has been handled
+      if (lifeCycle.getCurrentState() == RUNNING) {
+        heartbeatMonitor.updateLastRpcTime(false);
+      }
+    }
+    if (request.getDone()) {
+      LOG.info("{}: successfully install the whole snapshot-{}", getId(),
+          lastIncludedIndex);
+    }
+    return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getId(),
+        currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
+  }
+
+  AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
+      String targetId, TermIndex previous, List<LogEntryProto> entries,
+      boolean initializing) {
+    return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId,
+        leaderTerm, entries, state.getLog().getLastCommittedIndex(),
+        initializing, previous);
+  }
+
+  synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
+      String targetId, String requestId, int requestIndex, SnapshotInfo 
snapshot,
+      List<FileChunkProto> chunks, boolean done) {
+    OptionalLong totalSize = snapshot.getFiles().stream()
+        .mapToLong(FileInfo::getFileSize).reduce(Long::sum);
+    assert totalSize.isPresent();
+    return ServerProtoUtils.toInstallSnapshotRequestProto(getId(), targetId,
+        requestId, requestIndex, state.getCurrentTerm(), 
snapshot.getTermIndex(),
+        chunks, totalSize.getAsLong(), done);
+  }
+
+  synchronized RequestVoteRequestProto createRequestVoteRequest(String 
targetId,
+      long term, TermIndex lastEntry) {
+    return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId, term,
+        lastEntry);
+  }
+
+  /**
+   * Submits an update-commit event to the leader state. Does nothing unless
+   * this server is the leader and has a leader state.
+   */
+  public synchronized void submitLocalSyncEvent() {
+    if (!isLeader() || leaderState == null) {
+      return;
+    }
+    leaderState.submitUpdateStateEvent(LeaderState.UPDATE_COMMIT_EVENT);
+  }
+
+  /**
+   * Passes the message for the request pending at {@code logIndex} to the
+   * leader state. Does nothing unless this server is the leader and has a
+   * leader state.
+   */
+  synchronized void replyPendingRequest(long logIndex,
+      CompletableFuture<Message> message) {
+    if (!isLeader() || leaderState == null) {
+      // not leader, or leader state not running
+      return;
+    }
+    leaderState.replyPendingRequest(logIndex, message);
+  }
+
+  TransactionContext getTransactionContext(long index) {
+    if (leaderState != null) { // is leader and is running
+      return leaderState.getTransactionContext(index);
+    }
+    return null;
+  }
+
+  /** @return the properties this server was constructed with. */
+  public RaftProperties getProperties() {
+    return properties;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
new file mode 100644
index 0000000..4d4371d
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+import static 
org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.FileChunkProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftConfigurationProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.TermIndexProto;
+import org.apache.ratis.util.ProtoUtils;
+
+
+/** Server proto utilities for internal use. */
+public class ServerProtoUtils {
+  public static TermIndex toTermIndex(TermIndexProto p) {
+    return p == null? null: TermIndex.newTermIndex(p.getTerm(), p.getIndex());
+  }
+
+  public static TermIndexProto toTermIndexProto(TermIndex ti) {
+    return ti == null? null: TermIndexProto.newBuilder()
+        .setTerm(ti.getTerm())
+        .setIndex(ti.getIndex())
+        .build();
+  }
+
+  public static TermIndex toTermIndex(LogEntryProto entry) {
+    return entry == null ? null :
+        TermIndex.newTermIndex(entry.getTerm(), entry.getIndex());
+  }
+
+  public static String toString(LogEntryProto... entries) {
+    return entries == null? "null"
+        : entries.length == 0 ? "[]"
+        : entries.length == 1? "" + toTermIndex(entries[0])
+        : "" + Arrays.stream(entries).map(ServerProtoUtils::toTermIndex)
+            .collect(Collectors.toList());
+  }
+
+  public static String toString(AppendEntriesReplyProto reply) {
+    return toString(reply.getServerReply()) + "," + reply.getResult()
+        + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm();
+  }
+
+  private static String toString(RaftRpcReplyProto reply) {
+    return reply.getRequestorId() + "->" + reply.getReplyId() + ","
+        + reply.getSuccess();
+  }
+
+  public static RaftConfigurationProto toRaftConfigurationProto(
+      RaftConfiguration conf) {
+    return RaftConfigurationProto.newBuilder()
+        .addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInConf()))
+        .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInOldConf()))
+        .build();
+  }
+
+  public static RaftConfiguration toRaftConfiguration(
+      long index, RaftConfigurationProto proto) {
+    final RaftConfiguration.Builder b = RaftConfiguration.newBuilder()
+        .setConf(ProtoUtils.toRaftPeerArray(proto.getPeersList()))
+        .setLogEntryIndex(index);
+    if (proto.getOldPeersCount() > 0) {
+      b.setOldConf(ProtoUtils.toRaftPeerArray(proto.getOldPeersList()));
+    }
+    return b.build();
+  }
+
+  public static LogEntryProto toLogEntryProto(
+      RaftConfiguration conf, long term, long index) {
+    return LogEntryProto.newBuilder()
+        .setTerm(term)
+        .setIndex(index)
+        .setConfigurationEntry(toRaftConfigurationProto(conf))
+        .build();
+  }
+
+  public static RequestVoteReplyProto toRequestVoteReplyProto(
+      String requestorId, String replyId, boolean success, long term,
+      boolean shouldShutdown) {
+    final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder();
+    b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId, 
replyId,
+        DEFAULT_SEQNUM, success))
+        .setTerm(term)
+        .setShouldShutdown(shouldShutdown);
+    return b.build();
+  }
+
+  public static RequestVoteRequestProto toRequestVoteRequestProto(
+      String requestorId, String replyId, long term, TermIndex lastEntry) {
+    final RequestVoteRequestProto.Builder b = 
RequestVoteRequestProto.newBuilder()
+        .setServerRequest(
+            ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, 
replyId, DEFAULT_SEQNUM))
+        .setCandidateTerm(term);
+    if (lastEntry != null) {
+      b.setCandidateLastEntry(toTermIndexProto(lastEntry));
+    }
+    return b.build();
+  }
+
+  public static InstallSnapshotReplyProto toInstallSnapshotReplyProto(
+      String requestorId, String replyId, long term, int requestIndex,
+      InstallSnapshotResult result) {
+    final RaftRpcReplyProto.Builder rb = 
ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId,
+        replyId, DEFAULT_SEQNUM, result == InstallSnapshotResult.SUCCESS);
+    final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
+        .newBuilder().setServerReply(rb).setTerm(term).setResult(result)
+        .setRequestIndex(requestIndex);
+    return builder.build();
+  }
+
+  public static InstallSnapshotRequestProto toInstallSnapshotRequestProto(
+      String requestorId, String replyId, String requestId, int requestIndex,
+      long term, TermIndex lastTermIndex, List<FileChunkProto> chunks,
+      long totalSize, boolean done) {
+    return InstallSnapshotRequestProto.newBuilder()
+        .setServerRequest(
+            ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, 
replyId, DEFAULT_SEQNUM))
+        .setRequestId(requestId)
+        .setRequestIndex(requestIndex)
+        // .setRaftConfiguration()  TODO: save and pass RaftConfiguration
+        .setLeaderTerm(term)
+        .setTermIndex(toTermIndexProto(lastTermIndex))
+        .addAllFileChunks(chunks)
+        .setTotalSize(totalSize)
+        .setDone(done).build();
+  }
+
+  public static AppendEntriesReplyProto toAppendEntriesReplyProto(
+      String requestorId, String replyId, long term,
+      long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) {
+    RaftRpcReplyProto.Builder rb = 
ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId,
+        replyId, DEFAULT_SEQNUM, appendResult == SUCCESS);
+    final AppendEntriesReplyProto.Builder b = 
AppendEntriesReplyProto.newBuilder();
+    b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex)
+        .setResult(appendResult);
+    return b.build();
+  }
+
+  public static AppendEntriesRequestProto toAppendEntriesRequestProto(
+      String requestorId, String replyId, long leaderTerm,
+      List<LogEntryProto> entries, long leaderCommit, boolean initializing,
+      TermIndex previous) {
+    final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto
+        .newBuilder()
+        .setServerRequest(
+            ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, 
replyId, DEFAULT_SEQNUM))
+        .setLeaderTerm(leaderTerm)
+        .setLeaderCommit(leaderCommit)
+        .setInitializing(initializing);
+    if (entries != null && !entries.isEmpty()) {
+      b.addAllEntries(entries);
+    }
+
+    if (previous != null) {
+      b.setPreviousLog(toTermIndexProto(previous));
+    }
+    return b.build();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
new file mode 100644
index 0000000..8608fc4
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_DEFAULT;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_USE_MEMORY_LOG_KEY;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.MemoryRaftLog;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.server.storage.SegmentedRaftLog;
+import org.apache.ratis.server.storage.SnapshotManager;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.ProtoUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * Common states of a raft peer. Protected by RaftServer's lock.
+ */
+public class ServerState implements Closeable {
+  private final String selfId;
+  private final RaftServerImpl server;
+  /** Raft log */
+  private final RaftLog log;
+  /** Raft configuration */
+  private final ConfigurationManager configurationManager;
+  /** The thread that applies committed log entries to the state machine */
+  private final StateMachineUpdater stateMachineUpdater;
+  /** local storage for log and snapshot */
+  private final RaftStorage storage;
+  private final SnapshotManager snapshotManager;
+
+  /**
+   * Latest term server has seen. initialized to 0 on first boot, increases
+   * monotonically.
+   */
+  private long currentTerm;
+  /**
+   * The server ID of the leader for this term. Null means either there is
+   * no leader for this term yet or this server does not know who it is yet.
+   */
+  private String leaderId;
+  /**
+   * Candidate that this peer granted vote for in current term (or null if
+   * none)
+   */
+  private String votedFor;
+
+  /**
+   * Latest installed snapshot for this server. This maybe different than
+   * StateMachine's latest snapshot. Once we successfully install a snapshot,
+   * the SM may not pick it up immediately.
+   * Further, this will not get updated when SM does snapshots itself.
+   */
+  private TermIndex latestInstalledSnapshot;
+
+  /**
+   * Creates the state of a server: opens the storage, initializes the state
+   * machine, opens the log, loads the persisted term and vote from the log
+   * metadata, and creates (but does not start) the state machine updater.
+   *
+   * @throws IOException if initializing the state machine or the log fails
+   */
+  ServerState(String id, RaftConfiguration conf, RaftProperties prop,
+              RaftServerImpl server, StateMachine stateMachine)
+      throws IOException {
+    this.selfId = id;
+    this.server = server;
+    configurationManager = new ConfigurationManager(conf);
+    storage = new RaftStorage(prop, RaftServerConstants.StartupOption.REGULAR);
+    snapshotManager = new SnapshotManager(storage, id);
+
+    // The state machine must be initialized before the log is opened because
+    // the index of its latest snapshot is passed to the log.
+    long lastApplied = initStatemachine(stateMachine, prop);
+
+    leaderId = null;
+    log = initLog(id, prop, server, lastApplied);
+    RaftLog.Metadata metadata = log.loadMetadata();
+    currentTerm = metadata.getTerm();
+    votedFor = metadata.getVotedFor();
+
+    stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log,
+         lastApplied, prop);
+  }
+
+  /**
+   * Used by tests to set initial raft configuration with correct port
+   * bindings.
+   */
+  @VisibleForTesting
+  public void setInitialConf(RaftConfiguration initialConf) {
+    configurationManager.setInitialConf(initialConf);
+  }
+
+  /**
+   * Initializes the state machine and registers its storage.
+   *
+   * @return the index of the state machine's latest snapshot, or
+   *         {@link RaftServerConstants#INVALID_LOG_INDEX} if there is no
+   *         snapshot or its index is negative
+   */
+  private long initStatemachine(StateMachine sm, RaftProperties properties)
+      throws IOException {
+    sm.initialize(selfId, properties, storage);
+    storage.setStateMachineStorage(sm.getStateMachineStorage());
+    SnapshotInfo snapshot = sm.getLatestSnapshot();
+
+    if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
+      return RaftServerConstants.INVALID_LOG_INDEX;
+    }
+
+    // get the raft configuration from the snapshot
+    RaftConfiguration raftConf = sm.getRaftConfiguration();
+    if (raftConf != null) {
+      configurationManager.addConfiguration(raftConf.getLogEntryIndex(),
+          raftConf);
+    }
+    return snapshot.getIndex();
+  }
+
+  /** Starts the state machine updater thread. */
+  void start() {
+    stateMachineUpdater.start();
+  }
+
+  /**
+   * Creates and opens the raft log: a {@link MemoryRaftLog} when the
+   * use-memory-log property is true, otherwise a {@link SegmentedRaftLog}.
+   *
+   * note we do not apply log entries to the state machine here since we do not
+   * know whether they have been committed.
+   */
+  private RaftLog initLog(String id, RaftProperties prop,
+      RaftServerImpl server, long lastIndexInSnapshot) throws IOException {
+    final RaftLog log;
+    if (prop.getBoolean(RAFT_SERVER_USE_MEMORY_LOG_KEY,
+        RAFT_SERVER_USE_MEMORY_LOG_DEFAULT)) {
+      log = new MemoryRaftLog(id);
+    } else {
+      log = new SegmentedRaftLog(id, server, this.storage,
+          lastIndexInSnapshot, prop);
+    }
+    log.open(configurationManager, lastIndexInSnapshot);
+    return log;
+  }
+
+  /** @return the current configuration held by the configuration manager. */
+  public RaftConfiguration getRaftConf() {
+    return configurationManager.getCurrent();
+  }
+
+  /** @return the id of this server. */
+  @VisibleForTesting
+  public String getSelfId() {
+    return this.selfId;
+  }
+
+  public long getCurrentTerm() {
+    return currentTerm;
+  }
+
+  void setCurrentTerm(long term) {
+    currentTerm = term;
+  }
+
+  String getLeaderId() {
+    return leaderId;
+  }
+
+  boolean hasLeader() {
+    return leaderId != null;
+  }
+
+  /**
+   * Become a candidate and start leader election: vote for self, forget the
+   * leader and increment the current term.
+   *
+   * @return the incremented term
+   */
+  long initElection() {
+    votedFor = selfId;
+    leaderId = null;
+    return ++currentTerm;
+  }
+
+  /** Writes the current term and vote to the log metadata. */
+  void persistMetadata() throws IOException {
+    this.log.writeMetadata(currentTerm, votedFor);
+  }
+
+  void resetLeaderAndVotedFor() {
+    votedFor = null;
+    leaderId = null;
+  }
+
+  /**
+   * Vote for a candidate and update the local state.
+   */
+  void grantVote(String candidateId) {
+    votedFor = candidateId;
+    leaderId = null;
+  }
+
+  void setLeader(String leaderId) {
+    this.leaderId = leaderId;
+  }
+
+  void becomeLeader() {
+    leaderId = selfId;
+  }
+
+  public RaftLog getLog() {
+    return log;
+  }
+
+  /**
+   * Appends the given transaction to the log under the current term.
+   *
+   * @return the value returned by {@link RaftLog#append}
+   */
+  long applyLog(TransactionContext operation) throws IOException {
+    return log.append(currentTerm, operation);
+  }
+
+  /**
+   * Check if accept the leader selfId and term from the incoming
+   * AppendEntries rpc. This method itself does not modify any state.
+   *
+   * @return true if the check passes
+   * @throws IllegalArgumentException if the term equals the current term but
+   *         the given leader differs from the already known leader
+   */
+  boolean recognizeLeader(String leaderId, long leaderTerm) {
+    if (leaderTerm < currentTerm) {
+      return false;
+    } else if (leaderTerm > currentTerm || this.leaderId == null) {
+      // If the request indicates a term that is greater than the current term
+      // or no leader has been set for the current term, make sure to update
+      // leader and term later
+      return true;
+    }
+    Preconditions.checkArgument(this.leaderId.equals(leaderId),
+        "selfId:%s, this.leaderId:%s, received leaderId:%s",
+        selfId, this.leaderId, leaderId);
+    return true;
+  }
+
+  /**
+   * Check if the candidate's term is acceptable
+   */
+  boolean recognizeCandidate(String candidateId,
+      long candidateTerm) {
+    if (candidateTerm > currentTerm) {
+      return true;
+    } else if (candidateTerm == currentTerm) {
+      // has not voted yet or this is a retry
+      return votedFor == null || votedFor.equals(candidateId);
+    }
+    return false;
+  }
+
+  /**
+   * Checks whether the candidate's last entry is at least as up-to-date as
+   * the local one. The local term-index is taken from the last log entry,
+   * or from the latest snapshot when there is no log entry or the snapshot
+   * index is greater than the last log entry index.
+   *
+   * @return true if there is neither a local entry nor a snapshot, or if the
+   *         local term-index compares less than or equal to the candidate's
+   */
+  boolean isLogUpToDate(TermIndex candidateLastEntry) {
+    LogEntryProto lastEntry = log.getLastEntry();
+    // need to take into account snapshot
+    SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
+    if (lastEntry == null && snapshot == null) {
+      return true;
+    } else if (candidateLastEntry == null) {
+      return false;
+    }
+    TermIndex local = ServerProtoUtils.toTermIndex(lastEntry);
+    // local == null implies lastEntry == null, in which case snapshot is
+    // known to be non-null because of the first check above.
+    if (local == null
+        || (snapshot != null && snapshot.getIndex() > lastEntry.getIndex())) {
+      local = snapshot.getTermIndex();
+    }
+    return local.compareTo(candidateLastEntry) <= 0;
+  }
+
+  @Override
+  public String toString() {
+    return selfId + ":t" + currentTerm + ", leader=" + leaderId
+        + ", voted=" + votedFor + ", raftlog=" + log
+        + ", conf=" + getRaftConf();
+  }
+
+  /**
+   * @return true if the last committed index of the log has reached the log
+   *         entry index of the current configuration
+   */
+  boolean isConfCommitted() {
+    return getLog().getLastCommittedIndex() >=
+        getRaftConf().getLogEntryIndex();
+  }
+
+  /** Adds the given configuration at {@code logIndex} and logs the update. */
+  public void setRaftConf(long logIndex, RaftConfiguration conf) {
+    configurationManager.addConfiguration(logIndex, conf);
+    RaftServerImpl.LOG.info("{}: successfully update the configuration {}",
+        getSelfId(), conf);
+  }
+
+  /**
+   * Updates the configuration manager from the given entries: first removes
+   * configurations starting from the index of the first entry, then adds
+   * every configuration entry found and passes its peers to the server rpc.
+   * Does nothing for a null or empty array.
+   */
+  void updateConfiguration(LogEntryProto[] entries) {
+    if (entries != null && entries.length > 0) {
+      configurationManager.removeConfigurations(entries[0].getIndex());
+      for (LogEntryProto entry : entries) {
+        if (ProtoUtils.isConfigurationLogEntry(entry)) {
+          final RaftConfiguration conf = ServerProtoUtils.toRaftConfiguration(
+              entry.getIndex(), entry.getConfigurationEntry());
+          configurationManager.addConfiguration(entry.getIndex(), conf);
+          server.getServerRpc().addPeers(conf.getPeers());
+        }
+      }
+    }
+  }
+
+  /** Updates the last committed index of the log and wakes the updater. */
+  void updateStatemachine(long majorityIndex, long currentTerm) {
+    log.updateLastCommitted(majorityIndex, currentTerm);
+    stateMachineUpdater.notifyUpdater();
+  }
+
+  /**
+   * Updates the last committed index of the log to the snapshot index and
+   * asks the updater to reload the state machine.
+   */
+  void reloadStateMachine(long lastIndexInSnapshot, long currentTerm)
+      throws IOException {
+    log.updateLastCommitted(lastIndexInSnapshot, currentTerm);
+
+    stateMachineUpdater.reloadStateMachine();
+  }
+
+  /**
+   * Stops the state machine updater and closes the storage. The storage is
+   * closed in a finally block so that it is released even if stopping the
+   * updater or logging throws.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      stateMachineUpdater.stop();
+      RaftServerImpl.LOG.info("{} closes. The last applied log index is {}",
+          getSelfId(), getLastAppliedIndex());
+    } finally {
+      storage.close();
+    }
+  }
+
+  @VisibleForTesting
+  public RaftStorage getStorage() {
+    return storage;
+  }
+
+  /**
+   * Pauses the state machine, installs the snapshot carried by the request,
+   * syncs the log with the snapshot index and records the snapshot's
+   * term-index as the latest installed snapshot.
+   */
+  void installSnapshot(InstallSnapshotRequestProto request)
+      throws IOException {
+    // TODO: verify that we need to install the snapshot
+    StateMachine sm = server.getStateMachine();
+    sm.pause(); // pause the SM to prepare for install snapshot
+    // NOTE(review): if the install below throws, the SM is left paused here;
+    // confirm that the caller handles this case.
+    snapshotManager.installSnapshot(sm, request);
+    log.syncWithSnapshot(request.getTermIndex().getIndex());
+    this.latestInstalledSnapshot = ServerProtoUtils.toTermIndex(
+        request.getTermIndex());
+  }
+
+  /** @return the latest snapshot reported by the state machine storage. */
+  SnapshotInfo getLatestSnapshot() {
+    return server.getStateMachine().getStateMachineStorage()
+        .getLatestSnapshot();
+  }
+
+  public TermIndex getLatestInstalledSnapshot() {
+    return latestInstalledSnapshot;
+  }
+
+  @VisibleForTesting
+  public long getLastAppliedIndex() {
+    return stateMachineUpdater.getLastAppliedIndex();
+  }
+
+  /**
+   * Same check as {@link #isConfCommitted()}; kept for existing callers.
+   */
+  boolean isCurrentConfCommitted() {
+    return isConfCommitted();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerUtils.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerUtils.java
new file mode 100644
index 0000000..3dc9ab4
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.ratis.server.protocol.TermIndex;
+
+/** Server utilities for internal use. */
+public class ServerUtils {
+  /**
+   * Creates a {@link TermIndex} holding the given term and log index.
+   */
+  public static TermIndex newTermIndex(long term, long index) {
+    return new TermIndexImpl(term, index);
+  }
+
+  /** Immutable {@link TermIndex} backed by two long fields. */
+  private static class TermIndexImpl implements TermIndex {
+    private final long term;
+    private final long index; //log index; first index is 1.
+
+    TermIndexImpl(long term, long logIndex) {
+      this.term = term;
+      this.index = logIndex;
+    }
+
+    @Override
+    public long getTerm() {
+      return term;
+    }
+
+    @Override
+    public long getIndex() {
+      return index;
+    }
+
+    /** Orders by term first and by index second. */
+    @Override
+    public int compareTo(TermIndex that) {
+      final int byTerm = Long.compare(term, that.getTerm());
+      if (byTerm != 0) {
+        return byTerm;
+      }
+      return Long.compare(index, that.getIndex());
+    }
+
+    /**
+     * Two instances are equal when both are {@code TermIndexImpl} objects
+     * with the same term and the same index.
+     */
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      // instanceof is false for null, so no separate null check is needed
+      if (!(obj instanceof TermIndexImpl)) {
+        return false;
+      }
+      final TermIndexImpl other = (TermIndexImpl) obj;
+      return term == other.getTerm() && index == other.getIndex();
+    }
+
+    @Override
+    public int hashCode() {
+      final HashCodeBuilder hash = new HashCodeBuilder();
+      hash.append(term);
+      hash.append(index);
+      return hash.hashCode();
+    }
+
+    /** Renders a negative number as "~" and any other number in decimal. */
+    private static String toString(long n) {
+      if (n < 0) {
+        return "~";
+      }
+      return Long.toString(n);
+    }
+
+    @Override
+    public String toString() {
+      final String t = toString(term);
+      final String i = toString(index);
+      return "(t:" + t + ", i:" + i + ")";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
new file mode 100644
index 0000000..a2a4149
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import static 
org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
+import static 
org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.SMLOGENTRY;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class tracks the log entries that have been committed in a quorum and
+ * applies them to the state machine. We let a separate thread do this work
+ * asynchronously so that this will not block normal raft protocol.
+ *
+ * If the auto log compaction is enabled, the state machine updater thread will
+ * trigger a snapshot of the state machine by calling
+ * {@link StateMachine#takeSnapshot} when the log size exceeds a limit.
+ */
+class StateMachineUpdater implements Runnable {
+  static final Logger LOG = LoggerFactory.getLogger(StateMachineUpdater.class);
+
+  enum State {
+    RUNNING, STOP, RELOAD
+  }
+
+  private final RaftProperties properties;
+  private final StateMachine stateMachine;
+  private final RaftServerImpl server;
+  private final RaftLog raftLog;
+
+  private volatile long lastAppliedIndex;
+
+  private final boolean autoSnapshotEnabled;
+  private final long snapshotThreshold;
+  private long lastSnapshotIndex;
+
+  private final Thread updater;
+  private volatile State state = State.RUNNING;
+
+  StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
+      RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
+    this.properties = properties;
+    this.stateMachine = stateMachine;
+    this.server = server;
+    this.raftLog = raftLog;
+
+    this.lastAppliedIndex = lastAppliedIndex;
+    lastSnapshotIndex = lastAppliedIndex;
+
+    autoSnapshotEnabled = properties.getBoolean(
+        RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_DEFAULT);
+    snapshotThreshold = properties.getLong(
+        RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY,
+        RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_DEFAULT);
+    updater = new Daemon(this);
+  }
+
+  void start() {
+    updater.start();
+  }
+
+  void stop() {
+    state = State.STOP;
+    updater.interrupt();
+    try {
+      stateMachine.close();
+    } catch (IOException ignored) {
+    }
+  }
+
+  void reloadStateMachine() {
+    state = State.RELOAD;
+    notifyUpdater();
+  }
+
+  synchronized void notifyUpdater() {
+    notifyAll();
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName() + "-" + raftLog.getSelfId();
+  }
+
+  @Override
+  public void run() {
+    final RaftStorage storage = server.getState().getStorage();
+    while (isRunning()) {
+      try {
+        synchronized (this) {
+          // when the peers just start, the committedIndex is initialized as 0
+          // and will be updated only after the leader contacts other peers.
+          // Thus initially lastAppliedIndex can be greater than lastCommitted.
+          while (lastAppliedIndex >= raftLog.getLastCommittedIndex()) {
+            wait();
+          }
+        }
+
+        final long committedIndex = raftLog.getLastCommittedIndex();
+        Preconditions.checkState(lastAppliedIndex < committedIndex);
+
+        if (state == State.RELOAD) {
+          Preconditions.checkState(stateMachine.getLifeCycleState() == 
LifeCycle.State.PAUSED);
+
+          stateMachine.reinitialize(server.getId(), properties, storage);
+
+          SnapshotInfo snapshot = stateMachine.getLatestSnapshot();
+          Preconditions.checkState(snapshot != null && snapshot.getIndex() > 
lastAppliedIndex,
+              "Snapshot: %s, lastAppliedIndex: %s", snapshot, 
lastAppliedIndex);
+
+          lastAppliedIndex = snapshot.getIndex();
+          lastSnapshotIndex = snapshot.getIndex();
+          state = State.RUNNING;
+        }
+
+        while (lastAppliedIndex < committedIndex) {
+          final LogEntryProto next = raftLog.get(lastAppliedIndex + 1);
+          if (next != null) {
+            if (next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
+              // the reply should have already been set. only need to record
+              // the new conf in the state machine.
+              stateMachine.setRaftConfiguration(
+                  ServerProtoUtils.toRaftConfiguration(next.getIndex(),
+                      next.getConfigurationEntry()));
+            } else if (next.getLogEntryBodyCase() == SMLOGENTRY) {
+              // check whether there is a TransactionContext because we are 
the leader.
+              TransactionContext trx = 
server.getTransactionContext(next.getIndex());
+              if (trx == null) {
+                trx = new TransactionContext(stateMachine, next);
+              }
+
+              // Let the StateMachine inject logic for committed transactions 
in sequential order.
+              trx = stateMachine.applyTransactionSerial(trx);
+
+              // TODO: This step can be parallelized
+              CompletableFuture<Message> messageFuture =
+                  stateMachine.applyTransaction(trx);
+              server.replyPendingRequest(next.getIndex(), messageFuture);
+            }
+            lastAppliedIndex++;
+          } else {
+            LOG.debug("{}: logEntry {} is null. There may be snapshot to load. 
state:{}",
+                this, lastAppliedIndex + 1, state);
+            break;
+          }
+        }
+
+        // check if need to trigger a snapshot
+        if (shouldTakeSnapshot(lastAppliedIndex)) {
+          stateMachine.takeSnapshot();
+          // TODO purge logs, including log cache. but should keep log for 
leader's RPCSenders
+          lastSnapshotIndex = lastAppliedIndex;
+        }
+      } catch (InterruptedException e) {
+        if (!isRunning()) {
+          LOG.info("{}: the StateMachineUpdater is interrupted and will 
exit.", this);
+        } else {
+          final String s = this + ": the StateMachineUpdater is wrongly 
interrupted";
+          ExitUtils.terminate(1, s, e, LOG);
+        }
+      } catch (Throwable t) {
+        final String s = this + ": the StateMachineUpdater hits Throwable";
+        ExitUtils.terminate(2, s, t, LOG);
+      }
+    }
+  }
+
+  private boolean isRunning() {
+    return state != State.STOP;
+  }
+
+  private boolean shouldTakeSnapshot(long currentAppliedIndex) {
+    return autoSnapshotEnabled && (state != State.RELOAD) &&
+        (currentAppliedIndex - lastSnapshotIndex >= snapshotThreshold);
+  }
+
+  long getLastAppliedIndex() {
+    return lastAppliedIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java
new file mode 100644
index 0000000..f1f5512
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/protocol/RaftServerProtocol.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.protocol;
+
+import java.io.IOException;
+
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+
+/**
+ * Declares three request/reply calls: requestVote, appendEntries and
+ * installSnapshot. Each takes the corresponding request proto, returns the
+ * corresponding reply proto, and may throw {@link IOException}.
+ *
+ * NOTE(review): judging by the names, these are presumably the server-to-server
+ * RPCs of the Raft consensus algorithm -- confirm against the implementations.
+ */
+public interface RaftServerProtocol {
+
+  /**
+   * @param request the vote request.
+   * @return the reply to the given vote request.
+   * @throws IOException if the call fails.
+   */
+  RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws 
IOException;
+
+  /**
+   * @param request the append-entries request.
+   * @return the reply to the given append-entries request.
+   * @throws IOException if the call fails.
+   */
+  AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) 
throws IOException;
+
+  /**
+   * @param request the install-snapshot request.
+   * @return the reply to the given install-snapshot request.
+   * @throws IOException if the call fails.
+   */
+  InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto 
request) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java 
b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
new file mode 100644
index 0000000..665f5d5
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.protocol;
+
+import org.apache.ratis.server.impl.ServerUtils;
+
+/**
+ * The term and the log index defined in the Raft consensus algorithm.
+ *
+ * Instances are comparable with each other; the ordering itself is defined by
+ * the implementing class, not by this interface.
+ */
+public interface TermIndex extends Comparable<TermIndex> {
+  /** @return the term. */
+  long getTerm();
+
+  /** @return the index. */
+  long getIndex();
+
+  /**
+   * Create a new {@link TermIndex} instance.
+   *
+   * This simply delegates to {@code ServerUtils.newTermIndex(term, index)}.
+   *
+   * @param term  the term of the new instance.
+   * @param index the log index of the new instance.
+   * @return the instance produced by {@code ServerUtils}.
+   */
+  static TermIndex newTermIndex(long term, long index) {
+    return ServerUtils.newTermIndex(term, index);
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java
new file mode 100644
index 0000000..0dfc6f0
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedChannelBase.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+/**
+ * Common base for buffered wrappers around a {@link FileChannel}. It stores the
+ * wrapped channel and provides an "is it still open" check plus size and
+ * channel accessors; {@link #close()} is left to the subclasses.
+ */
+public abstract class BufferedChannelBase implements Closeable {
+  /** The wrapped channel; assigned once in the constructor. */
+  protected final FileChannel fileChannel;
+
+  /** @param fc the channel to wrap. */
+  protected BufferedChannelBase(FileChannel fc) {
+    fileChannel = fc;
+  }
+
+  /**
+   * Return the wrapped channel after checking that it is still open.
+   *
+   * @return the wrapped channel.
+   * @throws IOException if the wrapped channel has already been closed.
+   */
+  protected FileChannel validateAndGetFileChannel() throws IOException {
+    if (fileChannel.isOpen()) {
+      return fileChannel;
+    }
+    throw new IOException(
+        "Attempting to access a file channel that has already been closed");
+  }
+
+  /**
+   * @return the current size of the wrapped channel.
+   * @throws IOException if the channel is closed or its size cannot be read.
+   */
+  public long size() throws IOException {
+    final FileChannel open = validateAndGetFileChannel();
+    return open.size();
+  }
+
+  /**
+   * @return the wrapped {@link FileChannel}; no open-check is performed here.
+   */
+  public FileChannel getFileChannel() {
+    return fileChannel;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java
new file mode 100644
index 0000000..e61a4d3
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/BufferedWriteChannel.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Provides a buffering layer in front of a FileChannel for writing.
+ */
+public class BufferedWriteChannel extends BufferedChannelBase {
+  // The capacity of the write buffer.
+  private final int writeCapacity;
+  // The position of the file channel's write pointer.
+  private AtomicLong writeBufferStartPosition = new AtomicLong(0);
+  // The buffer used to write operations.
+  private final ByteBuffer writeBuffer;
+  // The absolute position of the next write operation.
+  private volatile long position;
+
+  public BufferedWriteChannel(FileChannel fc, int writeCapacity)
+      throws IOException {
+    super(fc);
+    this.writeCapacity = writeCapacity;
+    this.position = fc.position();
+    this.writeBufferStartPosition.set(position);
+    this.writeBuffer = ByteBuffer.allocateDirect(writeCapacity);
+  }
+
+  /**
+   * Write all the data in src to the {@link FileChannel}. Note that this 
function can
+   * buffer or re-order writes based on the implementation. These writes will 
be flushed
+   * to the disk only when flush() is invoked.
+   *
+   * @param src The source ByteBuffer which contains the data to be written.
+   * @throws IOException if a write operation fails.
+   */
+  public void write(ByteBuffer src) throws IOException {
+    int copied = 0;
+    while (src.remaining() > 0) {
+      int truncated = 0;
+      if (writeBuffer.remaining() < src.remaining()) {
+        truncated = src.remaining() - writeBuffer.remaining();
+        src.limit(src.limit() - truncated);
+      }
+      copied += src.remaining();
+      writeBuffer.put(src);
+      src.limit(src.limit() + truncated);
+      // if we have run out of buffer space, we should flush to the file
+      if (writeBuffer.remaining() == 0) {
+        flushInternal();
+      }
+    }
+    position += copied;
+  }
+
+  /**
+   * Write the specified byte.
+   * @param b the byte to be written
+   */
+  public void write(int b) throws IOException {
+    writeBuffer.put((byte) b);
+    if (writeBuffer.remaining() == 0) {
+      flushInternal();
+    }
+    position++;
+  }
+
+  public void write(byte[] b) throws IOException {
+    int offset = 0;
+    while (offset < b.length) {
+      int toPut = Math.min(b.length - offset, writeBuffer.remaining());
+      writeBuffer.put(b, offset, toPut);
+      offset += toPut;
+      if (writeBuffer.remaining() == 0) {
+        flushInternal();
+      }
+    }
+    position += b.length;
+  }
+
+  /**
+   * Get the position where the next write operation will begin writing from.
+   */
+  public long position() {
+    return position;
+  }
+
+  /**
+   * Get the position of the file channel's write pointer.
+   */
+  public long getFileChannelPosition() {
+    return writeBufferStartPosition.get();
+  }
+
+
+  /**
+   * Write any data in the buffer to the file. If sync is set to true, force a
+   * sync operation so that data is persisted to the disk.
+   *
+   * @throws IOException if the write or sync operation fails.
+   */
+  public void flush(boolean shouldForceWrite) throws IOException {
+    synchronized (this) {
+      flushInternal();
+    }
+    if (shouldForceWrite) {
+      forceWrite(false);
+    }
+  }
+
+  /**
+   * Write any data in the buffer to the file and advance the 
writeBufferPosition
+   * Callers are expected to synchronize appropriately
+   *
+   * @throws IOException if the write fails.
+   */
+  private void flushInternal() throws IOException {
+    writeBuffer.flip();
+    do {
+      fileChannel.write(writeBuffer);
+    } while (writeBuffer.hasRemaining());
+    writeBuffer.clear();
+    writeBufferStartPosition.set(fileChannel.position());
+  }
+
+  public long forceWrite(boolean forceMetadata) throws IOException {
+    // This is the point up to which we had flushed to the file system page 
cache
+    // before issuing this force write hence is guaranteed to be made durable 
by
+    // the force write, any flush that happens after this may or may
+    // not be flushed
+    long positionForceWrite = writeBufferStartPosition.get();
+    fileChannel.force(forceMetadata);
+    return positionForceWrite;
+  }
+
+  @Override
+  public void close() throws IOException {
+    fileChannel.close();
+    writeBuffer.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/FileInfo.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileInfo.java 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileInfo.java
new file mode 100644
index 0000000..9673596
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileInfo.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import java.nio.file.Path;
+
+import org.apache.ratis.io.MD5Hash;
+
+/**
+ * Immutable description of a file: its path, its MD5 digest and its size.
+ *
+ * The size is read from the file system once, when the object is constructed.
+ */
+public class FileInfo {
+  private final Path path;
+  private final MD5Hash fileDigest;
+  private final long fileSize;
+
+  /**
+   * @param path       the path of the file; its length is sampled here.
+   * @param fileDigest the MD5 digest to associate with the file.
+   */
+  public FileInfo(Path path, MD5Hash fileDigest) {
+    this.path = path;
+    this.fileDigest = fileDigest;
+    // Capture the length now; the getter never goes back to the file system.
+    fileSize = this.path.toFile().length();
+  }
+
+  /** @return the path of the file. */
+  public Path getPath() {
+    return path;
+  }
+
+  /** @return the MD5 file digest of the file. */
+  public MD5Hash getFileDigest() {
+    return fileDigest;
+  }
+
+  /** @return the size of the file, as observed at construction time. */
+  public long getFileSize() {
+    return fileSize;
+  }
+
+  /** @return the string form of the path. */
+  @Override
+  public String toString() {
+    return path.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
new file mode 100644
index 0000000..60572c6
--- /dev/null
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/storage/LogInputStream.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.storage;
+
+import static 
org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+public class LogInputStream implements Closeable {
+  static final Logger LOG = LoggerFactory.getLogger(LogInputStream.class);
+
+  static class LogValidation {
+    private final long validLength;
+    private final long endIndex;
+    private final boolean hasCorruptHeader;
+
+    LogValidation(long validLength, long endIndex, boolean hasCorruptHeader) {
+      this.validLength = validLength;
+      this.endIndex = endIndex;
+      this.hasCorruptHeader = hasCorruptHeader;
+    }
+
+    long getValidLength() {
+      return validLength;
+    }
+
+    long getEndIndex() {
+      return endIndex;
+    }
+
+    boolean hasCorruptHeader() {
+      return hasCorruptHeader;
+    }
+  }
+
+  private enum State {
+    UNINIT,
+    OPEN,
+    CLOSED
+  }
+
+  private final File logFile;
+  private final long startIndex;
+  private final long endIndex;
+  private final boolean isOpen;
+  private State state = State.UNINIT;
+  private LogReader reader;
+
+  public LogInputStream(File log, long startIndex, long endIndex,
+      boolean isOpen) {
+    if (isOpen) {
+      Preconditions.checkArgument(endIndex == INVALID_LOG_INDEX);
+    } else {
+      Preconditions.checkArgument(endIndex >= startIndex);
+    }
+
+    this.logFile = log;
+    this.startIndex = startIndex;
+    this.endIndex = endIndex;
+    this.isOpen = isOpen;
+  }
+
+  private void init() throws IOException {
+    Preconditions.checkState(state == State.UNINIT);
+    try {
+      reader = new LogReader(logFile);
+      // read the log header
+      String header = reader.readLogHeader();
+      Preconditions.checkState(SegmentedRaftLog.HEADER_STR.equals(header),
+          "Corrupted log header: %s", header);
+      state = State.OPEN;
+    } finally {
+      if (reader == null) {
+        state = State.CLOSED;
+      }
+    }
+  }
+
+  long getStartIndex() {
+    return startIndex;
+  }
+
+  long getEndIndex() {
+    return endIndex;
+  }
+
+  String getName() {
+    return logFile.getName();
+  }
+
+  public LogEntryProto nextEntry() throws IOException {
+    LogEntryProto entry = null;
+    switch (state) {
+      case UNINIT:
+        try {
+          init();
+        } catch (Throwable e) {
+          LOG.error("caught exception initializing " + this, e);
+          Throwables.propagateIfPossible(e, IOException.class);
+        }
+        Preconditions.checkState(state != State.UNINIT);
+        return nextEntry();
+      case OPEN:
+        entry = reader.readEntry();
+        if (entry != null) {
+          long index = entry.getIndex();
+          if (!isOpen() && index >= endIndex) {
+            /**
+             * The end index may be derived from the segment recovery
+             * process. It is possible that we still have some uncleaned 
garbage
+             * in the end. We should skip them.
+             */
+            long skipAmt = logFile.length() - reader.getPos();
+            if (skipAmt > 0) {
+              LOG.debug("skipping {} bytes at the end of log '{}': reached" +
+                  " entry {} out of {}", skipAmt, getName(), index, endIndex);
+              reader.skipFully(skipAmt);
+            }
+          }
+        }
+        break;
+      case CLOSED:
+        break; // return null
+    }
+    return entry;
+  }
+
+  long scanNextEntry() throws IOException {
+    Preconditions.checkState(state == State.OPEN);
+    return reader.scanEntry();
+  }
+
+  long getPosition() {
+    if (state == State.OPEN) {
+      return reader.getPos();
+    } else {
+      return 0;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (state == State.OPEN) {
+      reader.close();
+    }
+    state = State.CLOSED;
+  }
+
+  boolean isOpen() {
+    return isOpen;
+  }
+
+  @Override
+  public String toString() {
+    return getName();
+  }
+
+  /**
+   * @param file          File being scanned and validated.
+   * @param maxTxIdToScan Maximum Tx ID to try to scan.
+   *                      The scan returns after reading this or a higher
+   *                      ID. The file portion beyond this ID is
+   *                      potentially being updated.
+   * @return Result of the validation
+   * @throws IOException
+   */
+  static LogValidation scanEditLog(File file, long maxTxIdToScan)
+      throws IOException {
+    LogInputStream in;
+    try {
+      in = new LogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, 
false);
+      // read the header, initialize the inputstream
+      in.init();
+    } catch (EOFException e) {
+      LOG.warn("Log file " + file + " has no valid header", e);
+      return new LogValidation(0, INVALID_LOG_INDEX, true);
+    }
+
+    try {
+      return scanEditLog(in, maxTxIdToScan);
+    } finally {
+      RaftUtils.cleanup(LOG, in);
+    }
+  }
+
+  /**
+   * Find the last valid entry index in the stream.
+   * If there are invalid or corrupt entries in the middle of the stream,
+   * scanEditLog will skip over them.
+   *
+   * This reads through the stream but does not close it.
+   *
+   * @param maxIndexToScan Maximum entry index to try to scan. The scan returns
+   *                       after reading this or a higher index. The file
+   *                       portion beyond this index is potentially being
+   *                       updated.
+   */
+  static LogValidation scanEditLog(LogInputStream in, long maxIndexToScan) {
+    long lastPos = 0;
+    long end = INVALID_LOG_INDEX;
+    long numValid = 0;
+    boolean hitError = false;
+    while (end < maxIndexToScan) {
+      long index;
+      lastPos = in.getPosition();
+      try {
+        if (hitError) {
+          LogEntryProto entry = in.nextEntry();
+          index = entry != null ? entry.getIndex() : INVALID_LOG_INDEX;
+          LOG.warn("After resync, position is " + in.getPosition());
+        } else {
+          index = in.scanNextEntry();
+        }
+        if (index == INVALID_LOG_INDEX) {
+          break;
+        } else {
+          hitError = false;
+        }
+      } catch (Throwable t) {
+        LOG.warn("Caught exception after scanning through {} ops from {}"
+            + " while determining its valid length. Position was "
+            + lastPos, numValid, in, t);
+        hitError = true;
+        continue;
+      }
+      if (end == INVALID_LOG_INDEX || index > end) {
+        end = index;
+      }
+      numValid++;
+    }
+    return new LogValidation(lastPos, end, false);
+  }
+}

Reply via email to