http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
new file mode 100644
index 0000000..aac7c31
--- /dev/null
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.util.PeerProxyMap;
+
+import java.io.IOException;
+import java.util.Collection;
+
+public class HadoopClientRequestSender implements RaftClientRequestSender {
+
+  private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies;
+
+  public HadoopClientRequestSender(
+      Collection<RaftPeer> peers, final Configuration conf) {
+    this.proxies  = new PeerProxyMap<>(
+        p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), 
conf));
+    proxies.addPeers(peers);
+  }
+
+  @Override
+  public RaftClientReply sendRequest(RaftClientRequest request)
+      throws IOException {
+    final String serverId = request.getReplierId();
+    final RaftClientProtocolClientSideTranslatorPB proxy =
+        proxies.getProxy(serverId);
+    try {
+      if (request instanceof SetConfigurationRequest) {
+        return proxy.setConfiguration((SetConfigurationRequest) request);
+      } else {
+        return proxy.submitClientRequest(request);
+      }
+    } catch (RemoteException e) {
+      throw e.unwrapRemoteException(StateMachineException.class,
+          ReconfigurationTimeoutException.class,
+          ReconfigurationInProgressException.class, RaftException.class);
+    }
+  }
+
+  @Override
+  public void addServers(Iterable<RaftPeer> servers) {
+    proxies.addPeers(servers);
+  }
+
+  @Override
+  public void close() {
+    proxies.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..a5c1a13
--- /dev/null
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.hadooprpc.Proxy;
+import org.apache.ratis.protocol.RaftClientProtocol;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.util.ProtoUtils;
+
+@InterfaceAudience.Private
+public class RaftClientProtocolClientSideTranslatorPB
+    extends Proxy<RaftClientProtocolPB>
+    implements RaftClientProtocol {
+
+  public RaftClientProtocolClientSideTranslatorPB(
+      String addressStr, Configuration conf) throws IOException {
+    super(RaftClientProtocolPB.class, addressStr, conf);
+  }
+
+  @Override
+  public RaftClientReply submitClientRequest(RaftClientRequest request)
+      throws IOException {
+    final RaftClientRequestProto p = 
ClientProtoUtils.toRaftClientRequestProto(request);
+    try {
+      final RaftClientReplyProto reply = 
getProtocol().submitClientRequest(null, p);
+      return ClientProtoUtils.toRaftClientReply(reply);
+    } catch (ServiceException se) {
+      throw ProtoUtils.toIOException(se);
+    }
+  }
+
+  @Override
+  public RaftClientReply setConfiguration(SetConfigurationRequest request)
+      throws IOException {
+    final SetConfigurationRequestProto p
+        = ClientProtoUtils.toSetConfigurationRequestProto(request);
+    try {
+      final RaftClientReplyProto reply = getProtocol().setConfiguration(null, 
p);
+      return ClientProtoUtils.toRaftClientReply(reply);
+    } catch (ServiceException se) {
+      throw ProtoUtils.toIOException(se);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java
new file mode 100644
index 0000000..908cd99
--- /dev/null
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.ratis.hadooprpc.HadoopConstants;
+import 
org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@KerberosInfo(
+    serverPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = HadoopConstants.RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY)
+@ProtocolInfo(
+    protocolName = HadoopConstants.RAFT_CLIENT_PROTOCOL_NAME,
+    protocolVersion = 1)
+public interface RaftClientProtocolPB extends
+    RaftClientProtocolService.BlockingInterface {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..08cf589
--- /dev/null
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.protocol.RaftClientProtocol;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.shaded.com.google.protobuf.RpcController;
+import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+
+@InterfaceAudience.Private
+public class RaftClientProtocolServerSideTranslatorPB
+    implements RaftClientProtocolPB {
+  private final RaftClientProtocol impl;
+
+  public RaftClientProtocolServerSideTranslatorPB(RaftClientProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public RaftClientReplyProto submitClientRequest(
+      RpcController unused, RaftClientRequestProto proto)
+      throws ServiceException {
+    final RaftClientRequest request = 
ClientProtoUtils.toRaftClientRequest(proto);
+    try {
+      final RaftClientReply reply = impl.submitClientRequest(request);
+      return ClientProtoUtils.toRaftClientReplyProto(reply);
+    } catch(IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  @Override
+  public RaftClientReplyProto setConfiguration(
+      RpcController unused, SetConfigurationRequestProto proto)
+      throws ServiceException {
+    final SetConfigurationRequest request;
+    try {
+      request = ClientProtoUtils.toSetConfigurationRequest(proto);
+      final RaftClientReply reply = impl.setConfiguration(request);
+      return ClientProtoUtils.toRaftClientReplyProto(reply);
+    } catch(IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
new file mode 100644
index 0000000..b7ac64a
--- /dev/null
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.ratis.hadooprpc.Proxy;
+import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB;
+import 
org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
+import org.apache.ratis.protocol.RaftClientProtocol;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.protocol.RaftServerProtocol;
+import org.apache.ratis.shaded.com.google.protobuf.BlockingService;
+import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+import 
org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService;
+import 
org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftServerProtocolService;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.ProtoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/** Server side Hadoop RPC service. */
+public class HadoopRpcService implements RaftServerRpc {
+  public static final Logger LOG = 
LoggerFactory.getLogger(HadoopRpcService.class);
+  static final String CLASS_NAME = HadoopRpcService.class.getSimpleName();
+  public static final String SEND_SERVER_REQUEST = CLASS_NAME + 
".sendServerRequest";
+
+  private final String id;
+  private final RPC.Server ipcServer;
+  private final InetSocketAddress ipcServerAddress;
+
+  private final PeerProxyMap<Proxy<RaftServerProtocolPB>> proxies;
+
+  public HadoopRpcService(RaftServer server, final Configuration conf)
+      throws IOException {
+    this.proxies = new PeerProxyMap<>(
+        p -> new Proxy(RaftServerProtocolPB.class, p.getAddress(), conf));
+    this.id = server.getId();
+    this.ipcServer = newRpcServer(server, conf);
+    this.ipcServerAddress = ipcServer.getListenerAddress();
+
+    addRaftClientProtocol(server, conf);
+
+    LOG.info(getClass().getSimpleName() + " created RPC.Server at "
+        + ipcServerAddress);
+  }
+
+  @Override
+  public InetSocketAddress getInetSocketAddress() {
+    return ipcServerAddress;
+  }
+
+  private RPC.Server newRpcServer(RaftServerProtocol serverProtocol, final 
Configuration conf)
+      throws IOException {
+    final RaftServerConfigKeys.Get get = new RaftServerConfigKeys.Get() {
+      @Override
+      protected int getInt(String key, int defaultValue) {
+        return conf.getInt(key, defaultValue);
+      }
+
+      @Override
+      protected String getTrimmed(String key, String defaultValue) {
+        return conf.getTrimmed(key, defaultValue);
+      }
+    };
+
+    final int handlerCount = get.ipc().handlers();
+    final InetSocketAddress address = get.ipc().address();
+
+    final BlockingService service
+        = RaftServerProtocolService.newReflectiveBlockingService(
+            new RaftServerProtocolServerSideTranslatorPB(serverProtocol));
+    RPC.setProtocolEngine(conf, RaftServerProtocolPB.class, 
ProtobufRpcEngineShaded.class);
+    return new RPC.Builder(conf)
+        .setProtocol(RaftServerProtocolPB.class)
+        .setInstance(service)
+        .setBindAddress(address.getHostName())
+        .setPort(address.getPort())
+        .setNumHandlers(handlerCount)
+        .setVerbose(false)
+        .build();
+  }
+
+  private void addRaftClientProtocol(RaftClientProtocol clientProtocol, 
Configuration conf) {
+    final Class<?> protocol = RaftClientProtocolPB.class;
+    RPC.setProtocolEngine(conf,protocol, ProtobufRpcEngineShaded.class);
+
+    final BlockingService service
+        = RaftClientProtocolService.newReflectiveBlockingService(
+        new RaftClientProtocolServerSideTranslatorPB(clientProtocol));
+    ipcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
+  }
+
+  @Override
+  public void start() {
+    ipcServer.start();
+  }
+
+  @Override
+  public void close() {
+    ipcServer.stop();
+  }
+
+  @Override
+  public AppendEntriesReplyProto appendEntries(
+      AppendEntriesRequestProto request) throws IOException {
+    
Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
+
+    final RaftServerProtocolPB proxy = proxies.getProxy(
+        request.getServerRequest().getReplyId()).getProtocol();
+    try {
+      return proxy.appendEntries(null, request);
+    } catch (ServiceException se) {
+      throw ProtoUtils.toIOException(se);
+    }
+  }
+
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(
+      InstallSnapshotRequestProto request) throws IOException {
+    
Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
+
+    final RaftServerProtocolPB proxy = proxies.getProxy(
+        request.getServerRequest().getReplyId()).getProtocol();
+    try {
+      return proxy.installSnapshot(null, request);
+    } catch (ServiceException se) {
+      throw ProtoUtils.toIOException(se);
+    }
+  }
+
+  @Override
+  public RequestVoteReplyProto requestVote(
+      RequestVoteRequestProto request) throws IOException {
+    
Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
+
+    final RaftServerProtocolPB proxy = proxies.getProxy(
+        request.getServerRequest().getReplyId()).getProtocol();
+    try {
+      return proxy.requestVote(null, request);
+    } catch (ServiceException se) {
+      throw ProtoUtils.toIOException(se);
+    }
+  }
+
+  @Override
+  public void addPeers(Iterable<RaftPeer> peers) {
+    proxies.addPeers(peers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolPB.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolPB.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolPB.java
new file mode 100644
index 0000000..8b92cc4
--- /dev/null
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolPB.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.server;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.ratis.hadooprpc.HadoopConstants;
+import 
org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftServerProtocolService;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@KerberosInfo(
+    serverPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY)
+@ProtocolInfo(
+    protocolName = HadoopConstants.RAFT_SERVER_PROTOCOL_NAME,
+    protocolVersion = 1)
+public interface RaftServerProtocolPB extends
+    RaftServerProtocolService.BlockingInterface {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java
 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..a496793
--- /dev/null
+++ 
b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/RaftServerProtocolServerSideTranslatorPB.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.server;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.ratis.server.protocol.RaftServerProtocol;
+import org.apache.ratis.shaded.com.google.protobuf.RpcController;
+import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RequestVoteRequestProto;
+
+@InterfaceAudience.Private
+public class RaftServerProtocolServerSideTranslatorPB
+    implements RaftServerProtocolPB {
+  private final RaftServerProtocol impl;
+
+  public RaftServerProtocolServerSideTranslatorPB(RaftServerProtocol impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public RequestVoteReplyProto requestVote(
+      RpcController unused, RequestVoteRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.requestVote(request);
+    } catch(IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  @Override
+  public AppendEntriesReplyProto appendEntries(
+      RpcController unused, AppendEntriesRequestProto request)
+      throws ServiceException {
+    try {
+      return impl.appendEntries(request);
+    } catch(IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(RpcController controller,
+      InstallSnapshotRequestProto request) throws ServiceException {
+    try {
+      return impl.installSnapshot(request);
+    } catch(IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
new file mode 100644
index 0000000..964f3a2
--- /dev/null
+++ 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.hadooprpc.client.HadoopClientRequestSender;
+import org.apache.ratis.hadooprpc.server.HadoopRpcService;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
+  static final Logger LOG = 
LoggerFactory.getLogger(MiniRaftClusterWithHadoopRpc.class);
+
+  public static final Factory<MiniRaftClusterWithHadoopRpc> FACTORY
+      = new Factory<MiniRaftClusterWithHadoopRpc>() {
+    @Override
+    public MiniRaftClusterWithHadoopRpc newCluster(
+        String[] ids, RaftProperties prop, boolean formatted) throws 
IOException {
+      final Configuration conf = new Configuration();
+      conf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0");
+      return new MiniRaftClusterWithHadoopRpc(ids, prop, conf, formatted);
+    }
+  };
+
+  public static final DelayLocalExecutionInjection sendServerRequest =
+      new DelayLocalExecutionInjection(HadoopRpcService.SEND_SERVER_REQUEST);
+
+  private final Configuration hadoopConf;
+
+  public MiniRaftClusterWithHadoopRpc(int numServers, RaftProperties 
properties,
+      Configuration conf) throws IOException {
+    this(generateIds(numServers, 0), properties, conf, true);
+  }
+
+  public MiniRaftClusterWithHadoopRpc(String[] ids, RaftProperties properties,
+      Configuration hadoopConf, boolean formatted) throws IOException {
+    super(ids, properties, formatted);
+    this.hadoopConf = hadoopConf;
+
+    init(initRpcServices(getServers(), hadoopConf));
+  }
+
+  private static Map<RaftPeer, HadoopRpcService> initRpcServices(
+      Collection<RaftServerImpl> servers, Configuration hadoopConf) throws 
IOException {
+    final Map<RaftPeer, HadoopRpcService> peerRpcs = new HashMap<>();
+
+    for(RaftServerImpl s : servers) {
+      final HadoopRpcService rpc = new HadoopRpcService(s, hadoopConf);
+      peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
+    }
+    return peerRpcs;
+  }
+
+  @Override
+  protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
+    Configuration hconf = new Configuration(hadoopConf);
+    hconf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, peer.getAddress());
+
+    RaftServerImpl server = servers.get(peer.getId());
+    final HadoopRpcService rpc = new HadoopRpcService(server, hconf);
+    Preconditions.checkState(
+        rpc.getInetSocketAddress().toString().contains(peer.getAddress()),
+        "address in the raft conf: %s, address in rpc server: %s",
+        peer.getAddress(), rpc.getInetSocketAddress().toString());
+    server.setServerRpc(rpc);
+    return server;
+  }
+
+  @Override
+  public Collection<RaftPeer> addNewPeers(Collection<RaftPeer> newPeers,
+                                          Collection<RaftServerImpl> 
newServers, boolean startService)
+      throws IOException {
+    return addNewPeers(initRpcServices(newServers, hadoopConf),
+        newServers, startService);
+  }
+
+  @Override
+  public RaftClientRequestSender getRaftClientRequestSender() {
+    return new HadoopClientRequestSender(getPeers(), hadoopConf);
+  }
+
+  @Override
+  public void blockQueueAndSetDelay(String leaderId, int delayMs)
+      throws InterruptedException {
+    RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest,
+        leaderId, delayMs, getMaxTimeout());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestNotLeaderExceptionWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestNotLeaderExceptionWithHadoopRpc.java
 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestNotLeaderExceptionWithHadoopRpc.java
new file mode 100644
index 0000000..8bc5ae6
--- /dev/null
+++ 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestNotLeaderExceptionWithHadoopRpc.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftNotLeaderExceptionBaseTest;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
+
+import java.io.IOException;
+
+public class TestNotLeaderExceptionWithHadoopRpc extends 
RaftNotLeaderExceptionBaseTest {
+  @Override
+  public MiniRaftCluster initCluster() throws IOException {
+    String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0);
+    final Configuration conf = new Configuration();
+    conf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0");
+    RaftProperties prop = new RaftProperties();
+    return new MiniRaftClusterWithHadoopRpc(s, prop, conf, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
new file mode 100644
index 0000000..cd502ac
--- /dev/null
+++ 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftReconfigurationWithHadoopRpc.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
+
+import java.io.IOException;
+
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
+
+public class TestRaftReconfigurationWithHadoopRpc
+    extends RaftReconfigurationBaseTest {
+  @Override
+  public MiniRaftCluster getCluster(int peerNum) throws IOException {
+    final Configuration hadoopConf = new Configuration();
+    hadoopConf.setInt(IPC_CLIENT_CONNECT_TIMEOUT_KEY, 1000);
+    hadoopConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    hadoopConf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0");
+    return new MiniRaftClusterWithHadoopRpc(peerNum, prop, hadoopConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftSnapshotWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftSnapshotWithHadoopRpc.java
 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftSnapshotWithHadoopRpc.java
new file mode 100644
index 0000000..7bdafb0
--- /dev/null
+++ 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftSnapshotWithHadoopRpc.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
+
+import java.io.IOException;
+
+public class TestRaftSnapshotWithHadoopRpc extends RaftSnapshotBaseTest {
+  @Override
+  public MiniRaftCluster initCluster(int numServer, RaftProperties prop)
+      throws IOException {
+    return MiniRaftClusterWithHadoopRpc.FACTORY.newCluster(numServer, prop, 
true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
new file mode 100644
index 0000000..9dd3f27
--- /dev/null
+++ 
b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestRaftWithHadoopRpc.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftBasicTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Test;
+
+import static 
org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc.sendServerRequest;
+
+import java.io.IOException;
+
+public class TestRaftWithHadoopRpc extends RaftBasicTests {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(MiniRaftClusterWithHadoopRpc.LOG, Level.DEBUG);
+  }
+
+  private final MiniRaftClusterWithHadoopRpc cluster;
+
+  public TestRaftWithHadoopRpc() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(RaftServerConfigKeys.Ipc.ADDRESS_KEY, "0.0.0.0:0");
+    cluster = new MiniRaftClusterWithHadoopRpc(NUM_SERVERS, getProperties(), 
conf);
+  }
+
+  @Override
+  public MiniRaftClusterWithHadoopRpc getCluster() {
+    return cluster;
+  }
+
+  @Override
+  @Test
+  public void testEnforceLeader() throws Exception {
+    super.testEnforceLeader();
+
+    sendServerRequest.clear();
+    BlockRequestHandlingInjection.getInstance().unblockAll();
+  }
+
+  @Override
+  @Test
+  public void testWithLoad() throws Exception {
+    super.testWithLoad();
+    BlockRequestHandlingInjection.getInstance().unblockAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-hadoop/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/resources/log4j.properties 
b/ratis-hadoop/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ced0687
--- /dev/null
+++ b/ratis-hadoop/src/test/resources/log4j.properties
@@ -0,0 +1,18 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/pom.xml
----------------------------------------------------------------------
diff --git a/ratis-netty/pom.xml b/ratis-netty/pom.xml
new file mode 100644
index 0000000..cdffdc2
--- /dev/null
+++ b/ratis-netty/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>ratis-project-dist</artifactId>
+    <groupId>org.apache.ratis</groupId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../ratis-project-dist</relativePath>
+  </parent>
+
+  <artifactId>ratis-netty</artifactId>
+  <name>Ratis Netty Support</name>
+
+  <dependencies>
+    <dependency>
+      <artifactId>ratis-proto-shaded</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <artifactId>ratis-common</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-common</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    
+    <dependency>
+      <artifactId>ratis-client</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-client</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    
+    <dependency>
+      <artifactId>ratis-server</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-server</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-all</artifactId>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java
new file mode 100644
index 0000000..1aea87b
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.NetUtils;
+
+public class NettyClient implements Closeable {
+  private final LifeCycle lifeCycle = new 
LifeCycle(getClass().getSimpleName());
+
+  private Channel channel;
+
+  /** Connects to the given server address. */
+  public void connect(String serverAddress, EventLoopGroup group,
+                      ChannelInitializer<SocketChannel> initializer)
+      throws InterruptedException {
+    final InetSocketAddress address = 
NetUtils.newInetSocketAddress(serverAddress);
+    Preconditions.checkNotNull(address,
+        "Failed to create InetSocketAddress from %s.", serverAddress);
+
+    lifeCycle.startAndTransition(
+        () -> channel = new Bootstrap()
+            .group(group)
+            .channel(NioSocketChannel.class)
+            .handler(new LoggingHandler(LogLevel.INFO))
+            .handler(initializer)
+            .connect(address)
+            .sync()
+            .channel(),
+        InterruptedException.class);
+  }
+
+  @Override
+  public void close() {
+    lifeCycle.checkStateAndClose(() -> {
+      channel.close().syncUninterruptibly();
+    });
+  }
+
+  public ChannelFuture writeAndFlush(Object msg) {
+    lifeCycle.assertCurrentState(LifeCycle.State.RUNNING);
+    return channel.writeAndFlush(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
new file mode 100644
index 0000000..9b8553b
--- /dev/null
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import static 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
+import 
org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import 
org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
+import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerReplyProto;
+import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.RaftUtils;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+
+public class NettyRpcProxy implements Closeable {
+  public static class PeerMap extends PeerProxyMap<NettyRpcProxy> {
+    private final EventLoopGroup group = new NioEventLoopGroup();
+
+    @Override
+    public NettyRpcProxy createProxyImpl(RaftPeer peer)
+        throws IOException {
+      try {
+        return new NettyRpcProxy(peer, group);
+      } catch (InterruptedException e) {
+        throw RaftUtils.toInterruptedIOException("Failed connecting to " + 
peer, e);
+      }
+    }
+
+    @Override
+    public void close() {
+      super.close();
+      group.shutdownGracefully();
+    }
+  }
+
+  public static long getSeqNum(RaftNettyServerReplyProto proto) {
+    switch (proto.getRaftNettyServerReplyCase()) {
+      case REQUESTVOTEREPLY:
+        return proto.getRequestVoteReply().getServerReply().getSeqNum();
+      case APPENDENTRIESREPLY:
+        return proto.getAppendEntriesReply().getServerReply().getSeqNum();
+      case INSTALLSNAPSHOTREPLY:
+        return proto.getInstallSnapshotReply().getServerReply().getSeqNum();
+      case RAFTCLIENTREPLY:
+        return proto.getRaftClientReply().getRpcReply().getSeqNum();
+      case EXCEPTIONREPLY:
+        return proto.getExceptionReply().getRpcReply().getSeqNum();
+      case RAFTNETTYSERVERREPLY_NOT_SET:
+        throw new IllegalArgumentException("Reply case not set in proto: "
+            + proto.getRaftNettyServerReplyCase());
+      default:
+        throw new UnsupportedOperationException("Reply case not supported: "
+            + proto.getRaftNettyServerReplyCase());
+    }
+  }
+
+
+  class Connection implements Closeable {
+    private final NettyClient client = new NettyClient();
+    private final Queue<CompletableFuture<RaftNettyServerReplyProto>> replies
+        = new LinkedList<>();
+
+    Connection(EventLoopGroup group) throws InterruptedException {
+      final ChannelInboundHandler inboundHandler
+          = new SimpleChannelInboundHandler<RaftNettyServerReplyProto>() {
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx,
+                                    RaftNettyServerReplyProto proto) {
+          final CompletableFuture<RaftNettyServerReplyProto> future = 
pollReply();
+          if (future == null) {
+            throw new IllegalStateException("Request #" + getSeqNum(proto)
+                + " not found");
+          }
+          if (proto.getRaftNettyServerReplyCase() == EXCEPTIONREPLY) {
+            final Object ioe = 
ProtoUtils.toObject(proto.getExceptionReply().getException());
+            future.completeExceptionally((IOException)ioe);
+          } else {
+            future.complete(proto);
+          }
+        }
+      };
+      final ChannelInitializer<SocketChannel> initializer
+          = new ChannelInitializer<SocketChannel>() {
+        @Override
+        protected void initChannel(SocketChannel ch) throws Exception {
+          final ChannelPipeline p = ch.pipeline();
+
+          p.addLast(new ProtobufVarint32FrameDecoder());
+          p.addLast(new 
ProtobufDecoder(RaftNettyServerReplyProto.getDefaultInstance()));
+          p.addLast(new ProtobufVarint32LengthFieldPrepender());
+          p.addLast(new ProtobufEncoder());
+
+          p.addLast(inboundHandler);
+        }
+      };
+
+      client.connect(peer.getAddress(), group, initializer);
+    }
+
+    synchronized ChannelFuture offer(RaftNettyServerRequestProto request,
+        CompletableFuture<RaftNettyServerReplyProto> reply) {
+      replies.offer(reply);
+      return client.writeAndFlush(request);
+    }
+
+    synchronized CompletableFuture<RaftNettyServerReplyProto> pollReply() {
+      return replies.poll();
+    }
+
+    @Override
+    public synchronized void close() {
+      client.close();
+      if (!replies.isEmpty()) {
+        final IOException e = new IOException("Connection to " + peer + " is 
closed.");
+        replies.stream().forEach(f -> f.completeExceptionally(e));
+        replies.clear();
+      }
+    }
+  }
+
+  private final RaftPeer peer;
+  private final Connection connection;
+
+  public NettyRpcProxy(RaftPeer peer, EventLoopGroup group) throws 
InterruptedException {
+    this.peer = peer;
+    this.connection = new Connection(group);
+  }
+
+  @Override
+  public void close() {
+    connection.close();
+  }
+
+  public RaftNettyServerReplyProto send(
+      RaftRpcRequestProto request, RaftNettyServerRequestProto proto)
+      throws IOException {
+    final CompletableFuture<RaftNettyServerReplyProto> reply = new 
CompletableFuture<>();
+    final ChannelFuture channelFuture = connection.offer(proto, reply);
+
+    try {
+      channelFuture.sync();
+      return reply.get();
+    } catch (InterruptedException e) {
+      throw RaftUtils.toInterruptedIOException(ProtoUtils.toString(request)
+          + " sending from " + peer + " is interrupted.", e);
+    } catch (ExecutionException e) {
+      throw RaftUtils.toIOException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
new file mode 100644
index 0000000..38d806b
--- /dev/null
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRequestSender.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.netty.client;
+
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.netty.NettyRpcProxy;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+
+import java.io.IOException;
+
+public class NettyClientRequestSender implements RaftClientRequestSender {
+  private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap();
+
+  public NettyClientRequestSender(Iterable<RaftPeer> servers) {
+    addServers(servers);
+  }
+
+  @Override
+  public RaftClientReply sendRequest(RaftClientRequest request) throws 
IOException {
+    final String serverId = request.getReplierId();
+    final NettyRpcProxy proxy = proxies.getProxy(serverId);
+
+    final RaftNettyServerRequestProto.Builder b = 
RaftNettyServerRequestProto.newBuilder();
+    final RaftRpcRequestProto rpcRequest;
+    if (request instanceof SetConfigurationRequest) {
+      final SetConfigurationRequestProto proto = 
ClientProtoUtils.toSetConfigurationRequestProto(
+          (SetConfigurationRequest)request);
+      b.setSetConfigurationRequest(proto);
+      rpcRequest = proto.getRpcRequest();
+    } else {
+      final RaftClientRequestProto proto = 
ClientProtoUtils.toRaftClientRequestProto(request);
+      b.setRaftClientRequest(proto);
+      rpcRequest = proto.getRpcRequest();
+    }
+    return ClientProtoUtils.toRaftClientReply(
+        proxy.send(rpcRequest, b.build()).getRaftClientReply());
+  }
+
+  @Override
+  public void addServers(Iterable<RaftPeer> servers) {
+    proxies.addPeers(servers);
+  }
+
+  @Override
+  public void close() {
+    proxies.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
new file mode 100644
index 0000000..153f61e
--- /dev/null
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.netty.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
+import 
org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import 
org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyExceptionReplyProto;
+import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerReplyProto;
+import 
org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.netty.NettyRpcProxy;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.ProtoUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * A netty server endpoint that acts as the communication layer.
+ */
+public final class NettyRpcService implements RaftServerRpc {
+  static final String CLASS_NAME = NettyRpcService.class.getSimpleName();
+  public static final String SEND_SERVER_REQUEST = CLASS_NAME + 
".sendServerRequest";
+
+  private final LifeCycle lifeCycle = new 
LifeCycle(getClass().getSimpleName());
+  private final RaftServer server;
+  private final String id;
+
+  private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+  private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+  private final ChannelFuture channelFuture;
+
+  private final NettyRpcProxy.PeerMap proxies = new NettyRpcProxy.PeerMap();
+
+  @ChannelHandler.Sharable
+  class InboundHandler extends 
SimpleChannelInboundHandler<RaftNettyServerRequestProto> {
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, 
RaftNettyServerRequestProto proto) {
+      final RaftNettyServerReplyProto reply = handle(proto);
+      ctx.writeAndFlush(reply);
+    }
+  }
+
+  /** Constructs a netty server with the given port. */
+  public NettyRpcService(int port, RaftServer server) {
+    this.server = server;
+    this.id = server.getId();
+
+    final ChannelInitializer<SocketChannel> initializer
+        = new ChannelInitializer<SocketChannel>() {
+      @Override
+      protected void initChannel(SocketChannel ch) throws Exception {
+        final ChannelPipeline p = ch.pipeline();
+
+        p.addLast(new ProtobufVarint32FrameDecoder());
+        p.addLast(new 
ProtobufDecoder(RaftNettyServerRequestProto.getDefaultInstance()));
+        p.addLast(new ProtobufVarint32LengthFieldPrepender());
+        p.addLast(new ProtobufEncoder());
+
+        p.addLast(new InboundHandler());
+      }
+    };
+
+    channelFuture = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(initializer)
+        .bind(port);
+  }
+
+  private Channel getChannel() {
+    return channelFuture.awaitUninterruptibly().channel();
+  }
+
+  @Override
+  public void start() {
+    lifeCycle.startAndTransition(() -> channelFuture.syncUninterruptibly());
+  }
+
+  @Override
+  public void close() {
+    lifeCycle.checkStateAndClose(() -> {
+      bossGroup.shutdownGracefully();
+      workerGroup.shutdownGracefully();
+      final ChannelFuture f = getChannel().close();
+      proxies.close();
+      f.syncUninterruptibly();
+    });
+  }
+
+  @Override
+  public InetSocketAddress getInetSocketAddress() {
+    return (InetSocketAddress)getChannel().localAddress();
+  }
+
+  RaftNettyServerReplyProto handle(RaftNettyServerRequestProto proto) {
+    RaftRpcRequestProto rpcRequest = null;
+    try {
+      switch (proto.getRaftNettyServerRequestCase()) {
+        case REQUESTVOTEREQUEST: {
+          final RequestVoteRequestProto request = 
proto.getRequestVoteRequest();
+          rpcRequest = request.getServerRequest();
+          final RequestVoteReplyProto reply = server.requestVote(request);
+          return RaftNettyServerReplyProto.newBuilder()
+              .setRequestVoteReply(reply)
+              .build();
+        }
+        case APPENDENTRIESREQUEST: {
+          final AppendEntriesRequestProto request = 
proto.getAppendEntriesRequest();
+          rpcRequest = request.getServerRequest();
+          final AppendEntriesReplyProto reply = server.appendEntries(request);
+          return RaftNettyServerReplyProto.newBuilder()
+              .setAppendEntriesReply(reply)
+              .build();
+        }
+        case INSTALLSNAPSHOTREQUEST: {
+          final InstallSnapshotRequestProto request = 
proto.getInstallSnapshotRequest();
+          rpcRequest = request.getServerRequest();
+          final InstallSnapshotReplyProto reply = 
server.installSnapshot(request);
+          return RaftNettyServerReplyProto.newBuilder()
+              .setInstallSnapshotReply(reply)
+              .build();
+        }
+        case RAFTCLIENTREQUEST: {
+          final RaftClientRequestProto request = proto.getRaftClientRequest();
+          rpcRequest = request.getRpcRequest();
+          final RaftClientReply reply = server.submitClientRequest(
+              ClientProtoUtils.toRaftClientRequest(request));
+          return RaftNettyServerReplyProto.newBuilder()
+              
.setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
+              .build();
+        }
+        case SETCONFIGURATIONREQUEST: {
+          final SetConfigurationRequestProto request = 
proto.getSetConfigurationRequest();
+          rpcRequest = request.getRpcRequest();
+          final RaftClientReply reply = server.setConfiguration(
+              ClientProtoUtils.toSetConfigurationRequest(request));
+          return RaftNettyServerReplyProto.newBuilder()
+              
.setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
+              .build();
+        }
+        case RAFTNETTYSERVERREQUEST_NOT_SET:
+          throw new IllegalArgumentException("Request case not set in proto: "
+              + proto.getRaftNettyServerRequestCase());
+        default:
+          throw new UnsupportedOperationException("Request case not supported: 
"
+              + proto.getRaftNettyServerRequestCase());
+      }
+    } catch (IOException ioe) {
+      Preconditions.checkNotNull(rpcRequest);
+      return toRaftNettyServerReplyProto(rpcRequest, ioe);
+    }
+  }
+
+  private static RaftNettyServerReplyProto toRaftNettyServerReplyProto(
+      RaftRpcRequestProto request, IOException e) {
+    final RaftRpcReplyProto.Builder rpcReply = 
ClientProtoUtils.toRaftRpcReplyProtoBuilder(
+        request.getRequestorId(),
+        request.getReplyId(),
+        request.getSeqNum(), false);
+    final RaftNettyExceptionReplyProto.Builder ioe = 
RaftNettyExceptionReplyProto.newBuilder()
+        .setRpcReply(rpcReply)
+        .setException(ProtoUtils.toByteString(e));
+    return 
RaftNettyServerReplyProto.newBuilder().setExceptionReply(ioe).build();
+  }
+
+  @Override
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) 
throws IOException {
+    
Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
+
+    final RaftNettyServerRequestProto proto = 
RaftNettyServerRequestProto.newBuilder()
+        .setRequestVoteRequest(request)
+        .build();
+    final RaftRpcRequestProto serverRequest = request.getServerRequest();
+    return sendRaftNettyServerRequestProto(serverRequest, 
proto).getRequestVoteReply();
+  }
+
+  @Override
+  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto 
request) throws IOException {
+    
Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
+
+    final RaftNettyServerRequestProto proto = 
RaftNettyServerRequestProto.newBuilder()
+        .setAppendEntriesRequest(request)
+        .build();
+    final RaftRpcRequestProto serverRequest = request.getServerRequest();
+    return sendRaftNettyServerRequestProto(serverRequest, 
proto).getAppendEntriesReply();
+  }
+
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto 
request) throws IOException {
+    
Preconditions.checkArgument(id.equals(request.getServerRequest().getRequestorId()));
+    CodeInjectionForTesting.execute(SEND_SERVER_REQUEST, id, null, request);
+
+    final RaftNettyServerRequestProto proto = 
RaftNettyServerRequestProto.newBuilder()
+        .setInstallSnapshotRequest(request)
+        .build();
+    final RaftRpcRequestProto serverRequest = request.getServerRequest();
+    return sendRaftNettyServerRequestProto(serverRequest, 
proto).getInstallSnapshotReply();
+  }
+
+  private RaftNettyServerReplyProto sendRaftNettyServerRequestProto(
+      RaftRpcRequestProto request, RaftNettyServerRequestProto proto)
+      throws IOException {
+    final String id = request.getReplyId();
+    final NettyRpcProxy p = proxies.getProxy(id);
+    try {
+      return p.send(request, proto);
+    } catch (ClosedChannelException cce) {
+      proxies.resetProxy(id);
+      throw cce;
+    }
+  }
+
+  @Override
+  public void addPeers(Iterable<RaftPeer> peers) {
+    proxies.addPeers(peers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
 
b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
new file mode 100644
index 0000000..92e7722
--- /dev/null
+++ 
b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.netty.client.NettyClientRequestSender;
+import org.apache.ratis.netty.server.NettyRpcService;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
+  public static final Factory<MiniRaftClusterWithNetty> FACTORY
+      = new Factory<MiniRaftClusterWithNetty>() {
+    @Override
+    public MiniRaftClusterWithNetty newCluster(
+        String[] ids, RaftProperties prop, boolean formatted) {
+      return new MiniRaftClusterWithNetty(ids, prop, formatted);
+    }
+  };
+
+  public static final DelayLocalExecutionInjection sendServerRequest
+      = new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST);
+
+  public MiniRaftClusterWithNetty(int numServers, RaftProperties properties) {
+    this(generateIds(numServers, 0), properties, true);
+  }
+
+  public MiniRaftClusterWithNetty(
+      String[] ids, RaftProperties properties, boolean formatted) {
+    super(ids, properties, formatted);
+    init(initRpcServices(getServers(), getConf()));
+  }
+
+  private static String getAddress(String id, RaftConfiguration conf) {
+    final RaftPeer peer = conf.getPeer(id);
+    if (peer != null) {
+      final String address = peer.getAddress();
+      if (address != null) {
+        return address;
+      }
+    }
+    return "0.0.0.0:0";
+  }
+
+  private static NettyRpcService newNettyRpcService(
+      RaftServerImpl s, RaftConfiguration conf) {
+    final String address = getAddress(s.getId(), conf);
+    final int port = NetUtils.newInetSocketAddress(address).getPort();
+    return new NettyRpcService(port, s);
+  }
+
+  private static Map<RaftPeer, NettyRpcService> initRpcServices(
+      Collection<RaftServerImpl> servers, RaftConfiguration conf) {
+    final Map<RaftPeer, NettyRpcService> peerRpcs = new HashMap<>();
+
+    for (RaftServerImpl s : servers) {
+      final NettyRpcService rpc = newNettyRpcService(s, conf);
+      peerRpcs.put(new RaftPeer(s.getId(), rpc.getInetSocketAddress()), rpc);
+    }
+
+    return peerRpcs;
+  }
+
+  @Override
+  protected RaftServerImpl setPeerRpc(RaftPeer peer) throws IOException {
+    final RaftServerImpl s = servers.get(peer.getId());
+    final NettyRpcService rpc = newNettyRpcService(s, conf);
+    s.setServerRpc(rpc);
+    return s;
+  }
+
+  @Override
+  protected Collection<RaftPeer> addNewPeers(
+      Collection<RaftPeer> newPeers, Collection<RaftServerImpl> newServers,
+      boolean startService) throws IOException {
+    return addNewPeers(initRpcServices(newServers, conf),
+        newServers, startService);
+  }
+
+  @Override
+  public RaftClientRequestSender getRaftClientRequestSender() {
+    return new NettyClientRequestSender(getPeers());
+  }
+
+  @Override
+  protected void blockQueueAndSetDelay(String leaderId, int delayMs)
+      throws InterruptedException {
+    RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest,
+        leaderId, delayMs, getMaxTimeout());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/java/org/apache/ratis/netty/TestNotLeaderExceptionWithNetty.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/test/java/org/apache/ratis/netty/TestNotLeaderExceptionWithNetty.java
 
b/ratis-netty/src/test/java/org/apache/ratis/netty/TestNotLeaderExceptionWithNetty.java
new file mode 100644
index 0000000..9c267e7
--- /dev/null
+++ 
b/ratis-netty/src/test/java/org/apache/ratis/netty/TestNotLeaderExceptionWithNetty.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftNotLeaderExceptionBaseTest;
+import org.apache.ratis.conf.RaftProperties;
+
+import java.io.IOException;
+
+public class TestNotLeaderExceptionWithNetty extends 
RaftNotLeaderExceptionBaseTest {
+  @Override
+  public MiniRaftCluster initCluster() throws IOException {
+    String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0);
+    RaftProperties prop = new RaftProperties();
+    return new MiniRaftClusterWithNetty(s, prop, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftReconfigurationWithNetty.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftReconfigurationWithNetty.java
 
b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftReconfigurationWithNetty.java
new file mode 100644
index 0000000..cfa9729
--- /dev/null
+++ 
b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftReconfigurationWithNetty.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import java.io.IOException;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
+
+public class TestRaftReconfigurationWithNetty
+    extends RaftReconfigurationBaseTest {
+  @Override
+  public MiniRaftCluster getCluster(int peerNum) throws IOException {
+    return MiniRaftClusterWithNetty.FACTORY.newCluster(peerNum, prop, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftSnapshotWithNetty.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftSnapshotWithNetty.java
 
b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftSnapshotWithNetty.java
new file mode 100644
index 0000000..18807c0
--- /dev/null
+++ 
b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftSnapshotWithNetty.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
+
+import java.io.IOException;
+
+public class TestRaftSnapshotWithNetty extends RaftSnapshotBaseTest {
+  @Override
+  public MiniRaftCluster initCluster(int numServer, RaftProperties prop)
+      throws IOException {
+    return MiniRaftClusterWithNetty.FACTORY.newCluster(numServer, prop, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java
----------------------------------------------------------------------
diff --git 
a/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java 
b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java
new file mode 100644
index 0000000..3954065
--- /dev/null
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestRaftWithNetty.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.RaftBasicTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestRaftWithNetty extends RaftBasicTests {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  private final MiniRaftClusterWithNetty cluster;
+
+  public TestRaftWithNetty() throws IOException {
+    cluster = new MiniRaftClusterWithNetty(NUM_SERVERS, getProperties());
+  }
+
+  @Override
+  public MiniRaftClusterWithNetty getCluster() {
+    return cluster;
+  }
+
+  @Override
+  @Test
+  public void testEnforceLeader() throws Exception {
+    super.testEnforceLeader();
+
+    MiniRaftClusterWithNetty.sendServerRequest.clear();
+    BlockRequestHandlingInjection.getInstance().unblockAll();
+  }
+
+  @Override
+  @Test
+  public void testWithLoad() throws Exception {
+    super.testWithLoad();
+    BlockRequestHandlingInjection.getInstance().unblockAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-netty/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/resources/log4j.properties 
b/ratis-netty/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ced0687
--- /dev/null
+++ b/ratis-netty/src/test/resources/log4j.properties
@@ -0,0 +1,18 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n

Reply via email to