http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
new file mode 100644
index 0000000..37b4064
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.examples.RaftExamplesTestUtil;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.simulation.RequestHandler;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Test restarting raft peers.
+ *
+ * The test is parameterized over the {@link MiniRaftCluster} instances
+ * returned by {@link #data()}: 3-peer clusters configured with
+ * {@link SimpleStateMachine4Testing} and an 8 KB log segment size limit.
+ */
+@RunWith(Parameterized.class)
+public class TestRestartRaftPeer {
+  static final Logger LOG = LoggerFactory.getLogger(TestRestartRaftPeer.class);
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  /**
+   * Builds the clusters the test is run against.
+   *
+   * @return one single-element {@code Object[]} per cluster
+   * @throws IOException if a cluster cannot be created
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() throws IOException {
+    RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+    prop.setInt(RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 1024 * 8);
+    return RaftExamplesTestUtil.getMiniRaftClusters(prop, 3);
+  }
+
+  @Parameterized.Parameter
+  public MiniRaftCluster cluster;
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(60 * 1000);
+
+  /**
+   * Writes 10 messages, restarts a follower, writes 10 more, and then checks
+   * that the restarted follower applies at least index 20 and still has at
+   * least that many log entries after a second restart.
+   */
+  @Test
+  public void restartFollower() throws Exception {
+    cluster.start();
+    // Always stop the cluster, even when an assertion fails, so that its
+    // servers do not leak into the next parameterized run.
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final String leaderId = cluster.getLeader().getId();
+
+      // 1 KB payload; 20 of them exceed the 8 KB segment size set in data()
+      final byte[] content = new byte[1024];
+      Arrays.fill(content, (byte) 1);
+      final SimpleMessage message = new SimpleMessage(new String(content));
+
+      final String followerId;
+      try (final RaftClient client = cluster.createClient("client", leaderId)) {
+        // write some messages
+        for (int i = 0; i < 10; i++) {
+          Assert.assertTrue(client.send(message).isSuccess());
+        }
+
+        // restart a follower
+        followerId = cluster.getFollowers().get(0).getId();
+        LOG.info("Restart follower {}", followerId);
+        cluster.restartServer(followerId, false);
+
+        // write some more messages
+        for (int i = 0; i < 10; i++) {
+          Assert.assertTrue(client.send(message).isSuccess());
+        }
+      }
+
+      // make sure the restarted follower can catchup:
+      // poll up to 10 times, 500 ms apart
+      boolean catchup = false;
+      long lastAppliedIndex = 0;
+      for (int i = 0; i < 10 && !catchup; i++) {
+        Thread.sleep(500);
+        lastAppliedIndex =
+            cluster.getServer(followerId).getState().getLastAppliedIndex();
+        catchup = lastAppliedIndex >= 20;
+      }
+      Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup);
+
+      // make sure the restarted peer's log segments is correct
+      cluster.restartServer(followerId, false);
+      Assert.assertTrue(cluster.getServer(followerId).getState().getLog()
+          .getLastEntry().getIndex() >= 20);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
new file mode 100644
index 0000000..ff936bd
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+import org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc;
+import org.apache.ratis.netty.MiniRaftClusterWithNetty;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Helpers that build the {@link MiniRaftCluster} parameters used by the
+ * parameterized tests of the examples module.
+ */
+public class RaftExamplesTestUtil {
+  /**
+   * Creates a cluster with the given factory and appends it to
+   * {@code clusters} wrapped in a single-element {@code Object[]}, which is
+   * the shape expected by JUnit's {@code Parameterized} runner.
+   */
+  private static void add(
+      Collection<Object[]> clusters, MiniRaftCluster.Factory factory,
+      String[] ids, RaftProperties properties)
+      throws IOException {
+    clusters.add(new Object[]{factory.newCluster(ids, properties, true)});
+  }
+
+  /**
+   * Creates one cluster per selected RPC implementation.
+   *
+   * @param prop properties passed to every created cluster
+   * @param clusterSize number of peer ids generated for each cluster
+   * @param clusterClasses the cluster implementations to create; when empty,
+   *                       all four known implementations (simulated RPC,
+   *                       Hadoop RPC, Netty, gRPC) are created
+   * @return one single-element {@code Object[]} per created cluster, in the
+   *         order simulated RPC, Hadoop RPC, Netty, gRPC
+   * @throws IOException if a cluster cannot be created
+   */
+  public static Collection<Object[]> getMiniRaftClusters(
+      RaftProperties prop, int clusterSize, Class<?>... clusterClasses)
+      throws IOException {
+    final List<Class<?>> classes = Arrays.asList(clusterClasses);
+    final boolean isAll = classes.isEmpty(); //empty means all
+
+    // Endless source of id arrays; the second argument grows by clusterSize
+    // on every call (presumably a start index, so that clusters do not share
+    // peer ids -- confirm against MiniRaftCluster.generateIds).
+    final Iterator<String[]> ids = new Iterator<String[]>() {
+      private int i = 0;
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+      @Override
+      public String[] next() {
+        return MiniRaftCluster.generateIds(clusterSize, i++*clusterSize);
+      }
+    };
+
+    final List<Object[]> clusters = new ArrayList<>();
+
+    if (isAll || classes.contains(MiniRaftClusterWithSimulatedRpc.class)) {
+      add(clusters, MiniRaftClusterWithSimulatedRpc.FACTORY, ids.next(), prop);
+    }
+    if (isAll || classes.contains(MiniRaftClusterWithHadoopRpc.class)) {
+      add(clusters, MiniRaftClusterWithHadoopRpc.FACTORY, ids.next(), prop);
+    }
+    if (isAll || classes.contains(MiniRaftClusterWithNetty.class)) {
+      add(clusters, MiniRaftClusterWithNetty.FACTORY, ids.next(), prop);
+    }
+    if (isAll || classes.contains(MiniRaftClusterWithGRpc.class)) {
+      add(clusters, MiniRaftClusterWithGRpc.FACTORY, ids.next(), prop);
+    }
+    return clusters;
+  }
+
+  /**
+   * Creates 3-peer clusters that use the given state machine class.
+   *
+   * @param stateMachineClass stored under
+   *                          {@link MiniRaftCluster#STATEMACHINE_CLASS_KEY}
+   * @param clusterClasses the cluster implementations to create; empty means
+   *                       all
+   * @return one single-element {@code Object[]} per created cluster
+   * @throws IOException if a cluster cannot be created
+   */
+  public static <S extends StateMachine> Collection<Object[]> getMiniRaftClusters(
+      Class<S> stateMachineClass, Class<?>... clusterClasses)
+      throws IOException {
+    final RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        stateMachineClass, StateMachine.class);
+    return getMiniRaftClusters(prop, 3, clusterClasses);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
new file mode 100644
index 0000000..44cf894
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.arithmetic;
+
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.examples.RaftExamplesTestUtil;
+import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine;
+import org.apache.ratis.examples.arithmetic.AssignmentMessage;
+import org.apache.ratis.examples.arithmetic.expression.*;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Runs arithmetic assignments and read-only expression queries against
+ * clusters that use {@link ArithmeticStateMachine}.
+ */
+@RunWith(Parameterized.class)
+public class TestArithmetic {
+  static {
+    RaftUtils.setLogLevel(ArithmeticStateMachine.LOG, Level.ALL);
+  }
+
+  /**
+   * @return one single-element {@code Object[]} per cluster under test
+   * @throws IOException if a cluster cannot be created
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() throws IOException {
+    return RaftExamplesTestUtil.getMiniRaftClusters(
+        ArithmeticStateMachine.class);
+  }
+
+  @Parameterized.Parameter
+  public MiniRaftCluster cluster;
+
+  /**
+   * Starts the cluster, runs the Pythagorean-triple checks through a client
+   * connected to the leader, and always closes the client and shuts the
+   * cluster down afterwards, even when an assertion fails.
+   */
+  @Test
+  public void testPythagorean() throws Exception {
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final String leaderId = cluster.getLeader().getId();
+      try (final RaftClient client =
+               cluster.createClient("pythagorean", leaderId)) {
+        runPythagorean(client);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * For every odd n in [3, 100) assigns a = n and b = (n*n)/2 (integer
+   * division), evaluates c = sqrt(a*a + b*b) and expects c = b + 1, then
+   * resets all three variables to the null value.
+   */
+  private static void runPythagorean(RaftClient client) throws Exception {
+    final Variable a = new Variable("a");
+    final Variable b = new Variable("b");
+    final Variable c = new Variable("c");
+    final BinaryExpression a2 =
+        new BinaryExpression(BinaryExpression.Op.MULT, a, a);
+    final BinaryExpression b2 =
+        new BinaryExpression(BinaryExpression.Op.MULT, b, b);
+    final BinaryExpression c2 =
+        new BinaryExpression(BinaryExpression.Op.ADD, a2, b2);
+    final AssignmentMessage pythagorean = new AssignmentMessage(c,
+        new UnaryExpression(UnaryExpression.Op.SQRT, c2));
+
+    final AssignmentMessage nullA =
+        new AssignmentMessage(a, NullValue.getInstance());
+    final AssignmentMessage nullB =
+        new AssignmentMessage(b, NullValue.getInstance());
+    final AssignmentMessage nullC =
+        new AssignmentMessage(c, NullValue.getInstance());
+
+    for (int n = 3; n < 100; n += 2) {
+      final int n2 = n * n;
+      final int halfN2 = n2 / 2;
+
+      RaftClientReply r;
+      r = client.send(new AssignmentMessage(a, new DoubleValue(n)));
+      assertRaftClientReply(r, (double) n);
+      r = client.sendReadOnly(Expression.Utils.toMessage(a2));
+      assertRaftClientReply(r, (double) n2);
+      r = client.send(new AssignmentMessage(b, new DoubleValue(halfN2)));
+      assertRaftClientReply(r, (double) halfN2);
+      r = client.sendReadOnly(Expression.Utils.toMessage(b2));
+      // the cast binds to the first operand, so the product is computed
+      // in double arithmetic
+      assertRaftClientReply(r, (double) halfN2 * halfN2);
+      r = client.send(pythagorean);
+      assertRaftClientReply(r, (double) halfN2 + 1);
+
+      r = client.send(nullA);
+      assertRaftClientReply(r, null);
+      r = client.send(nullB);
+      assertRaftClientReply(r, null);
+      r = client.send(nullC);
+      assertRaftClientReply(r, null);
+    }
+  }
+
+  /**
+   * Asserts that the reply is successful and that the expression decoded
+   * from its message content evaluates to {@code expected}.
+   *
+   * @param reply the reply to check
+   * @param expected the expected value; may be {@code null}
+   */
+  static void assertRaftClientReply(RaftClientReply reply, Double expected) {
+    Assert.assertTrue(reply.isSuccess());
+    final Expression e = Expression.Utils.bytes2Expression(
+        reply.getMessage().getContent().toByteArray(), 0);
+    Assert.assertEquals(expected, e.evaluate(null));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
new file mode 100644
index 0000000..a2d6e29
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.arithmetic.expression;
+
+
+import org.apache.ratis.examples.arithmetic.expression.BinaryExpression;
+import org.apache.ratis.examples.arithmetic.expression.DoubleValue;
+import org.apache.ratis.examples.arithmetic.expression.Expression;
+import org.apache.ratis.examples.arithmetic.expression.UnaryExpression;
+import org.apache.ratis.examples.arithmetic.expression.Variable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestExpression {
+  static final Logger LOG = LoggerFactory.getLogger(TestExpression.class);
+
+  @Test
+  public void testArithmeticUtils() throws Exception {
+    final Random ran = ThreadLocalRandom.current();
+    final byte[] buf = new byte[1024];
+    int offset = 0;
+
+    for(int i = 0; i < 10; i++) {
+      {
+        final int n = ran.nextInt();
+        Expression.Utils.int2bytes(n, buf, offset);
+        final int m = Expression.Utils.bytes2int(buf, offset);
+        Assert.assertEquals(n, m);
+        offset += 4;
+      }
+      {
+        final long n = ran.nextLong();
+        Expression.Utils.long2bytes(n, buf, offset);
+        final long m = Expression.Utils.bytes2long(buf, offset);
+        Assert.assertEquals(n, m);
+        offset += 8;
+      }
+      {
+        final double n = ran.nextDouble();
+        Expression.Utils.double2bytes(n, buf, offset);
+        final double m = Expression.Utils.bytes2double(buf, offset);
+        Assert.assertTrue(n == m);
+        offset += 8;
+      }
+    }
+  }
+  @Test
+  public void testOp() throws Exception {
+    for(BinaryExpression.Op op : BinaryExpression.Op.values()) {
+      final byte b = op.byteValue();
+      Assert.assertEquals(op, BinaryExpression.Op.valueOf(b));
+    }
+    for(UnaryExpression.Op op : UnaryExpression.Op.values()) {
+      final byte b = op.byteValue();
+      Assert.assertEquals(op, UnaryExpression.Op.valueOf(b));
+    }
+  }
+
+  @Test
+  public void testExpression() throws Exception {
+    final byte[] buf = new byte[1024];
+    int offset = 0;
+
+    {
+      final Variable a = new Variable("pi");
+      LOG.info("var a: " + a);
+      final int len = a.toBytes(buf, offset);
+      final Variable a2 = new Variable(buf, offset);
+      LOG.info("var a2: " + a2);
+      Assert.assertEquals(a.getName(), a2.getName());
+      Assert.assertEquals(len, a.length());
+      Assert.assertEquals(len, a2.length());
+      offset += len;
+    }
+
+    {
+      final DoubleValue three = new DoubleValue(3);
+      LOG.info("double three: " + three.evaluate(null));
+      final int len = three.toBytes(buf, offset);
+      final DoubleValue three2 = new DoubleValue(buf, offset);
+      LOG.info("double three2: " + three2.evaluate(null));
+      Assert.assertTrue(three.evaluate(null).equals(three2.evaluate(null)));
+      Assert.assertEquals(len, three.length());
+      Assert.assertEquals(len, three2.length());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
new file mode 100644
index 0000000..1f885ea
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.examples.RaftExamplesTestUtil;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.simulation.RequestHandler;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.fail;
+
+@RunWith(Parameterized.class)
+public class TestRaftStateMachineException {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  protected static class StateMachineWithException extends 
SimpleStateMachine4Testing {
+    @Override
+    public CompletableFuture<Message> applyTransaction(TransactionContext trx) 
{
+      CompletableFuture<Message> future = new CompletableFuture<>();
+      future.completeExceptionally(new StateMachineException("Fake 
Exception"));
+      return future;
+    }
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() throws IOException {
+    return RaftExamplesTestUtil.getMiniRaftClusters(
+        StateMachineWithException.class);
+  }
+
+  @Parameterized.Parameter
+  public MiniRaftCluster cluster;
+
+  @Test
+  public void testHandleStateMachineException() throws Exception {
+    cluster.start();
+    RaftTestUtil.waitForLeader(cluster);
+
+    final String leaderId = cluster.getLeader().getId();
+
+    try(final RaftClient client = cluster.createClient("client", leaderId)) {
+      client.send(new RaftTestUtil.SimpleMessage("m"));
+      fail("Exception expected");
+    } catch (StateMachineException e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getMessage().contains("Fake Exception"));
+    }
+
+    cluster.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/resources/log4j.properties b/ratis-examples/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ced0687
--- /dev/null
+++ b/ratis-examples/src/test/resources/log4j.properties
@@ -0,0 +1,18 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/pom.xml
----------------------------------------------------------------------
diff --git a/ratis-grpc/pom.xml b/ratis-grpc/pom.xml
new file mode 100644
index 0000000..6a46be5
--- /dev/null
+++ b/ratis-grpc/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>ratis-project-dist</artifactId>
+    <groupId>org.apache.ratis</groupId>
+    <version>1.0-SNAPSHOT</version>
+    <relativePath>../ratis-project-dist</relativePath>
+  </parent>
+
+  <artifactId>ratis-grpc</artifactId>
+  <name>Ratis gRPC Support</name>
+
+  <dependencies>
+    <dependency>
+      <artifactId>ratis-proto-shaded</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <artifactId>ratis-common</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-common</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    
+    <dependency>
+      <artifactId>ratis-client</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-client</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    
+    <dependency>
+      <artifactId>ratis-server</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <artifactId>ratis-server</artifactId>
+      <groupId>org.apache.ratis</groupId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
new file mode 100644
index 0000000..b61e70e
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.shaded.io.grpc.Server;
+import org.apache.ratis.shaded.io.grpc.ServerBuilder;
+import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.client.RaftClientProtocolService;
+import org.apache.ratis.grpc.server.RaftServerProtocolClient;
+import org.apache.ratis.grpc.server.RaftServerProtocolService;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT;
+import static 
org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * gRPC based {@link RaftServerRpc}: hosts the server-to-server and
+ * client-to-server protocol services on one gRPC server and keeps one
+ * {@link RaftServerProtocolClient} per known peer for outgoing requests.
+ */
+public class RaftGRpcService implements RaftServerRpc {
+  static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class);
+  /** Injection point name passed to {@link CodeInjectionForTesting}. */
+  public static final String GRPC_SEND_SERVER_REQUEST =
+      RaftGRpcService.class.getSimpleName() + ".sendRequest";
+
+  private final Server server;
+  private final InetSocketAddress address;
+  /** Outgoing clients keyed by peer id. */
+  private final Map<String, RaftServerProtocolClient> peers =
+      Collections.synchronizedMap(new HashMap<>());
+  private final String selfId;
+
+  /**
+   * Builds and immediately starts the gRPC server.
+   *
+   * @param raftServer the server whose id and protocol handlers are exposed
+   * @param properties source of the port and the maximum message size
+   * @throws IllegalStateException if the gRPC server cannot be started
+   */
+  public RaftGRpcService(RaftServer raftServer, RaftProperties properties) {
+    final int port = properties.getInt(RAFT_GRPC_SERVER_PORT_KEY,
+        RAFT_GRPC_SERVER_PORT_DEFAULT);
+    final int maxMessageSize = properties.getInt(
+        RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_KEY,
+        RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT);
+    selfId = raftServer.getId();
+    // Ask for the Netty builder directly instead of casting whatever
+    // ServerBuilder.forPort(..) happens to return.
+    server = NettyServerBuilder.forPort(port)
+        .maxMessageSize(maxMessageSize)
+        .addService(new RaftServerProtocolService(selfId, raftServer))
+        .addService(new RaftClientProtocolService(selfId, raftServer))
+        .build();
+
+    // start service to determine the port (in case port is configured as 0)
+    startService();
+    address = new InetSocketAddress(server.getPort());
+    LOG.info("Server started, listening on " + address.getPort());
+  }
+
+  /** Does nothing: the gRPC server is already started by the constructor. */
+  @Override
+  public void start() {
+    // do nothing
+  }
+
+  /**
+   * Starts the gRPC server and registers a JVM shutdown hook that closes
+   * this service.
+   *
+   * @throws IllegalStateException if the server fails to start
+   */
+  private void startService() {
+    try {
+      server.start();
+    } catch (IOException e) {
+      LOG.error("Failed to start Grpc server", e);
+      // Report the failure to the caller rather than terminating the whole
+      // JVM from library code.
+      throw new IllegalStateException("Failed to start Grpc server", e);
+    }
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        System.err.println("*** shutting down gRPC server since JVM is shutting down");
+        RaftGRpcService.this.close();
+        System.err.println("*** server shut down");
+      }
+    });
+  }
+
+  /** Shuts down the gRPC server and all peer clients. */
+  @Override
+  public void close() {
+    if (server != null) {
+      server.shutdown();
+    }
+    shutdownClients();
+  }
+
+  /** @return the address built from the port the server reported. */
+  @Override
+  public InetSocketAddress getInetSocketAddress() {
+    return address;
+  }
+
+  /**
+   * Not supported by this service.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public AppendEntriesReplyProto appendEntries(
+      AppendEntriesRequestProto request) throws IOException {
+    throw new UnsupportedOperationException(
+        "Blocking AppendEntries call is not supported");
+  }
+
+  /**
+   * Not supported by this service.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(
+      InstallSnapshotRequestProto request) throws IOException {
+    throw new UnsupportedOperationException(
+        "Blocking InstallSnapshot call is not supported");
+  }
+
+  /**
+   * Forwards the vote request to the peer named by the request's reply id.
+   *
+   * @throws NullPointerException if no client is registered for that peer
+   */
+  @Override
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
+      throws IOException {
+    CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, selfId,
+        null, request);
+
+    final String replyId = request.getServerRequest().getReplyId();
+    final RaftServerProtocolClient target = Preconditions.checkNotNull(
+        peers.get(replyId), "No gRPC client for peer %s", replyId);
+    return target.requestVote(request);
+  }
+
+  /**
+   * Creates a client for every peer that does not have one yet; peers that
+   * are already known keep their existing client.
+   */
+  @Override
+  public void addPeers(Iterable<RaftPeer> newPeers) {
+    for (RaftPeer p : newPeers) {
+      // Single atomic step on the synchronized map: a separate
+      // containsKey/put pair could create two clients for one peer when
+      // called concurrently, leaking the one that gets overwritten.
+      peers.computeIfAbsent(p.getId(), id -> new RaftServerProtocolClient(p));
+    }
+  }
+
+  private void shutdownClients() {
+    peers.values().forEach(RaftServerProtocolClient::shutdown);
+  }
+
+  /**
+   * @return the client registered for the peer's id, or {@code null} if
+   *         there is none
+   */
+  public RaftServerProtocolClient getRpcClient(RaftPeer peer) {
+    return peers.get(peer.getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java
new file mode 100644
index 0000000..ffec8ff
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.client.RaftClientConfigKeys;
+
+/**
+ * Names and default values of the configuration properties read by the gRPC
+ * module. Every {@code *_KEY} constant has a matching {@code *_DEFAULT}.
+ */
+public interface RaftGrpcConfigKeys {
+  /** Common prefix of the gRPC-specific keys; it does not end with a dot. */
+  String PREFIX = "raft.grpc";
+
+  String RAFT_GRPC_SERVER_PORT_KEY = PREFIX + ".server.port";
+  int RAFT_GRPC_SERVER_PORT_DEFAULT = 0;
+
+  String RAFT_GRPC_MESSAGE_MAXSIZE_KEY = PREFIX + ".message.maxsize";
+  int RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT = 64 * 1024 * 1024; // 64 MB
+
+  // The leading '.' is required because PREFIX has no trailing dot; without it
+  // the key would read "raft.grpcleader.max.outstanding.appends".
+  String RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY =
+      PREFIX + ".leader.max.outstanding.appends";
+  int RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT = 128;
+
+  // Same separator rule as for the leader key above.
+  String RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY =
+      PREFIX + ".client.max.outstanding.appends";
+  int RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT = 128;
+
+  String RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY = "raft.outputstream.buffer.size";
+  int RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT = 64 * 1024;
+
+  String RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY =
+      "raft.outputstream.max.retry.times";
+  int RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT = 5;
+
+  /** Defaults to the client RPC timeout, which is expressed in milliseconds. */
+  String RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY =
+      "raft.outputstream.retry.interval";
+  long RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT =
+      RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
new file mode 100644
index 0000000..52ed851
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.shaded.io.grpc.Metadata;
+import org.apache.ratis.shaded.io.grpc.Status;
+import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.util.RaftUtils;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.Constructor;
+
+/**
+ * Helpers for carrying Java exceptions across a gRPC call: the sender turns a
+ * {@link Throwable} into a {@link StatusRuntimeException} whose trailers name
+ * the exception class, and the receiver rebuilds an {@link IOException} from
+ * it.
+ */
+public class RaftGrpcUtil {
+  /** Trailer key under which the class name of the wrapped exception is sent. */
+  public static final Metadata.Key<String> EXCEPTION_TYPE_KEY =
+      Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
+
+  /**
+   * Renders the full stack trace of an exception as a string.
+   *
+   * @param e the exception to print
+   * @return the output of {@link Throwable#printStackTrace(PrintWriter)}
+   */
+  public static String stringifyException(Throwable e) {
+    final StringWriter buffer = new StringWriter();
+    try (PrintWriter out = new PrintWriter(buffer)) {
+      e.printStackTrace(out);
+    }
+    return buffer.toString();
+  }
+
+  /**
+   * Wraps a throwable into a {@link StatusRuntimeException} with status
+   * {@code INTERNAL}. The status description is the stringified stack trace
+   * and the trailers carry the class name under {@link #EXCEPTION_TYPE_KEY}.
+   *
+   * @param t the throwable to wrap
+   * @return the exception to hand to the gRPC response observer
+   */
+  public static StatusRuntimeException wrapException(Throwable t) {
+    final Metadata trailers = new Metadata();
+    // Use the binary name: it is never null and it is the form that
+    // Class.forName() in unwrapException() understands. The canonical name is
+    // null for anonymous/local classes and uses '.' instead of '$' for nested
+    // classes, so a nested exception type could not be resolved on receipt.
+    trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getName());
+    return new StatusRuntimeException(
+        Status.INTERNAL.withDescription(RaftGrpcUtil.stringifyException(t)),
+        trailers);
+  }
+
+  /**
+   * Rebuilds the exception described by the trailers of {@code se}.
+   *
+   * If the trailers name an exception class that can be loaded and that has a
+   * public {@code (String)} constructor, an instance is created with the
+   * status description as message and {@code se} as cause, then converted with
+   * {@link RaftUtils#asIOException}. In every other case {@code se} itself is
+   * wrapped into a plain {@link IOException}.
+   *
+   * @param se the exception received from gRPC
+   * @return an IOException to throw to the caller; never null
+   */
+  public static IOException unwrapException(StatusRuntimeException se) {
+    final Metadata trailers = se.getTrailers();
+    final Status status = se.getStatus();
+    if (trailers != null && status != null) {
+      final String className = trailers.get(EXCEPTION_TYPE_KEY);
+      if (className != null) {
+        try {
+          // NOTE(review): the class name comes from the remote side; forName
+          // initializes the class before the asSubclass check below.
+          Class<?> clazz = Class.forName(className);
+          final Exception unwrapped = instantiateException(
+              clazz.asSubclass(Exception.class), status.getDescription(), se);
+          return RaftUtils.asIOException(unwrapped);
+        } catch (Exception e) {
+          // Fall back to the raw gRPC exception, but keep the reason why the
+          // original type could not be rebuilt.
+          final IOException fallback = new IOException(se);
+          fallback.addSuppressed(e);
+          return fallback;
+        }
+      }
+    }
+    return new IOException(se);
+  }
+
+  /**
+   * Converts any throwable into an {@link IOException}: gRPC status exceptions
+   * go through {@link #unwrapException}, everything else through
+   * {@link RaftUtils#asIOException}.
+   *
+   * @param t the throwable to convert
+   * @return the resulting IOException
+   */
+  public static IOException unwrapIOException(Throwable t) {
+    final IOException e;
+    if (t instanceof StatusRuntimeException) {
+      e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
+    } else {
+      e = RaftUtils.asIOException(t);
+    }
+    return e;
+  }
+
+  /**
+   * Creates an instance of {@code cls} through its public {@code (String)}
+   * constructor and sets {@code from} as its cause.
+   *
+   * @throws Exception if the constructor is missing or fails, or if the cause
+   *         cannot be initialized
+   */
+  private static Exception instantiateException(Class<? extends Exception> cls,
+      String message, Exception from) throws Exception {
+    Constructor<? extends Exception> cn = cls.getConstructor(String.class);
+    cn.setAccessible(true);
+    Exception ex = cn.newInstance(message);
+    ex.initCause(from);
+    return ex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
new file mode 100644
index 0000000..cb05b33
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.RaftGrpcConfigKeys;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.RaftUtils;
+import org.apache.ratis.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.ratis.client.impl.ClientProtoUtils.*;
+import static 
org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT;
+import static 
org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY;
+
+/**
+ * Streams client append requests to the raft leader over a gRPC stream.
+ *
+ * Requests handed to {@link #write} wait in {@code dataQueue}; the
+ * {@link Sender} thread moves them to the current leader proxy and into
+ * {@code ackQueue}, where they stay until a successful reply arrives. On a
+ * NotLeaderException or a stream error the un-acked requests are put back in
+ * front of {@code dataQueue} and re-sent to another peer.
+ *
+ * The queues and the running state are guarded by the monitor of this object;
+ * {@code wait()/notifyAll()} on it is used for all hand-offs.
+ */
+public class AppendStreamer implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(AppendStreamer.class);
+
+  /**
+   * RUNNING and LOOK_FOR_LEADER both count as "running" (see isRunning()).
+   * CLOSED is set by close(), ERROR once the retry budget is used up.
+   */
+  enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR}
+
+  /** Remembers the last error per peer and counts how often an error occurred. */
+  private static class ExceptionAndRetry {
+    private final Map<String, IOException> exceptionMap = new HashMap<>();
+    private final AtomicInteger retryTimes = new AtomicInteger(0);
+    private final int maxRetryTimes;
+    /** Pause before re-sending to a new leader, in milliseconds. */
+    private final long retryInterval;
+
+    ExceptionAndRetry(RaftProperties prop) {
+      maxRetryTimes = prop.getInt(
+          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY,
+          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT);
+      retryInterval = prop.getTimeDuration(
+          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY,
+          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT,
+          TimeUnit.MILLISECONDS);
+    }
+
+    /** Records {@code e} as the latest error of {@code peer} and counts it. */
+    void addException(String peer, IOException e) {
+      exceptionMap.put(peer, e);
+      retryTimes.incrementAndGet();
+    }
+
+    /** @return one IOException whose message lists the recorded errors. */
+    IOException getCombinedException() {
+      return new IOException("Exceptions: " + exceptionMap);
+    }
+
+    /** @return true while the error count has not exceeded maxRetryTimes. */
+    boolean shouldRetry() {
+      return retryTimes.get() <= maxRetryTimes;
+    }
+  }
+
+  /** Requests accepted by write() that have not been sent yet. */
+  private final Deque<RaftClientRequestProto> dataQueue;
+  /** Requests that were sent and still wait for a successful reply. */
+  private final Deque<RaftClientRequestProto> ackQueue;
+  /** Upper bound applied to the size of each of the two queues. */
+  private final int maxPendingNum;
+
+  private final PeerProxyMap<RaftClientProtocolProxy> proxyMap;
+  private final Map<String, RaftPeer> peers;
+  private String leaderId;
+  private volatile RaftClientProtocolProxy leaderProxy;
+  private final String clientId;
+
+  private volatile RunningState running = RunningState.RUNNING;
+  private final ExceptionAndRetry exceptionAndRetry;
+  private final Sender senderThread;
+
+  /**
+   * Creates the streamer, selects the initial leader proxy and starts the
+   * sender thread.
+   *
+   * @param prop     source of the queue limit and the retry settings
+   * @param peers    the initially known peers
+   * @param leaderId the peer to try first; if null the first known peer is used
+   * @param clientId id placed into every request and used in toString()
+   */
+  AppendStreamer(RaftProperties prop, Collection<RaftPeer> peers,
+      String leaderId, String clientId) {
+    this.clientId = clientId;
+    maxPendingNum = prop.getInt(
+        RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY,
+        RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT);
+    dataQueue = new ConcurrentLinkedDeque<>();
+    ackQueue = new ConcurrentLinkedDeque<>();
+    exceptionAndRetry = new ExceptionAndRetry(prop);
+
+    this.peers = peers.stream().collect(
+        Collectors.toMap(RaftPeer::getId, Function.identity()));
+    proxyMap = new PeerProxyMap<>(
+        raftPeer -> new RaftClientProtocolProxy(raftPeer, ResponseHandler::new));
+    proxyMap.addPeers(peers);
+    refreshLeaderProxy(leaderId, null);
+
+    senderThread = new Sender();
+    senderThread.setName(this.toString() + "-sender");
+    senderThread.start();
+  }
+
+  /**
+   * Chooses the next leader and switches {@code leaderProxy} to it.
+   *
+   * @param suggested the leader to use; if null, the peer following
+   *                  {@code oldLeader} is taken, or the first known peer when
+   *                  {@code oldLeader} is null as well
+   * @param oldLeader the previous leader, may be null
+   */
+  private synchronized void refreshLeaderProxy(String suggested,
+      String oldLeader) {
+    if (suggested != null) {
+      leaderId = suggested;
+    } else {
+      if (oldLeader == null) {
+        leaderId = peers.keySet().iterator().next();
+      } else {
+        leaderId = StringUtils.next(oldLeader, peers.keySet());
+      }
+    }
+    LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,
+        oldLeader, leaderId, suggested);
+    // end the stream to the previous leader before switching
+    if (leaderProxy != null) {
+      leaderProxy.closeCurrentSession();
+    }
+    try {
+      leaderProxy = proxyMap.getProxy(leaderId);
+    } catch (IOException e) {
+      LOG.error("Should not hit IOException here", e);
+      refreshLeader(null, leaderId);
+    }
+  }
+
+  private boolean isRunning() {
+    return running == RunningState.RUNNING ||
+        running == RunningState.LOOK_FOR_LEADER;
+  }
+
+  /** @throws IOException if the streamer is in state CLOSED or ERROR */
+  private void checkState() throws IOException {
+    if (!isRunning()) {
+      throwException("The AppendStreamer has been closed");
+    }
+  }
+
+  /**
+   * Queues one piece of content for sending. Blocks while {@code dataQueue}
+   * already holds {@code maxPendingNum} requests.
+   *
+   * @param content the payload of the request
+   * @param seqNum  the sequence number put into the request
+   * @throws IOException if the streamer is, or becomes, closed or failed
+   */
+  synchronized void write(ByteString content, long seqNum)
+      throws IOException {
+    checkState();
+    while (isRunning() && dataQueue.size() >= maxPendingNum) {
+      try {
+        wait();
+      } catch (InterruptedException ignored) {
+        // the loop condition is re-evaluated; an interrupt alone does not
+        // abort the write
+      }
+    }
+    if (isRunning()) {
+      // wrap the current buffer into a RaftClientRequestProto
+      final RaftClientRequestProto request = genRaftClientRequestProto(
+          clientId, leaderId, seqNum, content, false);
+      dataQueue.offer(request);
+      this.notifyAll();
+    } else {
+      throwException(this + " got closed.");
+    }
+  }
+
+  /**
+   * Blocks until both queues are empty, i.e. everything written so far has
+   * been sent and acknowledged.
+   *
+   * @throws IOException if the streamer stops running while requests are
+   *         still pending, or was not running on entry
+   */
+  synchronized void flush() throws IOException {
+    checkState();
+    if (dataQueue.isEmpty() && ackQueue.isEmpty()) {
+      return;
+    }
+    // wait for the pending Q to become empty
+    while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
+      try {
+        wait();
+      } catch (InterruptedException ignored) {
+        // keep waiting; the loop condition decides when to stop
+      }
+    }
+    if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
+      throwException(this + " got closed before finishing flush");
+    }
+  }
+
+  /**
+   * Flushes the pending requests, stops the sender thread and closes all
+   * proxies. Does nothing if the streamer is no longer running.
+   *
+   * @throws IOException if the final flush fails or closing the proxies fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (!isRunning()) {
+      return;
+    }
+    flush();
+
+    running = RunningState.CLOSED;
+    // wakes the sender if it is blocked in wait(); it then sees CLOSED and ends
+    senderThread.interrupt();
+    try {
+      senderThread.join();
+    } catch (InterruptedException ignored) {
+      // do not lose the interrupt of the closing thread; the proxies are
+      // still closed below
+      Thread.currentThread().interrupt();
+    }
+    proxyMap.close();
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName() + "-" + clientId;
+  }
+
+  /** Background thread that moves requests from dataQueue to the leader. */
+  private class Sender extends Daemon {
+    @Override
+    public void run() {
+      while (isRunning()) {
+
+        synchronized (AppendStreamer.this) {
+          while (isRunning() && shouldWait()) {
+            try {
+              AppendStreamer.this.wait();
+            } catch (InterruptedException ignored) {
+              // close() interrupts this thread; the loop condition then
+              // notices that the streamer is no longer running
+            }
+          }
+          if (running == RunningState.RUNNING) {
+            RaftClientRequestProto next = dataQueue.poll();
+            leaderProxy.onNext(next);
+            ackQueue.offer(next);
+          }
+        }
+      }
+    }
+
+    private boolean shouldWait() {
+      // the sender should wait if any of the following is true
+      // 1) there is no data to send
+      // 2) there are too many outstanding pending requests
+      // 3) Error/NotLeaderException just happened, we're still waiting for
+      //    the first response to confirm the new leader
+      return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum ||
+          running == RunningState.LOOK_FOR_LEADER;
+    }
+  }
+
+  /** the response handler for stream RPC */
+  private class ResponseHandler implements
+      RaftClientProtocolProxy.CloseableStreamObserver {
+    private final String targetId;
+    // once handled the first NotLeaderException or Error, the handler should
+    // be inactive and should not make any further action.
+    private volatile boolean active = true;
+
+    ResponseHandler(RaftPeer target) {
+      targetId = target.getId();
+    }
+
+    @Override
+    public String toString() {
+      return AppendStreamer.this + "-ResponseHandler-" + targetId;
+    }
+
+    /**
+     * Handles one reply. A successful reply must match the head of ackQueue
+     * and removes it; a NotLeaderException reply triggers a leader switch.
+     */
+    @Override
+    public void onNext(RaftClientReplyProto reply) {
+      if (!active) {
+        return;
+      }
+      synchronized (AppendStreamer.this) {
+        RaftClientRequestProto pending = Preconditions.checkNotNull(
+            ackQueue.peek());
+        if (reply.getRpcReply().getSuccess()) {
+          Preconditions.checkState(pending.getRpcRequest().getSeqNum() ==
+              reply.getRpcReply().getSeqNum());
+          ackQueue.poll();
+          LOG.trace("{} received success ack for request {}", this,
+              pending.getRpcRequest());
+          // we've identified the correct leader
+          if (running == RunningState.LOOK_FOR_LEADER) {
+            running = RunningState.RUNNING;
+          }
+        } else {
+          // this may be a NotLeaderException
+          RaftClientReply r = toRaftClientReply(reply);
+          if (r.isNotLeader()) {
+            LOG.debug("{} received a NotLeaderException from {}", this,
+                r.getReplierId());
+            handleNotLeader(r.getNotLeaderException(), targetId);
+          }
+        }
+        AppendStreamer.this.notifyAll();
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      if (active) {
+        synchronized (AppendStreamer.this) {
+          handleError(t, this);
+          AppendStreamer.this.notifyAll();
+        }
+      }
+    }
+
+    @Override
+    public void onCompleted() {
+      LOG.info("{} onCompleted, pending requests #: {}", this,
+          ackQueue.size());
+    }
+
+    @Override // called by handleError and handleNotLeader
+    public void close() throws IOException {
+      active = false;
+    }
+  }
+
+  /**
+   * Throws the combined recorded errors when in state ERROR, otherwise a plain
+   * IOException with the given message.
+   */
+  private void throwException(String msg) throws IOException {
+    if (running == RunningState.ERROR) {
+      throw exceptionAndRetry.getCombinedException();
+    } else {
+      throw new IOException(msg);
+    }
+  }
+
+  /**
+   * Reacts to a NotLeaderException: merges the peers it reports and switches
+   * to the suggested leader. Must be called while holding this object's lock.
+   *
+   * @param nle       the exception sent by the peer that is not the leader
+   * @param oldLeader the id of the peer that sent it
+   */
+  private void handleNotLeader(NotLeaderException nle,
+      String oldLeader) {
+    Preconditions.checkState(Thread.holdsLock(AppendStreamer.this));
+    // handle NotLeaderException: refresh leader and RaftConfiguration
+    refreshPeers(nle.getPeers());
+
+    // The replier may not know the current leader. In that case pass null so
+    // that refreshLeaderProxy() moves on to the peer after oldLeader instead
+    // of failing here with a NullPointerException.
+    // NOTE(review): assumes getSuggestedLeader() can return null - confirm.
+    final RaftPeer suggested = nle.getSuggestedLeader();
+    refreshLeader(suggested != null ? suggested.getId() : null, oldLeader);
+  }
+
+  /**
+   * Reacts to a stream error: records it, marks the current session as failed
+   * and either retries with the next peer or, once the retry budget is used
+   * up, moves to state ERROR. Must be called while holding this object's lock.
+   */
+  private void handleError(Throwable t, ResponseHandler handler) {
+    Preconditions.checkState(Thread.holdsLock(AppendStreamer.this));
+    final IOException e = RaftGrpcUtil.unwrapIOException(t);
+
+    exceptionAndRetry.addException(handler.targetId, e);
+    LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.",
+        handler, e, exceptionAndRetry.retryTimes.get(),
+        exceptionAndRetry.maxRetryTimes);
+
+    // a failed session must not be completed normally when it gets closed
+    leaderProxy.onError();
+    if (exceptionAndRetry.shouldRetry()) {
+      refreshLeader(null, leaderId);
+    } else {
+      running = RunningState.ERROR;
+    }
+  }
+
+  /**
+   * Switches to a new leader and probes it with the oldest pending request.
+   * The state stays LOOK_FOR_LEADER until a successful reply arrives, which
+   * keeps the sender thread from sending further requests meanwhile.
+   */
+  private void refreshLeader(String suggestedLeader, String oldLeader) {
+    running = RunningState.LOOK_FOR_LEADER;
+    refreshLeaderProxy(suggestedLeader, oldLeader);
+    reQueuePendingRequests(leaderId);
+
+    // NOTE(review): this fails if nothing at all is pending at this point
+    final RaftClientRequestProto request = Preconditions.checkNotNull(
+        dataQueue.poll());
+    ackQueue.offer(request);
+    try {
+      Thread.sleep(exceptionAndRetry.retryInterval);
+    } catch (InterruptedException ignored) {
+      // send right away if the pause is cut short
+    }
+    leaderProxy.onNext(request);
+  }
+
+  /**
+   * Moves every un-acked request back to the front of dataQueue, rebuilt with
+   * {@code newLeader} in its RPC header and in the original order.
+   */
+  private void reQueuePendingRequests(String newLeader) {
+    if (isRunning()) {
+      // resend all the pending requests
+      while (!ackQueue.isEmpty()) {
+        // newest first, each pushed to the front, so the order is preserved
+        RaftClientRequestProto oldRequest = ackQueue.pollLast();
+        RaftRpcRequestProto r = oldRequest.getRpcRequest();
+        RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder()
+            .setMessage(oldRequest.getMessage())
+            .setReadOnly(oldRequest.getReadOnly())
+            .setRpcRequest(toRaftRpcRequestProtoBuilder(
+                clientId, newLeader, r.getSeqNum()))
+            .build();
+        dataQueue.offerFirst(newRequest);
+      }
+    }
+  }
+
+  /** Adds the peers that are not known yet; does nothing for null or empty. */
+  private void refreshPeers(RaftPeer[] newPeers) {
+    if (newPeers != null && newPeers.length > 0) {
+      // we only add new peers, we do not remove any peer even if it no longer
+      // belongs to the current raft conf
+      Arrays.stream(newPeers).forEach(peer -> {
+        peers.putIfAbsent(peer.getId(), peer);
+        proxyMap.putIfAbsent(peer);
+      });
+
+      LOG.debug("refreshed peers: {}", peers);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
new file mode 100644
index 0000000..74fb253
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
+import 
org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
+import 
org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.RaftPeer;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Holds one plaintext gRPC channel to a single peer together with a blocking
+ * and an asynchronous stub of the client protocol service created on it.
+ */
+public class RaftClientProtocolClient implements Closeable {
+  private final RaftPeer target;
+  private final ManagedChannel channel;
+  private final RaftClientProtocolServiceBlockingStub blockingStub;
+  private final RaftClientProtocolServiceStub asyncStub;
+
+  /**
+   * @param target the peer to talk to; its address is used as channel target
+   */
+  public RaftClientProtocolClient(RaftPeer target) {
+    this.target = target;
+    this.channel = ManagedChannelBuilder
+        .forTarget(target.getAddress())
+        .usePlaintext(true)
+        .build();
+    this.blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
+    this.asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
+  }
+
+  /** @return the peer passed to the constructor. */
+  public RaftPeer getTarget() {
+    return target;
+  }
+
+  /**
+   * Sends a setConfiguration request through the blocking stub.
+   *
+   * @param request the request to send
+   * @return the reply of the peer
+   * @throws IOException the unwrapped form of a gRPC status exception
+   */
+  public RaftClientReplyProto setConfiguration(
+      SetConfigurationRequestProto request) throws IOException {
+    final RaftClientReplyProto reply;
+    try {
+      reply = blockingStub.setConfiguration(request);
+    } catch (StatusRuntimeException sre) {
+      // unwrap StatusRuntimeException
+      throw RaftGrpcUtil.unwrapException(sre);
+    }
+    return reply;
+  }
+
+  /**
+   * Opens an append stream on the asynchronous stub.
+   *
+   * @param responseHandler receives the replies of the peer
+   * @return the observer through which requests are sent
+   */
+  StreamObserver<RaftClientRequestProto> append(
+      StreamObserver<RaftClientReplyProto> responseHandler) {
+    final StreamObserver<RaftClientRequestProto> requestObserver =
+        asyncStub.append(responseHandler);
+    return requestObserver;
+  }
+
+  /** Shuts the channel down immediately. */
+  @Override
+  public void close() {
+    channel.shutdownNow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
new file mode 100644
index 0000000..6892c71
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.RaftPeer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Wraps a {@link RaftClientProtocolClient} and manages at most one open append
+ * stream (an {@link RpcSession}) to its peer. The session is created lazily on
+ * the first request and dropped by {@link #closeCurrentSession()}.
+ */
+public class RaftClientProtocolProxy implements Closeable {
+  private final RaftClientProtocolClient proxy;
+  /** Creates the response handler of each newly opened session. */
+  private final Function<RaftPeer, CloseableStreamObserver>
+      responseHandlerCreation;
+  private RpcSession currentSession;
+
+  /**
+   * @param target                  the peer this proxy talks to
+   * @param responseHandlerCreation factory for the per-session reply handler
+   */
+  public RaftClientProtocolProxy(RaftPeer target,
+      Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation) {
+    this.proxy = new RaftClientProtocolClient(target);
+    this.responseHandlerCreation = responseHandlerCreation;
+  }
+
+  /** A reply observer that can additionally be closed. */
+  public interface CloseableStreamObserver
+      extends StreamObserver<RaftClientReplyProto>, Closeable {
+  }
+
+  /** Sends a request on the current session, opening one first if needed. */
+  public void onNext(RaftClientRequestProto request) {
+    if (currentSession == null) {
+      final CloseableStreamObserver handler =
+          responseHandlerCreation.apply(proxy.getTarget());
+      currentSession = new RpcSession(handler);
+    }
+    currentSession.requestObserver.onNext(request);
+  }
+
+  /** Marks the current session, if any, as failed. */
+  public void onError() {
+    if (currentSession == null) {
+      return;
+    }
+    currentSession.onError();
+  }
+
+  /** Closes and forgets the current session; no-op when there is none. */
+  public void closeCurrentSession() {
+    if (currentSession == null) {
+      return;
+    }
+    currentSession.close();
+    currentSession = null;
+  }
+
+  /** Closes the current session and then the underlying client. */
+  @Override
+  public void close() throws IOException {
+    closeCurrentSession();
+    proxy.close();
+  }
+
+  @Override
+  public String toString() {
+    return "ProxyTo:" + proxy.getTarget();
+  }
+
+  /** One append stream: the request observer plus its reply handler. */
+  class RpcSession implements Closeable {
+    private final StreamObserver<RaftClientRequestProto> requestObserver;
+    private final CloseableStreamObserver responseHandler;
+    /** Set by onError(); a failed stream is not completed on close. */
+    private boolean hasError = false;
+
+    RpcSession(CloseableStreamObserver responseHandler) {
+      this.responseHandler = responseHandler;
+      this.requestObserver = proxy.append(responseHandler);
+    }
+
+    void onError() {
+      hasError = true;
+    }
+
+    /**
+     * Completes the request stream unless it has failed, then closes the
+     * reply handler. Failures of either step are ignored.
+     */
+    @Override
+    public void close() {
+      completeQuietly();
+      try {
+        responseHandler.close();
+      } catch (IOException ignored) {
+      }
+    }
+
+    /** Calls onCompleted() on a healthy stream, swallowing any exception. */
+    private void completeQuietly() {
+      if (hasError) {
+        return;
+      }
+      try {
+        requestObserver.onCompleted();
+      } catch (Exception ignored) {
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
new file mode 100644
index 0000000..bb212e1
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import 
org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+public class RaftClientProtocolService extends 
RaftClientProtocolServiceImplBase {
+  static final Logger LOG = 
LoggerFactory.getLogger(RaftClientProtocolService.class);
+
+  /**
+   * An append request identified by its sequence number, together with the
+   * reply once it is available. Ordered by sequence number.
+   */
+  private static class PendingAppend implements Comparable<PendingAppend> {
+    private final long seqNum;
+    private volatile RaftClientReply reply;
+
+    PendingAppend(long seqNum) {
+      this.seqNum = seqNum;
+    }
+
+    /** @return true for the COMPLETED sentinel or once a reply has been set. */
+    boolean isReady() {
+      if (this == COMPLETED) {
+        return true;
+      }
+      return reply != null;
+    }
+
+    void setReply(RaftClientReply reply) {
+      this.reply = reply;
+    }
+
+    @Override
+    public int compareTo(PendingAppend p) {
+      return Long.compare(seqNum, p.seqNum);
+    }
+
+    @Override
+    public String toString() {
+      final RaftClientReply current = reply;
+      return seqNum + ", reply:" + String.valueOf(current);
+    }
+  }
+
+  /** Sentinel that always reports itself as ready; it carries Long.MAX_VALUE. */
+  private static final PendingAppend COMPLETED =
+      new PendingAppend(Long.MAX_VALUE);
+
+  private final String id;
+  /** Target of the asynchronous calls made by this service. */
+  private final RaftClientAsynchronousProtocol client;
+
+  /**
+   * @param id     stored in the {@code id} field
+   * @param client receives the requests arriving at this service
+   */
+  public RaftClientProtocolService(String id,
+      RaftClientAsynchronousProtocol client) {
+    this.client = client;
+    this.id = id;
+  }
+
+  /**
+   * Passes a setConfiguration request to the client protocol and reports the
+   * outcome on {@code responseObserver}: the reply followed by completion on
+   * success, a wrapped exception otherwise.
+   *
+   * @param request          the request received from the caller
+   * @param responseObserver receives the reply or the error
+   */
+  @Override
+  public void setConfiguration(SetConfigurationRequestProto request,
+      StreamObserver<RaftClientReplyProto> responseObserver) {
+    try {
+      final CompletableFuture<RaftClientReply> pending =
+          client.setConfigurationAsync(
+              ClientProtoUtils.toSetConfigurationRequest(request));
+      pending.whenCompleteAsync((reply, failure) -> {
+        if (failure == null) {
+          responseObserver.onNext(
+              ClientProtoUtils.toRaftClientReplyProto(reply));
+          responseObserver.onCompleted();
+          return;
+        }
+        responseObserver.onError(RaftGrpcUtil.wrapException(failure));
+      });
+    } catch (Exception e) {
+      // converting or submitting the request failed synchronously
+      responseObserver.onError(RaftGrpcUtil.wrapException(e));
+    }
+  }
+
+  /**
+   * Starts an append stream.
+   *
+   * @param responseObserver receives the replies for the streamed requests
+   * @return a new observer that consumes the requests of this stream
+   */
+  @Override
+  public StreamObserver<RaftClientRequestProto> append(
+      StreamObserver<RaftClientReplyProto> responseObserver) {
+    final StreamObserver<RaftClientRequestProto> requestObserver =
+        new AppendRequestStreamObserver(responseObserver);
+    return requestObserver;
+  }
+
+  /**
+   * Receives the requests of one client append stream and sends the replies
+   * back in request order.
+   *
+   * Every request is recorded in {@link #pendingList} before it is submitted.
+   * Replies may complete in any order; a reply is only sent once it reaches the
+   * head of the list, together with every ready entry directly behind it.
+   *
+   * All calls on {@link #responseObserver} are made while holding the
+   * {@code pendingList} monitor, because replies and errors are produced by
+   * asynchronous callbacks and the observer must not be used concurrently.
+   */
+  private class AppendRequestStreamObserver implements
+      StreamObserver<RaftClientRequestProto> {
+    private final List<PendingAppend> pendingList = new LinkedList<>();
+    private final StreamObserver<RaftClientReplyProto> responseObserver;
+
+    AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
+      this.responseObserver = ro;
+    }
+
+    /**
+     * Records the request as pending and submits it asynchronously. Any
+     * failure, synchronous or asynchronous, is reported to the client.
+     */
+    @Override
+    public void onNext(RaftClientRequestProto request) {
+      try {
+        PendingAppend p = new PendingAppend(
+            request.getRpcRequest().getSeqNum());
+        synchronized (pendingList) {
+          pendingList.add(p);
+        }
+
+        CompletableFuture<RaftClientReply> future =
+            client.submitClientRequestAsync(
+                ClientProtoUtils.toRaftClientRequest(request));
+        future.whenCompleteAsync((reply, exception) -> {
+          if (exception != null) {
+            // TODO: the exception may be from either raft or state machine.
+            // Currently we skip all the following responses when getting an
+            // exception from the state machine.
+            sendError(exception);
+          } else {
+            handleReply(reply);
+          }
+        });
+      } catch (Throwable e) {
+        LOG.info("{} got exception when handling client append request {}: {}",
+            id, request.getRpcRequest(), e);
+        sendError(e);
+      }
+    }
+
+    /**
+     * Stores the reply on its pending entry and, if that entry is the head of
+     * the list, sends it along with all ready entries that follow.
+     */
+    private void handleReply(RaftClientReply reply) {
+      final long replySeq = reply.getSeqNum();
+      synchronized (pendingList) {
+        Preconditions.checkState(!pendingList.isEmpty(),
+            "PendingList is empty when handling onNext for seqNum %s",
+            replySeq);
+        final long headSeqNum = pendingList.get(0).seqNum;
+        // we assume the seqNum is consecutive for a stream RPC call
+        final PendingAppend pendingForReply = pendingList.get(
+            (int) (replySeq - headSeqNum));
+        Preconditions.checkState(pendingForReply != null &&
+                pendingForReply.seqNum == replySeq,
+            "pending for reply is: %s, the pending list: %s",
+            pendingForReply, pendingList);
+        pendingForReply.setReply(reply);
+
+        if (headSeqNum == replySeq) {
+          Collection<PendingAppend> readySet = new ArrayList<>();
+          // if this is head, we send back all the ready responses
+          Iterator<PendingAppend> iter = pendingList.iterator();
+          PendingAppend pending;
+          while (iter.hasNext() && ((pending = iter.next()).isReady())) {
+            readySet.add(pending);
+            iter.remove();
+          }
+          sendReadyReplies(readySet);
+        }
+      }
+    }
+
+    /**
+     * Reports a failure to the client. The pendingList monitor is held so that
+     * this call cannot interleave with replies being sent from another thread.
+     */
+    private void sendError(Throwable t) {
+      synchronized (pendingList) {
+        responseObserver.onError(RaftGrpcUtil.wrapException(t));
+      }
+    }
+
+    /**
+     * Sends the given ready entries in iteration order; the COMPLETED marker
+     * completes the response stream. Callers hold the pendingList monitor.
+     */
+    private void sendReadyReplies(Collection<PendingAppend> readySet) {
+      readySet.forEach(ready -> {
+        Preconditions.checkState(ready.isReady());
+        if (ready == COMPLETED) {
+          responseObserver.onCompleted();
+        } else {
+          responseObserver.onNext(
+              ClientProtoUtils.toRaftClientReplyProto(ready.reply));
+        }
+      });
+    }
+
+    /** Called when the client side fails: drops every pending entry. */
+    @Override
+    public void onError(Throwable t) {
+      // for now we just log a msg
+      LOG.warn("{} onError: client Append cancelled", id, t);
+      synchronized (pendingList) {
+        pendingList.clear();
+      }
+    }
+
+    /**
+     * Called when the client finishes sending. Completes the response stream
+     * right away if nothing is outstanding; otherwise queues the COMPLETED
+     * marker so completion is sent after the remaining replies.
+     */
+    @Override
+    public void onCompleted() {
+      synchronized (pendingList) {
+        if (pendingList.isEmpty()) {
+          responseObserver.onCompleted();
+        } else {
+          pendingList.add(COMPLETED);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
new file mode 100644
index 0000000..6d9e11f
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.client.RaftClientRequestSender;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.util.PeerProxyMap;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.client.impl.ClientProtoUtils.*;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A {@link RaftClientRequestSender} that talks to the servers over gRPC.
+ *
+ * Set-configuration requests use the proxy's setConfiguration call. Every other
+ * request is sent as an append stream carrying a single message, and the
+ * calling thread blocks until the reply (or a failure) arrives.
+ */
+public class RaftClientSenderWithGrpc implements RaftClientRequestSender {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RaftClientSenderWithGrpc.class);
+
+  private final PeerProxyMap<RaftClientProtocolClient> proxies
+      = new PeerProxyMap<>(RaftClientProtocolClient::new);
+
+  /**
+   * @param peers the servers this sender may be asked to contact
+   */
+  public RaftClientSenderWithGrpc(Collection<RaftPeer> peers) {
+    addServers(peers);
+  }
+
+  /**
+   * Sends the request to the server named by its replier id and waits for the
+   * reply.
+   *
+   * @param request the request to send
+   * @return the reply converted from its protobuf form
+   * @throws InterruptedIOException if the calling thread is interrupted while
+   *         waiting; the thread's interrupt status is restored
+   * @throws IOException if the call fails or the stream ends without a reply
+   */
+  @Override
+  public RaftClientReply sendRequest(RaftClientRequest request)
+      throws IOException {
+    final String serverId = request.getReplierId();
+    final RaftClientProtocolClient proxy = proxies.getProxy(serverId);
+    if (request instanceof SetConfigurationRequest) {
+      SetConfigurationRequestProto setConf =
+          toSetConfigurationRequestProto((SetConfigurationRequest) request);
+      return toRaftClientReply(proxy.setConfiguration(setConf));
+    } else {
+      RaftClientRequestProto requestProto = toRaftClientRequestProto(request);
+      CompletableFuture<RaftClientReplyProto> replyFuture =
+          new CompletableFuture<>();
+      final StreamObserver<RaftClientRequestProto> requestObserver =
+          proxy.append(new StreamObserver<RaftClientReplyProto>() {
+            @Override
+            public void onNext(RaftClientReplyProto value) {
+              replyFuture.complete(value);
+            }
+
+            @Override
+            public void onError(Throwable t) {
+              // This implementation is used as RaftClientRequestSender. Retry
+              // logic on Exception is in RaftClient.
+              final IOException e;
+              if (t instanceof StatusRuntimeException) {
+                e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
+              } else {
+                e = RaftUtils.asIOException(t);
+              }
+              replyFuture.completeExceptionally(e);
+            }
+
+            @Override
+            public void onCompleted() {
+              // The stream ended without delivering any reply.
+              if (!replyFuture.isDone()) {
+                replyFuture.completeExceptionally(
+                    new IOException("No reply for request " + request));
+              }
+            }
+          });
+      // A single request per stream: send it and close the request side.
+      requestObserver.onNext(requestProto);
+      requestObserver.onCompleted();
+
+      // TODO: timeout support
+      try {
+        return toRaftClientReply(replyFuture.get());
+      } catch (InterruptedException e) {
+        // Restore the interrupt status so that code further up the stack can
+        // still observe the interruption, and keep the original cause.
+        Thread.currentThread().interrupt();
+        final InterruptedIOException iioe = new InterruptedIOException(
+            "Interrupted while waiting for response of request " + request);
+        iioe.initCause(e);
+        throw iioe;
+      } catch (ExecutionException e) {
+        throw RaftUtils.toIOException(e);
+      }
+    }
+  }
+
+  /** Registers additional servers with the proxy map. */
+  @Override
+  public void addServers(Iterable<RaftPeer> servers) {
+    proxies.addPeers(servers);
+  }
+
+  /** Closes the proxy map. */
+  @Override
+  public void close() throws IOException {
+    proxies.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
new file mode 100644
index 0000000..a3905f8
--- /dev/null
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import static 
org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT;
+import static 
org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.ProtoUtils;
+
+/**
+ * An {@link OutputStream} that collects written bytes in a fixed-size buffer
+ * and hands each filled (or explicitly flushed) buffer to an
+ * {@link AppendStreamer} together with an increasing sequence number.
+ */
+public class RaftOutputStream extends OutputStream {
+  /** internal buffer */
+  private final byte[] buf;
+  /** number of valid bytes currently held in {@link #buf} */
+  private int count;
+  /** sequence number passed to the streamer with the next chunk */
+  private long seqNum = 0;
+  private final String clientId;
+  private final AppendStreamer streamer;
+
+  private boolean closed = false;
+
+  /**
+   * @param prop properties from which the buffer size is read
+   * @param clientId id of the client, also used in {@link #toString()}
+   * @param peers passed through to the {@link AppendStreamer}
+   * @param leaderId passed through to the {@link AppendStreamer}
+   * @throws IllegalArgumentException if the configured buffer size is not
+   *         positive
+   */
+  public RaftOutputStream(RaftProperties prop, String clientId,
+      Collection<RaftPeer> peers, String leaderId) {
+    final int bufferSize = prop.getInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY,
+        RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT);
+    // An empty buffer could never fill up: write(int) would overflow it and
+    // write(byte[], int, int) would loop without making progress.
+    if (bufferSize <= 0) {
+      throw new IllegalArgumentException(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY
+          + " must be positive but is " + bufferSize);
+    }
+    buf = new byte[bufferSize];
+    count = 0;
+    this.clientId = clientId;
+    streamer = new AppendStreamer(prop, peers, leaderId, clientId);
+  }
+
+  /** Buffers a single byte, sending the buffer on if it becomes full. */
+  @Override
+  public void write(int b) throws IOException {
+    checkClosed();
+    buf[count++] = (byte)b;
+    flushIfNecessary();
+  }
+
+  /** Sends the buffer to the streamer once it is completely filled. */
+  private void flushIfNecessary() throws IOException {
+    if(count == buf.length) {
+      flushToStreamer();
+    }
+  }
+
+  /**
+   * Copies the given range into the buffer, sending the buffer on each time it
+   * fills up.
+   *
+   * @throws ArrayIndexOutOfBoundsException if the range is not within b
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkClosed();
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    int total = 0;
+    while (total < len) {
+      int toWrite = Math.min(len - total, buf.length - count);
+      System.arraycopy(b, off + total, buf, count, toWrite);
+      count += toWrite;
+      total += toWrite;
+      flushIfNecessary();
+    }
+  }
+
+  /** Hands the buffered bytes, if any, to the streamer and resets the buffer. */
+  private void flushToStreamer() throws IOException {
+    if (count > 0) {
+      streamer.write(ProtoUtils.toByteString(buf, 0, count), seqNum++);
+      count = 0;
+    }
+  }
+
+  /** Sends any buffered bytes to the streamer and then flushes the streamer. */
+  @Override
+  public void flush() throws IOException {
+    checkClosed();
+    flushToStreamer();
+    streamer.flush();
+  }
+
+  /**
+   * Sends any buffered bytes and closes the streamer. Calling this method on
+   * an already closed stream has no effect. The streamer is closed even when
+   * sending the remaining bytes fails.
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    // Mark closed up front so a failed close is not retried by later calls.
+    this.closed = true;
+    try {
+      flushToStreamer();
+    } finally {
+      streamer.close(); // streamer will flush
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "RaftOutputStream-" + clientId;
+  }
+
+  /** Rejects operations on a stream that has already been closed. */
+  private void checkClosed() throws IOException {
+    if (closed) {
+      throw new IOException(this.toString() + " was closed.");
+    }
+  }
+}

Reply via email to