http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
deleted file mode 100644
index a8a39bb..0000000
--- a/raft-grpc/src/main/java/org/apache/raft/grpc/server/GRpcLogAppender.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.grpc.RaftGRpcService;
-import org.apache.raft.grpc.RaftGrpcConfigKeys;
-import org.apache.raft.server.impl.FollowerInfo;
-import org.apache.raft.server.impl.LeaderState;
-import org.apache.raft.server.impl.LogAppender;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.shaded.io.grpc.Status;
-import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
-import org.apache.raft.shaded.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.raft.shaded.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
-import org.apache.raft.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.util.CodeInjectionForTesting;
-
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.raft.grpc.RaftGRpcService.GRPC_SEND_SERVER_REQUEST;
-
-/**
- * A new log appender implementation using grpc bi-directional stream API.
- */
-public class GRpcLogAppender extends LogAppender {
-  private final RaftServerProtocolClient client;
-  private final Queue<AppendEntriesRequestProto> pendingRequests;
-  private final int maxPendingRequestsNum;
-  private volatile boolean firstResponseReceived = false;
-
-  private final AppendLogResponseHandler appendResponseHandler;
-  private final InstallSnapshotResponseHandler snapshotResponseHandler;
-
-  private volatile StreamObserver<AppendEntriesRequestProto> 
appendLogRequestObserver;
-  private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver;
-
-  public GRpcLogAppender(RaftServerImpl server, LeaderState leaderState,
-                         FollowerInfo f) {
-    super(server, leaderState, f);
-
-    RaftGRpcService rpcService = (RaftGRpcService) server.getServerRpc();
-    client = rpcService.getRpcClient(f.getPeer());
-    maxPendingRequestsNum = server.getProperties().getInt(
-        RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY,
-        RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT);
-    pendingRequests = new ConcurrentLinkedQueue<>();
-
-    appendResponseHandler = new AppendLogResponseHandler();
-    snapshotResponseHandler = new InstallSnapshotResponseHandler();
-  }
-
-  @Override
-  public void run() {
-    while (isAppenderRunning()) {
-      if (shouldSendRequest()) {
-        SnapshotInfo snapshot = shouldInstallSnapshot();
-        if (snapshot != null) {
-          installSnapshot(snapshot, snapshotResponseHandler);
-        } else {
-          // keep appending log entries or sending heartbeats
-          appendLog();
-        }
-      }
-
-      if (isAppenderRunning() && !shouldSendRequest()) {
-        // use lastSend time instead of lastResponse time
-        final long waitTime = getHeartbeatRemainingTime(
-            follower.getLastRpcTime());
-        if (waitTime > 0) {
-          synchronized (this) {
-            try {
-              LOG.debug("{} decides to wait {}ms before appending to {}",
-                  server.getId(), waitTime, follower.getPeer());
-              wait(waitTime);
-            } catch (InterruptedException ignored) {
-            }
-          }
-        }
-      }
-    }
-    appendLogRequestObserver.onCompleted();
-  }
-
-  private boolean shouldWait() {
-    return pendingRequests.size() >= maxPendingRequestsNum ||
-        shouldWaitForFirstResponse();
-  }
-
-  private void appendLog() {
-    if (appendLogRequestObserver == null) {
-      appendLogRequestObserver = client.appendEntries(appendResponseHandler);
-    }
-    AppendEntriesRequestProto pending = null;
-    final StreamObserver<AppendEntriesRequestProto> s;
-    synchronized (this) {
-      // if the queue's size >= maxSize, wait
-      while (isAppenderRunning() && shouldWait()) {
-        try {
-          LOG.debug("{} wait to send the next AppendEntries to {}",
-              server.getId(), follower.getPeer());
-          this.wait();
-        } catch (InterruptedException ignored) {
-        }
-      }
-
-      if (isAppenderRunning()) {
-        // prepare and enqueue the append request. note changes on follower's
-        // nextIndex and ops on pendingRequests should always be associated
-        // together and protected by the lock
-        pending = createRequest();
-        if (pending != null) {
-          Preconditions.checkState(pendingRequests.offer(pending));
-          updateNextIndex(pending);
-        }
-      }
-      s = appendLogRequestObserver;
-    }
-
-    if (pending != null && isAppenderRunning()) {
-      sendRequest(pending, s);
-    }
-  }
-
-  private void sendRequest(AppendEntriesRequestProto request,
-      StreamObserver<AppendEntriesRequestProto> s) {
-    CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, server.getId(),
-        null, request);
-
-    s.onNext(request);
-    follower.updateLastRpcSendTime();
-  }
-
-  private void updateNextIndex(AppendEntriesRequestProto request) {
-    final int count = request.getEntriesCount();
-    if (count > 0) {
-      follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1);
-    }
-  }
-
-  /**
-   * if this is the first append, wait for the response of the first append so
-   * that we can get the correct next index.
-   */
-  private boolean shouldWaitForFirstResponse() {
-    return pendingRequests.size() > 0 && !firstResponseReceived;
-  }
-
-  /**
-   * StreamObserver for handling responses from the follower
-   */
-  private class AppendLogResponseHandler
-      implements StreamObserver<AppendEntriesReplyProto> {
-    /**
-     * After receiving a appendEntries reply, do the following:
-     * 1. If the reply is success, update the follower's match index and submit
-     *    an event to leaderState
-     * 2. If the reply is NOT_LEADER, step down
-     * 3. If the reply is INCONSISTENCY, decrease the follower's next index
-     *    based on the response
-     */
-    @Override
-    public void onNext(AppendEntriesReplyProto reply) {
-      LOG.debug("{} received {} response from {}", server.getId(),
-          (!firstResponseReceived ? "the first" : "a"),
-          follower.getPeer());
-
-      // update the last rpc time
-      follower.updateLastRpcResponseTime();
-
-      if (!firstResponseReceived) {
-        firstResponseReceived = true;
-      }
-      switch (reply.getResult()) {
-        case SUCCESS:
-          onSuccess(reply);
-          break;
-        case NOT_LEADER:
-          onNotLeader(reply);
-          break;
-        case INCONSISTENCY:
-          onInconsistency(reply);
-          break;
-        default:
-          break;
-      }
-      notifyAppend();
-    }
-
-    /**
-     * for now we simply retry the first pending request
-     */
-    @Override
-    public void onError(Throwable t) {
-      if (!isAppenderRunning()) {
-        LOG.info("{} is stopped", GRpcLogAppender.this);
-        return;
-      }
-      LOG.warn("{} got error when appending entries to {}, exception: {}.",
-          server.getId(), follower.getPeer().getId(), t);
-
-      synchronized (this) {
-        final Status cause = Status.fromThrowable(t);
-        if (cause != null && cause.getCode() == Status.Code.INTERNAL) {
-          // TODO check other Status. Add sleep to avoid tight loop
-          LOG.debug("{} restarts Append call to {} due to error {}",
-              server.getId(), follower.getPeer(), t);
-          // recreate the StreamObserver
-          appendLogRequestObserver = 
client.appendEntries(appendResponseHandler);
-          // reset firstResponseReceived to false
-          firstResponseReceived = false;
-        }
-
-        // clear the pending requests queue and reset the next index of 
follower
-        AppendEntriesRequestProto request = pendingRequests.peek();
-        if (request != null) {
-          final long nextIndex = request.hasPreviousLog() ?
-              request.getPreviousLog().getIndex() + 1 : 
raftLog.getStartIndex();
-          clearPendingRequests(nextIndex);
-        }
-      }
-    }
-
-    @Override
-    public void onCompleted() {
-      LOG.info("{} stops appending log entries to follower {}", server.getId(),
-          follower);
-    }
-  }
-
-  private void clearPendingRequests(long newNextIndex) {
-    pendingRequests.clear();
-    follower.decreaseNextIndex(newNextIndex);
-  }
-
-  private void onSuccess(AppendEntriesReplyProto reply) {
-    AppendEntriesRequestProto request = pendingRequests.poll();
-    final long replyNextIndex = reply.getNextIndex();
-    Preconditions.checkNotNull(request,
-        "Got reply with next index %s but the pending queue is empty",
-        replyNextIndex);
-
-    if (request.getEntriesCount() == 0) {
-      Preconditions.checkState(!request.hasPreviousLog() ||
-              replyNextIndex - 1 == request.getPreviousLog().getIndex(),
-          "reply's next index is %s, request's previous is %s",
-          replyNextIndex, request.getPreviousLog());
-    } else {
-      // check if the reply and the pending request is consistent
-      final long lastEntryIndex = request
-          .getEntries(request.getEntriesCount() - 1).getIndex();
-      Preconditions.checkState(replyNextIndex == lastEntryIndex + 1,
-          "reply's next index is %s, request's last entry index is %s",
-          replyNextIndex, lastEntryIndex);
-      follower.updateMatchIndex(lastEntryIndex);
-      submitEventOnSuccessAppend();
-    }
-  }
-
-  private void onNotLeader(AppendEntriesReplyProto reply) {
-    checkResponseTerm(reply.getTerm());
-    // the running loop will end and the connection will onComplete
-  }
-
-  private synchronized void onInconsistency(AppendEntriesReplyProto reply) {
-    AppendEntriesRequestProto request = pendingRequests.peek();
-    Preconditions.checkState(request.hasPreviousLog());
-    if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) {
-      clearPendingRequests(reply.getNextIndex());
-    }
-  }
-
-  private class InstallSnapshotResponseHandler
-      implements StreamObserver<InstallSnapshotReplyProto> {
-    private final Queue<Integer> pending;
-    private final AtomicBoolean done = new AtomicBoolean(false);
-
-    InstallSnapshotResponseHandler() {
-      pending = new LinkedList<>();
-    }
-
-    synchronized void addPending(InstallSnapshotRequestProto request) {
-      pending.offer(request.getRequestIndex());
-    }
-
-    synchronized void removePending(InstallSnapshotReplyProto reply) {
-      int index = pending.poll();
-      Preconditions.checkState(index == reply.getRequestIndex());
-    }
-
-    boolean isDone() {
-      return done.get();
-    }
-
-    void close() {
-      done.set(true);
-      GRpcLogAppender.this.notifyAppend();
-    }
-
-    synchronized boolean hasAllResponse() {
-      return pending.isEmpty();
-    }
-
-    @Override
-    public void onNext(InstallSnapshotReplyProto reply) {
-      LOG.debug("{} received {} response from {}", server.getId(),
-          (!firstResponseReceived ? "the first" : "a"),
-          follower.getPeer());
-
-      // update the last rpc time
-      follower.updateLastRpcResponseTime();
-
-      if (!firstResponseReceived) {
-        firstResponseReceived = true;
-      }
-
-      switch (reply.getResult()) {
-        case SUCCESS:
-          removePending(reply);
-          break;
-        case NOT_LEADER:
-          checkResponseTerm(reply.getTerm());
-          break;
-        case UNRECOGNIZED:
-          break;
-      }
-    }
-
-    @Override
-    public void onError(Throwable t) {
-      if (!isAppenderRunning()) {
-        LOG.info("{} is stopped", GRpcLogAppender.this);
-        return;
-      }
-      LOG.info("{} got error when installing snapshot to {}, exception: {}",
-          server.getId(), follower.getPeer(), t);
-      close();
-    }
-
-    @Override
-    public void onCompleted() {
-      LOG.info("{} stops sending snapshots to follower {}", server.getId(),
-          follower);
-      close();
-    }
-  }
-
-  private void installSnapshot(SnapshotInfo snapshot,
-      InstallSnapshotResponseHandler responseHandler) {
-    LOG.info("{}: follower {}'s next index is {}," +
-            " log's start index is {}, need to install snapshot",
-        server.getId(), follower.getPeer(), follower.getNextIndex(),
-        raftLog.getStartIndex());
-
-    snapshotRequestObserver = client.installSnapshot(snapshotResponseHandler);
-    final String requestId = UUID.randomUUID().toString();
-    try {
-      for (InstallSnapshotRequestProto request :
-          new SnapshotRequestIter(snapshot, requestId)) {
-        if (isAppenderRunning()) {
-          snapshotRequestObserver.onNext(request);
-          follower.updateLastRpcSendTime();
-          responseHandler.addPending(request);
-        } else {
-          break;
-        }
-      }
-      snapshotRequestObserver.onCompleted();
-    } catch (Exception e) {
-      LOG.warn("{} failed to install snapshot {}. Exception: {}", this,
-          snapshot.getFiles(), e);
-      snapshotRequestObserver.onError(e);
-      return;
-    } finally {
-      snapshotRequestObserver = null;
-    }
-
-    synchronized (this) {
-      while (isAppenderRunning() && !responseHandler.isDone()) {
-        try {
-          wait();
-        } catch (InterruptedException ignored) {
-        }
-      }
-    }
-
-    if (responseHandler.hasAllResponse()) {
-      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
-      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
-      LOG.info("{}: install snapshot-{} successfully on follower {}",
-          server.getId(), snapshot.getTermIndex().getIndex(), 
follower.getPeer());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
deleted file mode 100644
index cc2e513..0000000
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/PipelinedLogAppenderFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc.server;
-
-import org.apache.raft.server.impl.FollowerInfo;
-import org.apache.raft.server.impl.LeaderState;
-import org.apache.raft.server.impl.LogAppender;
-import org.apache.raft.server.impl.LogAppenderFactory;
-import org.apache.raft.server.impl.RaftServerImpl;
-
-public class PipelinedLogAppenderFactory implements LogAppenderFactory {
-  @Override
-  public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
-                                    FollowerInfo f) {
-    return new GRpcLogAppender(server, state, f);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolClient.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolClient.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolClient.java
deleted file mode 100644
index 437e1f4..0000000
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolClient.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc.server;
-
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.shaded.io.grpc.ManagedChannel;
-import org.apache.raft.shaded.io.grpc.ManagedChannelBuilder;
-import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc;
-import 
org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub;
-import 
org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub;
-
-/**
- * This is a RaftClient implementation that supports streaming data to the raft
- * ring. The stream implementation utilizes gRPC.
- */
-public class RaftServerProtocolClient {
-  private final ManagedChannel channel;
-  private final RaftServerProtocolServiceBlockingStub blockingStub;
-  private final RaftServerProtocolServiceStub asyncStub;
-
-  public RaftServerProtocolClient(RaftPeer target) {
-    channel = ManagedChannelBuilder.forTarget(target.getAddress())
-        .usePlaintext(true).build();
-    blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
-    asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
-  }
-
-  public void shutdown() {
-    channel.shutdownNow();
-  }
-
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
-    // the StatusRuntimeException will be handled by the caller
-    return blockingStub.requestVote(request);
-  }
-
-  StreamObserver<AppendEntriesRequestProto> appendEntries(
-      StreamObserver<AppendEntriesReplyProto> responseHandler) {
-    return asyncStub.appendEntries(responseHandler);
-  }
-
-  StreamObserver<InstallSnapshotRequestProto> installSnapshot(
-      StreamObserver<InstallSnapshotReplyProto> responseHandler) {
-    return asyncStub.installSnapshot(responseHandler);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
 
b/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
deleted file mode 100644
index 53dbb6a..0000000
--- 
a/raft-grpc/src/main/java/org/apache/raft/grpc/server/RaftServerProtocolService.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc.server;
-
-import org.apache.raft.grpc.RaftGrpcUtil;
-import org.apache.raft.server.protocol.RaftServerProtocol;
-import org.apache.raft.shaded.io.grpc.stub.StreamObserver;
-import org.apache.raft.shaded.proto.RaftProtos.*;
-import 
org.apache.raft.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RaftServerProtocolService extends 
RaftServerProtocolServiceImplBase {
-  public static final Logger LOG = 
LoggerFactory.getLogger(RaftServerProtocolService.class);
-
-  private final String id;
-  private final RaftServerProtocol server;
-
-  public RaftServerProtocolService(String id, RaftServerProtocol server) {
-    this.id = id;
-    this.server = server;
-  }
-
-  @Override
-  public void requestVote(RequestVoteRequestProto request,
-      StreamObserver<RequestVoteReplyProto> responseObserver) {
-    try {
-      final RequestVoteReplyProto reply = server.requestVote(request);
-      responseObserver.onNext(reply);
-      responseObserver.onCompleted();
-    } catch (Throwable e) {
-      LOG.info("{} got exception when handling requestVote {}: {}",
-          id, request.getServerRequest(), e);
-      responseObserver.onError(RaftGrpcUtil.wrapException(e));
-    }
-  }
-
-  @Override
-  public StreamObserver<AppendEntriesRequestProto> appendEntries(
-      StreamObserver<AppendEntriesReplyProto> responseObserver) {
-    return new StreamObserver<AppendEntriesRequestProto>() {
-      @Override
-      public void onNext(AppendEntriesRequestProto request) {
-        try {
-          final AppendEntriesReplyProto reply = server.appendEntries(request);
-          responseObserver.onNext(reply);
-        } catch (Throwable e) {
-          LOG.info("{} got exception when handling appendEntries {}: {}",
-              id, request.getServerRequest(), e);
-          responseObserver.onError(RaftGrpcUtil.wrapException(e));
-        }
-      }
-
-      @Override
-      public void onError(Throwable t) {
-        // for now we just log a msg
-        LOG.info("{}: appendEntries on error. Exception: {}", id, t);
-      }
-
-      @Override
-      public void onCompleted() {
-        LOG.info("{}: appendEntries completed", id);
-        responseObserver.onCompleted();
-      }
-    };
-  }
-
-  @Override
-  public StreamObserver<InstallSnapshotRequestProto> installSnapshot(
-      StreamObserver<InstallSnapshotReplyProto> responseObserver) {
-    return new StreamObserver<InstallSnapshotRequestProto>() {
-      @Override
-      public void onNext(InstallSnapshotRequestProto request) {
-        try {
-          final InstallSnapshotReplyProto reply = 
server.installSnapshot(request);
-          responseObserver.onNext(reply);
-        } catch (Throwable e) {
-          LOG.info("{} got exception when handling installSnapshot {}: {}",
-              id, request.getServerRequest(), e);
-          responseObserver.onError(RaftGrpcUtil.wrapException(e));
-        }
-      }
-
-      @Override
-      public void onError(Throwable t) {
-        LOG.info("{}: installSnapshot on error. Exception: {}", id, t);
-      }
-
-      @Override
-      public void onCompleted() {
-        LOG.info("{}: installSnapshot completed", id);
-        responseObserver.onCompleted();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java 
b/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
deleted file mode 100644
index 359dabd..0000000
--- a/raft-grpc/src/test/java/org/apache/raft/grpc/MiniRaftClusterWithGRpc.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.RaftTestUtil;
-import org.apache.raft.client.RaftClientRequestSender;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.grpc.client.RaftClientSenderWithGrpc;
-import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.raft.protocol.RaftPeer;
-import org.apache.raft.server.impl.BlockRequestHandlingInjection;
-import org.apache.raft.server.impl.DelayLocalExecutionInjection;
-import org.apache.raft.server.impl.LogAppenderFactory;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.util.NetUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import static 
org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
-
-public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
-  public static final Factory<MiniRaftClusterWithGRpc> FACTORY
-      = new Factory<MiniRaftClusterWithGRpc>() {
-    @Override
-    public MiniRaftClusterWithGRpc newCluster(
-        String[] ids, RaftProperties prop, boolean formatted) throws 
IOException {
-      return new MiniRaftClusterWithGRpc(ids, prop, formatted);
-    }
-  };
-
-  public static final DelayLocalExecutionInjection sendServerRequestInjection =
-      new 
DelayLocalExecutionInjection(RaftGRpcService.GRPC_SEND_SERVER_REQUEST);
-
-  public MiniRaftClusterWithGRpc(int numServers, RaftProperties properties)
-      throws IOException {
-    this(generateIds(numServers, 0), properties, true);
-  }
-
-  public MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties,
-      boolean formatted) throws IOException {
-    super(ids, getPropForGrpc(properties), formatted);
-    init(initRpcServices(getServers(), properties));
-  }
-
-  private static RaftProperties getPropForGrpc(RaftProperties prop) {
-    RaftProperties newProp = new RaftProperties(prop);
-    newProp.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
-    return newProp;
-  }
-
-  private static Map<RaftPeer, RaftGRpcService> initRpcServices(
-      Collection<RaftServerImpl> servers, RaftProperties prop) throws 
IOException {
-    final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>();
-
-    for (RaftServerImpl s : servers) {
-      final RaftGRpcService rpc = new RaftGRpcService(s, prop);
-      peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
-    }
-    return peerRpcs;
-  }
-
-  @Override
-  public RaftClientRequestSender getRaftClientRequestSender() {
-    return new RaftClientSenderWithGrpc(getPeers());
-  }
-
-  @Override
-  protected Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers,
-                                             Collection<RaftServerImpl> 
newServers, boolean startService)
-      throws IOException {
-    final Map<RaftPeer, RaftGRpcService> peers = initRpcServices(newServers, 
properties);
-    for (Map.Entry<RaftPeer, RaftGRpcService> entry : peers.entrySet()) {
-      RaftServerImpl server = servers.get(entry.getKey().getId());
-      server.setServerRpc(entry.getValue());
-      if (!startService) {
-        
BlockRequestHandlingInjection.getInstance().blockReplier(server.getId());
-      } else {
-        server.start();
-      }
-    }
-    return new ArrayList<>(peers.keySet());
-  }
-
-  @Override
-  protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
-    RaftServerImpl server = servers.get(peer.getId());
-    int port = NetUtils.newInetSocketAddress(peer.getAddress()).getPort();
-    int oldPort = 
properties.getInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY,
-        RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT);
-    properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, port);
-    final RaftGRpcService rpc = new RaftGRpcService(server, properties);
-    Preconditions.checkState(
-        rpc.getInetSocketAddress().toString().contains(peer.getAddress()),
-        "address in the raft conf: %s, address in rpc server: %s",
-        peer.getAddress(), rpc.getInetSocketAddress().toString());
-    server.setServerRpc(rpc);
-    properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, oldPort);
-    return server;
-  }
-
-  @Override
-  public void startServer(String id) {
-    super.startServer(id);
-    BlockRequestHandlingInjection.getInstance().unblockReplier(id);
-  }
-
-  @Override
-  protected void blockQueueAndSetDelay(String leaderId, int delayMs)
-      throws InterruptedException {
-    RaftTestUtil.blockQueueAndSetDelay(getServers(), 
sendServerRequestInjection,
-        leaderId, delayMs, getMaxTimeout());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java
 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java
deleted file mode 100644
index a8357c9..0000000
--- 
a/raft-grpc/src/test/java/org/apache/raft/grpc/TestNotLeaderExceptionWithGrpc.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc;
-
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.RaftNotLeaderExceptionBaseTest;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.raft.server.impl.LogAppenderFactory;
-
-import java.io.IOException;
-
-import static 
org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
-
-public class TestNotLeaderExceptionWithGrpc extends 
RaftNotLeaderExceptionBaseTest {
-  @Override
-  public MiniRaftCluster initCluster() throws IOException {
-    String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0);
-    RaftProperties prop = new RaftProperties();
-    prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
-    return new MiniRaftClusterWithGRpc(s, prop, true);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java
 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java
deleted file mode 100644
index 83e6c62..0000000
--- 
a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftReconfigurationWithGRpc.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc;
-
-import org.apache.log4j.Level;
-import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.raft.grpc.server.RaftServerProtocolService;
-import org.apache.raft.server.impl.LogAppenderFactory;
-import org.apache.raft.server.impl.RaftReconfigurationBaseTest;
-import org.apache.raft.util.RaftUtils;
-import org.junit.BeforeClass;
-
-import java.io.IOException;
-
-import static 
org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
-
-public class TestRaftReconfigurationWithGRpc extends 
RaftReconfigurationBaseTest {
-  static {
-    RaftUtils.setLogLevel(RaftServerProtocolService.LOG, Level.DEBUG);
-  }
-
-  @BeforeClass
-  public static void setProp() {
-    prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
-  }
-
-  @Override
-  public MiniRaftClusterWithGRpc getCluster(int peerNum) throws IOException {
-    return new MiniRaftClusterWithGRpc(peerNum, prop);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftSnapshotWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftSnapshotWithGrpc.java 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftSnapshotWithGrpc.java
deleted file mode 100644
index 74b2c63..0000000
--- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftSnapshotWithGrpc.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc;
-
-import org.apache.raft.MiniRaftCluster;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.statemachine.RaftSnapshotBaseTest;
-
-import java.io.IOException;
-
-public class TestRaftSnapshotWithGrpc extends RaftSnapshotBaseTest {
-  @Override
-  public MiniRaftCluster initCluster(int numServer, RaftProperties prop)
-      throws IOException {
-    return MiniRaftClusterWithGRpc.FACTORY.newCluster(numServer, prop, true);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
deleted file mode 100644
index 82a4e13..0000000
--- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftStream.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc;
-
-import org.apache.log4j.Level;
-import org.apache.raft.RaftTestUtil;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.grpc.client.AppendStreamer;
-import org.apache.raft.grpc.client.RaftOutputStream;
-import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.raft.server.impl.LogAppenderFactory;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.storage.RaftLog;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.RaftUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
-
-import static org.apache.raft.RaftTestUtil.waitForLeader;
-import static 
org.apache.raft.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY;
-import static 
org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
-import static org.junit.Assert.fail;
-
-public class TestRaftStream {
-  static {
-    RaftUtils.setLogLevel(AppendStreamer.LOG, Level.ALL);
-  }
-  static final Logger LOG = LoggerFactory.getLogger(TestRaftStream.class);
-
-  private static final RaftProperties prop = new RaftProperties();
-  private static final int NUM_SERVERS = 3;
-
-  private MiniRaftClusterWithGRpc cluster;
-
-
-  @BeforeClass
-  public static void setProp() {
-    prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
-  }
-
-  @After
-  public void tearDown() {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
-  }
-
-  private byte[] genContent(int count) {
-    return toBytes(count);
-  }
-
-  private byte[] toBytes(int i) {
-    byte[] b = new byte[4];
-    b[0] = (byte) ((i >>> 24) & 0xFF);
-    b[1] = (byte) ((i >>> 16) & 0xFF);
-    b[2] = (byte) ((i >>> 8) & 0xFF);
-    b[3] = (byte) (i & 0xFF);
-    return b;
-  }
-
-  @Test
-  public void testSimpleWrite() throws Exception {
-    LOG.info("Running testSimpleWrite");
-
-    // default 64K is too large for a test
-    prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4);
-    cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
-
-    cluster.start();
-    RaftServerImpl leader = waitForLeader(cluster);
-
-    int count = 1;
-    try (RaftOutputStream out = new RaftOutputStream(prop, "writer-1",
-        cluster.getPeers(), leader.getId())) {
-      for (int i = 0; i < 500; i++) { // generate 500 requests
-        out.write(genContent(count++));
-      }
-    }
-
-    // check the leader's raft log
-    final RaftLog raftLog = leader.getState().getLog();
-    final AtomicInteger currentNum = new AtomicInteger(1);
-    checkLog(raftLog, 500, () -> {
-      int value = currentNum.getAndIncrement();
-      return toBytes(value);
-    });
-  }
-
-  private void checkLog(RaftLog raftLog, long expectedCommittedIndex,
-      Supplier<byte[]> s) {
-    long committedIndex = raftLog.getLastCommittedIndex();
-    Assert.assertEquals(expectedCommittedIndex, committedIndex);
-    // check the log content
-    LogEntryProto[] entries = raftLog.getEntries(1, expectedCommittedIndex + 
1);
-    for (LogEntryProto entry : entries) {
-      byte[] logData = entry.getSmLogEntry().getData().toByteArray();
-      byte[] expected = s.get();
-      Assert.assertEquals("log entry: " + entry,
-          expected.length, logData.length);
-      Assert.assertArrayEquals(expected, logData);
-    }
-  }
-
-  @Test
-  public void testWriteAndFlush() throws Exception {
-    LOG.info("Running testWriteAndFlush");
-
-    prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE);
-    cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
-    cluster.start();
-
-    RaftServerImpl leader = waitForLeader(cluster);
-    RaftOutputStream out = new RaftOutputStream(prop, "writer",
-        cluster.getPeers(), leader.getId());
-
-    int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072};
-    ByteValue[] values = new ByteValue[lengths.length];
-    for (int i = 0; i < values.length; i++) {
-      values[i] = new ByteValue(lengths[i], (byte) 9);
-    }
-
-    List<byte[]> expectedTxs = new ArrayList<>();
-    for (ByteValue v : values) {
-      byte[] data = v.genData();
-      expectedTxs.addAll(v.getTransactions());
-      out.write(data);
-      out.flush();
-
-      // make sure after the flush the data has been committed
-      Assert.assertEquals(expectedTxs.size(),
-          leader.getState().getLastAppliedIndex());
-    }
-    out.close();
-
-    try {
-      out.write(0);
-      fail("The OutputStream has been closed");
-    } catch (IOException ignored) {
-    }
-
-    LOG.info("Start to check leader's log");
-    final AtomicInteger index = new AtomicInteger(0);
-    checkLog(leader.getState().getLog(), expectedTxs.size(),
-        () -> expectedTxs.get(index.getAndIncrement()));
-  }
-
-  private static class ByteValue {
-    final static int BUFFERSIZE = 1024;
-
-    final int length;
-    final byte value;
-    final int numTx;
-    byte[] data;
-
-    ByteValue(int length, byte value) {
-      this.length = length;
-      this.value = value;
-      numTx = (length - 1) / BUFFERSIZE + 1;
-    }
-
-    byte[] genData() {
-      data = new byte[length];
-      Arrays.fill(data, value);
-      return data;
-    }
-
-    Collection<byte[]> getTransactions() {
-      if (data.length <= BUFFERSIZE) {
-        return Collections.singletonList(data);
-      } else {
-        List<byte[]> list = new ArrayList<>();
-        for (int i = 0; i < numTx; i++) {
-          int txSize = Math.min(BUFFERSIZE, length - BUFFERSIZE * i);
-          byte[] t = new byte[txSize];
-          Arrays.fill(t, value);
-          list.add(t);
-        }
-        return list;
-      }
-    }
-  }
-
-  @Test
-  public void testWriteWithOffset() throws Exception {
-    LOG.info("Running testWriteWithOffset");
-    prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE);
-
-    cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
-    cluster.start();
-    RaftServerImpl leader = waitForLeader(cluster);
-
-    RaftOutputStream out = new RaftOutputStream(prop, "writer",
-        cluster.getPeers(), leader.getId());
-
-    byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2];
-    Arrays.fill(b1, (byte) 1);
-    byte[] b2 = new byte[ByteValue.BUFFERSIZE];
-    Arrays.fill(b2, (byte) 2);
-    byte[] b3 = new byte[ByteValue.BUFFERSIZE * 2 + ByteValue.BUFFERSIZE / 2];
-    Arrays.fill(b3, (byte) 3);
-    byte[] b4 = new byte[ByteValue.BUFFERSIZE * 4];
-    Arrays.fill(b3, (byte) 4);
-
-    byte[] expected = new byte[ByteValue.BUFFERSIZE * 8];
-    byte[][] data = new byte[][]{b1, b2, b3, b4};
-    final Random random = new Random();
-    int totalSize = 0;
-    for (byte[] b : data) {
-      System.arraycopy(b, 0, expected, totalSize, b.length);
-      totalSize += b.length;
-
-      int written = 0;
-      while (written < b.length) {
-        int toWrite = random.nextInt(b.length - written) + 1;
-        LOG.info("write {} bytes", toWrite);
-        out.write(b, written, toWrite);
-        written += toWrite;
-      }
-    }
-    out.close();
-
-    final RaftLog log = leader.getState().getLog();
-    // 0.5 + 1 + 2.5 + 4 = 8
-    Assert.assertEquals(8, leader.getState().getLastAppliedIndex());
-    Assert.assertEquals(8, log.getLastCommittedIndex());
-    LogEntryProto[] entries = log.getEntries(1, 9);
-    byte[] actual = new byte[ByteValue.BUFFERSIZE * 8];
-    totalSize = 0;
-    for (LogEntryProto e : entries) {
-      byte[] eValue = e.getSmLogEntry().getData().toByteArray();
-      Assert.assertEquals(ByteValue.BUFFERSIZE, eValue.length);
-      System.arraycopy(eValue, 0, actual, totalSize, eValue.length);
-      totalSize += eValue.length;
-    }
-    Assert.assertArrayEquals(expected, actual);
-  }
-
-  /**
-   * Write while leader is killed
-   */
-  @Test
-  public void testKillLeader() throws Exception {
-    LOG.info("Running testChangeLeader");
-
-    prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4);
-    cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, prop);
-    cluster.start();
-    final RaftServerImpl leader = waitForLeader(cluster);
-
-    final AtomicBoolean running  = new AtomicBoolean(true);
-    final AtomicBoolean success = new AtomicBoolean(false);
-    final AtomicInteger result = new AtomicInteger(0);
-    final CountDownLatch latch = new CountDownLatch(1);
-
-    new Thread(() -> {
-      LOG.info("Writer thread starts");
-      int count = 0;
-      try (RaftOutputStream out = new RaftOutputStream(prop, "writer",
-          cluster.getPeers(), leader.getId())) {
-        while (running.get()) {
-          out.write(toBytes(count++));
-          Thread.sleep(10);
-        }
-        success.set(true);
-        result.set(count);
-      } catch (Exception e) {
-        LOG.info("Got exception when writing", e);
-        success.set(false);
-      } finally {
-        latch.countDown();
-      }
-    }).start();
-
-    // force change the leader
-    RaftTestUtil.waitAndKillLeader(cluster, true);
-    final RaftServerImpl newLeader = waitForLeader(cluster);
-    Assert.assertNotEquals(leader.getId(), newLeader.getId());
-    Thread.sleep(500);
-
-    running.set(false);
-    latch.await(5, TimeUnit.SECONDS);
-    Assert.assertTrue(success.get());
-    // total number of tx should be >= result + 2, where 2 means two NoOp from
-    // leaders. It may be larger than result+2 because the client may resend
-    // requests and we do not have retry cache on servers yet.
-    LOG.info("last applied index: {}. total number of requests: {}",
-        newLeader.getState().getLastAppliedIndex(), result.get());
-    Assert.assertTrue(
-        newLeader.getState().getLastAppliedIndex() >= result.get() + 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java 
b/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
deleted file mode 100644
index c6667b4..0000000
--- a/raft-grpc/src/test/java/org/apache/raft/grpc/TestRaftWithGrpc.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.grpc;
-
-import org.apache.log4j.Level;
-import org.apache.raft.RaftBasicTests;
-import org.apache.raft.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.raft.server.impl.BlockRequestHandlingInjection;
-import org.apache.raft.server.impl.LogAppenderFactory;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.util.RaftUtils;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static 
org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
-
-public class TestRaftWithGrpc extends RaftBasicTests {
-  static {
-    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
-  }
-
-  private final MiniRaftClusterWithGRpc cluster;
-
-  @BeforeClass
-  public static void setProp() {
-    properties.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
-  }
-
-  public TestRaftWithGrpc() throws IOException {
-    cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, properties);
-    Assert.assertNull(cluster.getLeader());
-  }
-
-  @Override
-  public MiniRaftClusterWithGRpc getCluster() {
-    return cluster;
-  }
-
-  @Override
-  @Test
-  public void testEnforceLeader() throws Exception {
-    super.testEnforceLeader();
-
-    MiniRaftClusterWithGRpc.sendServerRequestInjection.clear();
-    BlockRequestHandlingInjection.getInstance().unblockAll();
-  }
-
-  @Override
-  @Test
-  public void testWithLoad() throws Exception {
-    super.testWithLoad();
-    BlockRequestHandlingInjection.getInstance().unblockAll();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-grpc/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/raft-grpc/src/test/resources/log4j.properties 
b/raft-grpc/src/test/resources/log4j.properties
deleted file mode 100644
index ced0687..0000000
--- a/raft-grpc/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-# log4j configuration used during build and unit tests
-
-log4j.rootLogger=info,stdout
-log4j.threshold=ALL
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/raft-hadoop/pom.xml b/raft-hadoop/pom.xml
deleted file mode 100644
index da84fc9..0000000
--- a/raft-hadoop/pom.xml
+++ /dev/null
@@ -1,99 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <artifactId>raft-project-dist</artifactId>
-    <groupId>com.hortonworks.raft</groupId>
-    <version>1.0-SNAPSHOT</version>
-    <relativePath>../raft-project-dist</relativePath>
-  </parent>
-
-  <artifactId>raft-hadoop</artifactId>
-  <name>Raft Hadoop Support</name>
-
-  <dependencies>
-    <dependency>
-      <artifactId>raft-proto-shaded</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>provided</scope>
-    </dependency>
-
-    <dependency>
-      <artifactId>raft-common</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <artifactId>raft-common</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    
-    <dependency>
-      <artifactId>raft-client</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <artifactId>raft-client</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    
-    <dependency>
-      <artifactId>raft-server</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <artifactId>raft-server</artifactId>
-      <groupId>com.hortonworks.raft</groupId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
-    </dependency>    
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java
----------------------------------------------------------------------
diff --git 
a/raft-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java 
b/raft-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java
deleted file mode 100644
index 824b19c..0000000
--- 
a/raft-hadoop/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngineShaded.java
+++ /dev/null
@@ -1,623 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ipc;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputOutputStream;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.Client.ConnectionId;
-import org.apache.hadoop.ipc.RPC.RpcInvoker;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.ProtoUtil;
-import org.apache.hadoop.util.Time;
-import org.apache.raft.shaded.com.google.protobuf.*;
-import org.apache.raft.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
-import 
org.apache.raft.shaded.org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto;
-import 
org.apache.raft.shaded.org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
-import 
org.apache.raft.shaded.org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
-
-import javax.net.SocketFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Copied from {@link org.apache.hadoop.ipc.ProtobufRpcEngine}
- * and replaced the protobuf classes with the shaded classes.
- */
-@InterfaceStability.Evolving
-public class ProtobufRpcEngineShaded implements RpcEngine {
-  public static final Log LOG = 
LogFactory.getLog(ProtobufRpcEngineShaded.class);
-  
-  static { // Register the rpcRequest deserializer for WritableRpcEngine 
-    org.apache.hadoop.ipc.Server.registerProtocolEngine(
-        RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
-        new Server.ProtoBufRpcInvoker());
-  }
-
-  private static final ClientCache CLIENTS = new ClientCache();
-
-  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
-      SocketFactory factory, int rpcTimeout) throws IOException {
-    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
-        rpcTimeout, null);
-  }
-
-  @Override
-  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
-      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
-      ) throws IOException {
-    return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
-      rpcTimeout, connectionRetryPolicy, null);
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
-      InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
-      SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
-      AtomicBoolean fallbackToSimpleAuth) throws IOException {
-
-    final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
-        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
-    return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
-        protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
-  }
-  
-  @Override
-  public ProtocolProxy<ProtocolMetaInfoPB> getProtocolMetaInfoProxy(
-      ConnectionId connId, Configuration conf, SocketFactory factory)
-      throws IOException {
-    Class<ProtocolMetaInfoPB> protocol = ProtocolMetaInfoPB.class;
-    return new ProtocolProxy<ProtocolMetaInfoPB>(protocol,
-        (ProtocolMetaInfoPB) Proxy.newProxyInstance(protocol.getClassLoader(),
-            new Class[] { protocol }, new Invoker(protocol, connId, conf,
-                factory)), false);
-  }
-
-  private static class Invoker implements RpcInvocationHandler {
-    private final Map<String, Message> returnTypes = 
-        new ConcurrentHashMap<String, Message>();
-    private boolean isClosed = false;
-    private final Client.ConnectionId remoteId;
-    private final Client client;
-    private final long clientProtocolVersion;
-    private final String protocolName;
-    private AtomicBoolean fallbackToSimpleAuth;
-
-    private Invoker(Class<?> protocol, InetSocketAddress addr,
-        UserGroupInformation ticket, Configuration conf, SocketFactory factory,
-        int rpcTimeout, RetryPolicy connectionRetryPolicy,
-        AtomicBoolean fallbackToSimpleAuth) throws IOException {
-      this(protocol, Client.ConnectionId.getConnectionId(
-          addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
-          conf, factory);
-      this.fallbackToSimpleAuth = fallbackToSimpleAuth;
-    }
-    
-    /**
-     * This constructor takes a connectionId, instead of creating a new one.
-     */
-    private Invoker(Class<?> protocol, Client.ConnectionId connId,
-        Configuration conf, SocketFactory factory) {
-      this.remoteId = connId;
-      this.client = CLIENTS.getClient(conf, factory, RpcResponseWrapper.class);
-      this.protocolName = RPC.getProtocolName(protocol);
-      this.clientProtocolVersion = RPC
-          .getProtocolVersion(protocol);
-    }
-
-    private RequestHeaderProto constructRpcRequestHeader(Method method) {
-      RequestHeaderProto.Builder builder = RequestHeaderProto
-          .newBuilder();
-      builder.setMethodName(method.getName());
-     
-
-      // For protobuf, {@code protocol} used when creating client side proxy is
-      // the interface extending BlockingInterface, which has the annotations 
-      // such as ProtocolName etc.
-      //
-      // Using Method.getDeclaringClass(), as in WritableEngine to get at
-      // the protocol interface will return BlockingInterface, from where 
-      // the annotation ProtocolName and Version cannot be
-      // obtained.
-      //
-      // Hence we simply use the protocol class used to create the proxy.
-      // For PB this may limit the use of mixins on client side.
-      builder.setDeclaringClassProtocolName(protocolName);
-      builder.setClientProtocolVersion(clientProtocolVersion);
-      return builder.build();
-    }
-
-    /**
-     * This is the client side invoker of RPC method. It only throws
-     * ServiceException, since the invocation proxy expects only
-     * ServiceException to be thrown by the method in case protobuf service.
-     * 
-     * ServiceException has the following causes:
-     * <ol>
-     * <li>Exceptions encountered on the client side in this method are 
-     * set as cause in ServiceException as is.</li>
-     * <li>Exceptions from the server are wrapped in RemoteException and are
-     * set as cause in ServiceException</li>
-     * </ol>
-     * 
-     * Note that the client calling protobuf RPC methods, must handle
-     * ServiceException by getting the cause from the ServiceException. If the
-     * cause is RemoteException, then unwrap it to get the exception thrown by
-     * the server.
-     */
-    @Override
-    public Object invoke(Object proxy, Method method, Object[] args)
-        throws ServiceException {
-      long startTime = 0;
-      if (LOG.isDebugEnabled()) {
-        startTime = Time.now();
-      }
-      
-      if (args.length != 2) { // RpcController + Message
-        throw new ServiceException("Too many parameters for request. Method: ["
-            + method.getName() + "]" + ", Expected: 2, Actual: "
-            + args.length);
-      }
-      if (args[1] == null) {
-        throw new ServiceException("null param while calling Method: ["
-            + method.getName() + "]");
-      }
-
-      RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
-      
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(Thread.currentThread().getId() + ": Call -> " +
-            remoteId + ": " + method.getName() +
-            " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
-      }
-
-
-      Message theRequest = (Message) args[1];
-      final RpcResponseWrapper val;
-      try {
-        val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
-            new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
-            fallbackToSimpleAuth);
-
-      } catch (Throwable e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
-              remoteId + ": " + method.getName() +
-                " {" + e + "}");
-        }
-        throw new ServiceException(e);
-      }
-
-      if (LOG.isDebugEnabled()) {
-        long callTime = Time.now() - startTime;
-        LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
-      }
-      
-      Message prototype = null;
-      try {
-        prototype = getReturnProtoType(method);
-      } catch (Exception e) {
-        throw new ServiceException(e);
-      }
-      Message returnMessage;
-      try {
-        returnMessage = prototype.newBuilderForType()
-            .mergeFrom(val.theResponseRead).build();
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(Thread.currentThread().getId() + ": Response <- " +
-              remoteId + ": " + method.getName() +
-                " {" + TextFormat.shortDebugString(returnMessage) + "}");
-        }
-
-      } catch (Throwable e) {
-        throw new ServiceException(e);
-      }
-      return returnMessage;
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (!isClosed) {
-        isClosed = true;
-        CLIENTS.stopClient(client);
-      }
-    }
-
-    private Message getReturnProtoType(Method method) throws Exception {
-      if (returnTypes.containsKey(method.getName())) {
-        return returnTypes.get(method.getName());
-      }
-      
-      Class<?> returnType = method.getReturnType();
-      Method newInstMethod = returnType.getMethod("getDefaultInstance");
-      newInstMethod.setAccessible(true);
-      Message prototype = (Message) newInstMethod.invoke(null, (Object[]) 
null);
-      returnTypes.put(method.getName(), prototype);
-      return prototype;
-    }
-
-    @Override //RpcInvocationHandler
-    public ConnectionId getConnectionId() {
-      return remoteId;
-    }
-  }
-
-  interface RpcWrapper extends Writable {
-    int getLength();
-  }
-  /**
-   * Wrapper for Protocol Buffer Requests
-   * 
-   * Note while this wrapper is writable, the request on the wire is in
-   * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} 
-   * use type Writable as a wrapper to work across multiple RpcEngine kinds.
-   */
-  private static abstract class RpcMessageWithHeader<T extends 
GeneratedMessage>
-    implements RpcWrapper {
-    T requestHeader;
-    Message theRequest; // for clientSide, the request is here
-    byte[] theRequestRead; // for server side, the request is here
-
-    public RpcMessageWithHeader() {
-    }
-
-    public RpcMessageWithHeader(T requestHeader, Message theRequest) {
-      this.requestHeader = requestHeader;
-      this.theRequest = theRequest;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      OutputStream os = DataOutputOutputStream.constructOutputStream(out);
-      
-      ((Message)requestHeader).writeDelimitedTo(os);
-      theRequest.writeDelimitedTo(os);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      requestHeader = parseHeaderFrom(readVarintBytes(in));
-      theRequestRead = readMessageRequest(in);
-    }
-
-    abstract T parseHeaderFrom(byte[] bytes) throws IOException;
-
-    byte[] readMessageRequest(DataInput in) throws IOException {
-      return readVarintBytes(in);
-    }
-
-    private static byte[] readVarintBytes(DataInput in) throws IOException {
-      final int length = ProtoUtil.readRawVarint32(in);
-      final byte[] bytes = new byte[length];
-      in.readFully(bytes);
-      return bytes;
-    }
-
-    public T getMessageHeader() {
-      return requestHeader;
-    }
-
-    public byte[] getMessageBytes() {
-      return theRequestRead;
-    }
-    
-    @Override
-    public int getLength() {
-      int headerLen = requestHeader.getSerializedSize();
-      int reqLen;
-      if (theRequest != null) {
-        reqLen = theRequest.getSerializedSize();
-      } else if (theRequestRead != null ) {
-        reqLen = theRequestRead.length;
-      } else {
-        throw new IllegalArgumentException(
-            "getLength on uninitialized RpcWrapper");      
-      }
-      return CodedOutputStream.computeRawVarint32Size(headerLen) +  headerLen
-          + CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen;
-    }
-  }
-  
-  private static class RpcRequestWrapper
-  extends RpcMessageWithHeader<RequestHeaderProto> {
-    @SuppressWarnings("unused")
-    public RpcRequestWrapper() {}
-    
-    public RpcRequestWrapper(
-        RequestHeaderProto requestHeader, Message theRequest) {
-      super(requestHeader, theRequest);
-    }
-    
-    @Override
-    RequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
-      return RequestHeaderProto.parseFrom(bytes);
-    }
-    
-    @Override
-    public String toString() {
-      return requestHeader.getDeclaringClassProtocolName() + "." +
-          requestHeader.getMethodName();
-    }
-  }
-
-  @InterfaceAudience.LimitedPrivate({"RPC"})
-  public static class RpcRequestMessageWrapper
-  extends RpcMessageWithHeader<RpcRequestHeaderProto> {
-    public RpcRequestMessageWrapper() {}
-    
-    public RpcRequestMessageWrapper(
-        RpcRequestHeaderProto requestHeader, Message theRequest) {
-      super(requestHeader, theRequest);
-    }
-    
-    @Override
-    RpcRequestHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
-      return RpcRequestHeaderProto.parseFrom(bytes);
-    }
-  }
-
-  @InterfaceAudience.LimitedPrivate({"RPC"})
-  public static class RpcResponseMessageWrapper
-  extends RpcMessageWithHeader<RpcResponseHeaderProto> {
-    public RpcResponseMessageWrapper() {}
-    
-    public RpcResponseMessageWrapper(
-        RpcResponseHeaderProto responseHeader, Message theRequest) {
-      super(responseHeader, theRequest);
-    }
-    
-    @Override
-    byte[] readMessageRequest(DataInput in) throws IOException {
-      // error message contain no message body
-      switch (requestHeader.getStatus()) {
-        case ERROR:
-        case FATAL:
-          return null;
-        default:
-          return super.readMessageRequest(in);
-      }
-    }
-    
-    @Override
-    RpcResponseHeaderProto parseHeaderFrom(byte[] bytes) throws IOException {
-      return RpcResponseHeaderProto.parseFrom(bytes);
-    }
-  }
-
-  /**
-   *  Wrapper for Protocol Buffer Responses
-   * 
-   * Note while this wrapper is writable, the request on the wire is in
-   * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} 
-   * use type Writable as a wrapper to work across multiple RpcEngine kinds.
-   */
-  @InterfaceAudience.LimitedPrivate({"RPC"}) // temporarily exposed 
-  public static class RpcResponseWrapper implements RpcWrapper {
-    Message theResponse; // for senderSide, the response is here
-    byte[] theResponseRead; // for receiver side, the response is here
-
-    public RpcResponseWrapper() {
-    }
-
-    public RpcResponseWrapper(Message message) {
-      this.theResponse = message;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      OutputStream os = DataOutputOutputStream.constructOutputStream(out);
-      theResponse.writeDelimitedTo(os);   
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      int length = ProtoUtil.readRawVarint32(in);
-      theResponseRead = new byte[length];
-      in.readFully(theResponseRead);
-    }
-    
-    @Override
-    public int getLength() {
-      int resLen;
-      if (theResponse != null) {
-        resLen = theResponse.getSerializedSize();
-      } else if (theResponseRead != null ) {
-        resLen = theResponseRead.length;
-      } else {
-        throw new IllegalArgumentException(
-            "getLength on uninitialized RpcWrapper");      
-      }
-      return CodedOutputStream.computeRawVarint32Size(resLen) + resLen;
-    }
-  }
-
-  @VisibleForTesting
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  static Client getClient(Configuration conf) {
-    return CLIENTS.getClient(conf, SocketFactory.getDefault(),
-        RpcResponseWrapper.class);
-  }
-  
- 
-
-  @Override
-  public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
-      String bindAddress, int port, int numHandlers, int numReaders,
-      int queueSizePerHandler, boolean verbose, Configuration conf,
-      SecretManager<? extends TokenIdentifier> secretManager,
-      String portRangeConfig)
-      throws IOException {
-    return new Server(protocol, protocolImpl, conf, bindAddress, port,
-        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
-        portRangeConfig);
-  }
-  
-  public static class Server extends RPC.Server {
-    /**
-     * Construct an RPC server.
-     * 
-     * @param protocolClass the class of protocol
-     * @param protocolImpl the protocolImpl whose methods will be called
-     * @param conf the configuration to use
-     * @param bindAddress the address to bind on to listen for connection
-     * @param port the port to listen for connections on
-     * @param numHandlers the number of method handler threads to run
-     * @param verbose whether each call should be logged
-     * @param portRangeConfig A config parameter that can be used to restrict
-     * the range of ports used when port is 0 (an ephemeral port)
-     */
-    public Server(Class<?> protocolClass, Object protocolImpl,
-        Configuration conf, String bindAddress, int port, int numHandlers,
-        int numReaders, int queueSizePerHandler, boolean verbose,
-        SecretManager<? extends TokenIdentifier> secretManager, 
-        String portRangeConfig)
-        throws IOException {
-      super(bindAddress, port, null, numHandlers,
-          numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
-              .getClass().getName()), secretManager, portRangeConfig);
-      this.verbose = verbose;  
-      registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocolClass,
-          protocolImpl);
-    }
-    
-    /**
-     * Protobuf invoker for {@link RpcInvoker}
-     */
-    static class ProtoBufRpcInvoker implements RpcInvoker {
-      private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server,
-          String protoName, long clientVersion) throws RpcServerException {
-        ProtoNameVer pv = new ProtoNameVer(protoName, clientVersion);
-        ProtoClassProtoImpl impl = 
-            server.getProtocolImplMap(RPC.RpcKind.RPC_PROTOCOL_BUFFER).get(pv);
-        if (impl == null) { // no match for Protocol AND Version
-          VerProtocolImpl highest = 
-              
server.getHighestSupportedProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, 
-                  protoName);
-          if (highest == null) {
-            throw new RpcNoSuchProtocolException(
-                "Unknown protocol: " + protoName);
-          }
-          // protocol supported but not the version that client wants
-          throw new RPC.VersionMismatch(protoName, clientVersion,
-              highest.version);
-        }
-        return impl;
-      }
-
-      @Override 
-      /**
-       * This is a server side method, which is invoked over RPC. On success
-       * the return response has protobuf response payload. On failure, the
-       * exception name and the stack trace are return in the resposne.
-       * See {@link HadoopRpcResponseProto}
-       * 
-       * In this method there three types of exceptions possible and they are
-       * returned in response as follows.
-       * <ol>
-       * <li> Exceptions encountered in this method that are returned 
-       * as {@link RpcServerException} </li>
-       * <li> Exceptions thrown by the service is wrapped in ServiceException. 
-       * In that this method returns in response the exception thrown by the 
-       * service.</li>
-       * <li> Other exceptions thrown by the service. They are returned as
-       * it is.</li>
-       * </ol>
-       */
-      public Writable call(RPC.Server server, String protocol,
-          Writable writableRequest, long receiveTime) throws Exception {
-        RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
-        RequestHeaderProto rpcRequest = request.requestHeader;
-        String methodName = rpcRequest.getMethodName();
-        String protoName = rpcRequest.getDeclaringClassProtocolName();
-        long clientVersion = rpcRequest.getClientProtocolVersion();
-        if (server.verbose)
-          LOG.info("Call: protocol=" + protocol + ", method=" + methodName);
-        
-        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
-            clientVersion);
-        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
-        MethodDescriptor methodDescriptor = service.getDescriptorForType()
-            .findMethodByName(methodName);
-        if (methodDescriptor == null) {
-          String msg = "Unknown method " + methodName + " called on " + 
protocol
-              + " protocol.";
-          LOG.warn(msg);
-          throw new RpcNoSuchMethodException(msg);
-        }
-        Message prototype = service.getRequestPrototype(methodDescriptor);
-        Message param = prototype.newBuilderForType()
-            .mergeFrom(request.theRequestRead).build();
-        
-        Message result;
-        long startTime = Time.now();
-        int qTime = (int) (startTime - receiveTime);
-        Exception exception = null;
-        try {
-          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
-          result = service.callBlockingMethod(methodDescriptor, null, param);
-        } catch (ServiceException e) {
-          exception = (Exception) e.getCause();
-          throw (Exception) e.getCause();
-        } catch (Exception e) {
-          exception = e;
-          throw e;
-        } finally {
-          int processingTime = (int) (Time.now() - startTime);
-          if (LOG.isDebugEnabled()) {
-            String msg = "Served: " + methodName + " queueTime= " + qTime +
-                " procesingTime= " + processingTime;
-            if (exception != null) {
-              msg += " exception= " + exception.getClass().getSimpleName();
-            }
-            LOG.debug(msg);
-          }
-          String detailedMetricsName = (exception == null) ?
-              methodName :
-              exception.getClass().getSimpleName();
-          server.rpcMetrics.addRpcQueueTime(qTime);
-          server.rpcMetrics.addRpcProcessingTime(processingTime);
-          server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
-              processingTime);
-        }
-        return new RpcResponseWrapper(result);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/HadoopConstants.java
----------------------------------------------------------------------
diff --git 
a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/HadoopConstants.java 
b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/HadoopConstants.java
deleted file mode 100644
index b8b0497..0000000
--- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/HadoopConstants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.hadooprpc;
-
-public interface HadoopConstants {
-  String RAFT_SERVER_KERBEROS_PRINCIPAL_KEY
-      = "raft.server.kerberos.principal";
-  String RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY
-      = "raft.client.kerberos.principal";
-  String RAFT_SERVER_PROTOCOL_NAME
-      = "org.apache.hadoop.raft.server.protocol.RaftServerProtocol";
-  String RAFT_CLIENT_PROTOCOL_NAME
-      = "org.apache.hadoop.raft.protocol.RaftClientProtocol";
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/Proxy.java
----------------------------------------------------------------------
diff --git a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/Proxy.java 
b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/Proxy.java
deleted file mode 100644
index 60c8be5..0000000
--- a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/Proxy.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.hadooprpc;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class Proxy<PROTOCOL> implements Closeable {
-  public static <PROTOCOL> PROTOCOL getProxy(
-      Class<PROTOCOL> clazz, String addressStr, Configuration conf)
-      throws IOException {
-    RPC.setProtocolEngine(conf, clazz, ProtobufRpcEngineShaded.class);
-    return RPC.getProxy(clazz, RPC.getProtocolVersion(clazz),
-        org.apache.raft.util.NetUtils.newInetSocketAddress(addressStr),
-        UserGroupInformation.getCurrentUser(),
-        conf, NetUtils.getSocketFactory(conf, clazz));
-  }
-
-  private final PROTOCOL protocol;
-
-  public Proxy(Class<PROTOCOL> clazz, String addressStr, Configuration conf)
-      throws IOException {
-    this.protocol = getProxy(clazz, addressStr, conf);
-  }
-
-  public PROTOCOL getProtocol() {
-    return protocol;
-  }
-
-  @Override
-  public void close() {
-    RPC.stopProxy(protocol);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/client/HadoopClientRequestSender.java
----------------------------------------------------------------------
diff --git 
a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/client/HadoopClientRequestSender.java
 
b/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/client/HadoopClientRequestSender.java
deleted file mode 100644
index 29372ea..0000000
--- 
a/raft-hadoop/src/main/java/org/apache/raft/hadooprpc/client/HadoopClientRequestSender.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.hadooprpc.client;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.raft.client.RaftClientRequestSender;
-import org.apache.raft.protocol.*;
-import org.apache.raft.util.PeerProxyMap;
-
-import java.io.IOException;
-import java.util.Collection;
-
-public class HadoopClientRequestSender implements RaftClientRequestSender {
-
-  private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies;
-
-  public HadoopClientRequestSender(
-      Collection<RaftPeer> peers, final Configuration conf) {
-    this.proxies  = new PeerProxyMap<>(
-        p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), 
conf));
-    proxies.addPeers(peers);
-  }
-
-  @Override
-  public RaftClientReply sendRequest(RaftClientRequest request)
-      throws IOException {
-    final String serverId = request.getReplierId();
-    final RaftClientProtocolClientSideTranslatorPB proxy =
-        proxies.getProxy(serverId);
-    try {
-      if (request instanceof SetConfigurationRequest) {
-        return proxy.setConfiguration((SetConfigurationRequest) request);
-      } else {
-        return proxy.submitClientRequest(request);
-      }
-    } catch (RemoteException e) {
-      throw e.unwrapRemoteException(StateMachineException.class,
-          ReconfigurationTimeoutException.class,
-          ReconfigurationInProgressException.class, RaftException.class);
-    }
-  }
-
-  @Override
-  public void addServers(Iterable<RaftPeer> servers) {
-    proxies.addPeers(servers);
-  }
-
-  @Override
-  public void close() {
-    proxies.close();
-  }
-}


Reply via email to