http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java new file mode 100644 index 0000000..37b4064 --- /dev/null +++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis; + +import org.apache.log4j.Level; +import org.apache.ratis.MiniRaftCluster; +import org.apache.ratis.RaftTestUtil; +import org.apache.ratis.RaftTestUtil.SimpleMessage; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.examples.RaftExamplesTestUtil; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.simulation.RequestHandler; +import org.apache.ratis.server.storage.RaftLog; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.RaftUtils; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +/** + * Test restarting raft peers. 
+ */
+@RunWith(Parameterized.class)
+public class TestRestartRaftPeer {
+  static final Logger LOG = LoggerFactory.getLogger(TestRestartRaftPeer.class);
+  static {
+    // Turn on DEBUG output for the server, log, request handler and client loggers.
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  /**
+   * Builds the test parameters: clusters of 3 peers obtained from
+   * {@link RaftExamplesTestUtil}, configured with
+   * {@link SimpleStateMachine4Testing} and a log segment max size of
+   * {@code 1024 * 8}.
+   *
+   * @return one single-element row per cluster
+   * @throws IOException if a cluster cannot be created
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() throws IOException {
+    RaftProperties prop = new RaftProperties();
+    prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+    prop.setInt(RaftServerConfigKeys.RAFT_LOG_SEGMENT_MAX_SIZE_KEY, 1024 * 8);
+    return RaftExamplesTestUtil.getMiniRaftClusters(prop, 3);
+  }
+
+  /** The cluster under test, injected by the parameterized runner. */
+  @Parameterized.Parameter
+  public MiniRaftCluster cluster;
+
+  /** Fails any test that runs longer than 60 seconds. */
+  @Rule
+  public Timeout globalTimeout = new Timeout(60 * 1000);
+
+  /**
+   * Writes 10 messages, restarts one follower, writes 10 more messages and
+   * then checks that the restarted follower reaches applied index 20 and,
+   * after a second restart, still has a last log entry index of at least 20.
+   */
+  @Test
+  public void restartFollower() throws Exception {
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final String leaderId = cluster.getLeader().getId();
+      final String followerId;
+
+      // try-with-resources closes the client even when a send or an
+      // assertion fails part-way through.
+      try (final RaftClient client = cluster.createClient("client", leaderId)) {
+        // write some messages
+        final byte[] content = new byte[1024];
+        Arrays.fill(content, (byte) 1);
+        final SimpleMessage message = new SimpleMessage(new String(content));
+        for (int i = 0; i < 10; i++) {
+          Assert.assertTrue(client.send(message).isSuccess());
+        }
+
+        // restart a follower
+        followerId = cluster.getFollowers().get(0).getId();
+        LOG.info("Restart follower {}", followerId);
+        cluster.restartServer(followerId, false);
+
+        // write some more messages
+        for (int i = 0; i < 10; i++) {
+          Assert.assertTrue(client.send(message).isSuccess());
+        }
+      }
+
+      // make sure the restarted follower can catchup
+      // (polls every 500 ms, at most 10 times)
+      boolean catchup = false;
+      long lastAppliedIndex = 0;
+      for (int i = 0; i < 10 && !catchup; i++) {
+        Thread.sleep(500);
+        lastAppliedIndex = cluster.getServer(followerId).getState().getLastAppliedIndex();
+        catchup = lastAppliedIndex >= 20;
+      }
+      Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup);
+
+      // make sure the restarted peer's log segments is correct
+      cluster.restartServer(followerId, false);
+      Assert.assertTrue(cluster.getServer(followerId).getState().getLog()
+          .getLastEntry().getIndex() >= 20);
+    } finally {
+      // Stop the cluster on success and on failure alike so that its servers
+      // are not left running.
+      cluster.shutdown();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java new file mode 100644 index 0000000..ff936bd --- /dev/null +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.ratis.examples;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+import org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc;
+import org.apache.ratis.netty.MiniRaftClusterWithNetty;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Helpers that build the {@link MiniRaftCluster} parameter rows used by the
+ * parameterized example tests.
+ */
+public class RaftExamplesTestUtil {
+  /**
+   * Creates a cluster from {@code factory} for the given peer ids and appends
+   * it to {@code rows} as a single-element parameter row.
+   */
+  private static void addCluster(
+      Collection<Object[]> rows, MiniRaftCluster.Factory factory,
+      String[] peerIds, RaftProperties properties) throws IOException {
+    final Object cluster = factory.newCluster(peerIds, properties, true);
+    rows.add(new Object[]{cluster});
+  }
+
+  /**
+   * Creates one cluster per requested cluster class, in the fixed order
+   * simulated RPC, Hadoop RPC, Netty, gRPC.
+   *
+   * @param prop properties handed to every cluster factory
+   * @param clusterSize number of peer ids generated for each cluster
+   * @param clusterClasses the cluster classes to create; when none are given,
+   *                       all four are created
+   * @return one single-element row per created cluster
+   * @throws IOException if a factory fails to create its cluster
+   */
+  public static Collection<Object[]> getMiniRaftClusters(
+      RaftProperties prop, int clusterSize, Class<?>... clusterClasses)
+      throws IOException {
+    final List<Class<?>> requested = Arrays.asList(clusterClasses);
+    // An empty selection means that every cluster class is wanted.
+    final boolean includeAll = requested.isEmpty();
+
+    // Never-ending source of id batches: the n-th batch (counting from 0) is
+    // generated with n * clusterSize as the second generateIds argument.
+    final Iterator<String[]> idBatches = new Iterator<String[]>() {
+      private int batch = 0;
+
+      @Override
+      public boolean hasNext() {
+        return true;
+      }
+
+      @Override
+      public String[] next() {
+        final int second = batch * clusterSize;
+        batch++;
+        return MiniRaftCluster.generateIds(clusterSize, second);
+      }
+    };
+
+    final List<Object[]> rows = new ArrayList<>();
+
+    // A batch of ids is only consumed for a cluster that is actually created.
+    if (includeAll || requested.contains(MiniRaftClusterWithSimulatedRpc.class)) {
+      addCluster(rows, MiniRaftClusterWithSimulatedRpc.FACTORY, idBatches.next(), prop);
+    }
+    if (includeAll || requested.contains(MiniRaftClusterWithHadoopRpc.class)) {
+      addCluster(rows, MiniRaftClusterWithHadoopRpc.FACTORY, idBatches.next(), prop);
+    }
+    if (includeAll || requested.contains(MiniRaftClusterWithNetty.class)) {
+      addCluster(rows, MiniRaftClusterWithNetty.FACTORY, idBatches.next(), prop);
+    }
+    if (includeAll || requested.contains(MiniRaftClusterWithGRpc.class)) {
+      addCluster(rows, MiniRaftClusterWithGRpc.FACTORY, idBatches.next(), prop);
+    }
+    return rows;
+  }
+
+  /**
+   * Same as {@link #getMiniRaftClusters(RaftProperties, int, Class[])} with
+   * 3 peers per cluster and fresh properties in which only the state machine
+   * class is set.
+   *
+   * @param stateMachineClass the state machine class to configure
+   * @param clusterClasses the cluster classes to create; none means all
+   * @return one single-element row per created cluster
+   * @throws IOException if a factory fails to create its cluster
+   */
+  public static <S extends StateMachine> Collection<Object[]> getMiniRaftClusters(
+      Class<S> stateMachineClass, Class<?>... clusterClasses) throws IOException {
+    final RaftProperties properties = new RaftProperties();
+    properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        stateMachineClass, StateMachine.class);
+    return getMiniRaftClusters(properties, 3, clusterClasses);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
new file mode 100644
index 0000000..44cf894
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.arithmetic;
+
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.examples.RaftExamplesTestUtil;
+import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine;
+import org.apache.ratis.examples.arithmetic.AssignmentMessage;
+import org.apache.ratis.examples.arithmetic.expression.*;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Runs assignments and read-only expression queries against clusters that use
+ * {@link ArithmeticStateMachine}.
+ */
+@RunWith(Parameterized.class)
+public class TestArithmetic {
+  static {
+    RaftUtils.setLogLevel(ArithmeticStateMachine.LOG, Level.ALL);
+  }
+
+  /** Supplies the clusters to test, each configured with {@link ArithmeticStateMachine}. */
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() throws IOException {
+    return RaftExamplesTestUtil.getMiniRaftClusters(ArithmeticStateMachine.class);
+  }
+
+  /** The cluster under test, injected by the parameterized runner. */
+  @Parameterized.Parameter
+  public MiniRaftCluster cluster;
+
+  /**
+   * For every odd n in [3, 99] assigns a = n and b = n*n/2 (integer
+   * division), evaluates c = sqrt(a*a + b*b) and expects n*n/2 + 1, then
+   * resets a, b and c to the null value.
+   */
+  @Test
+  public void testPythagorean() throws Exception {
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final String leaderId = cluster.getLeader().getId();
+
+      // try-with-resources closes the client even when an assertion fails.
+      try (final RaftClient client = cluster.createClient("pythagorean", leaderId)) {
+        final Variable a = new Variable("a");
+        final Variable b = new Variable("b");
+        final Variable c = new Variable("c");
+        final BinaryExpression a2 = new BinaryExpression(BinaryExpression.Op.MULT, a, a);
+        final BinaryExpression b2 = new BinaryExpression(BinaryExpression.Op.MULT, b, b);
+        final BinaryExpression c2 = new BinaryExpression(BinaryExpression.Op.ADD, a2, b2);
+        final AssignmentMessage pythagorean = new AssignmentMessage(c,
+            new UnaryExpression(UnaryExpression.Op.SQRT, c2));
+
+        final AssignmentMessage nullA = new AssignmentMessage(a, NullValue.getInstance());
+        final AssignmentMessage nullB = new AssignmentMessage(b, NullValue.getInstance());
+        final AssignmentMessage nullC = new AssignmentMessage(c, NullValue.getInstance());
+
+        for(int n = 3; n < 100; n += 2) {
+          int n2 = n*n;
+          int half_n2 = n2/2;
+
+          RaftClientReply r;
+          r = client.send(new AssignmentMessage(a, new DoubleValue(n)));
+          assertRaftClientReply(r, (double)n);
+          r = client.sendReadOnly(Expression.Utils.toMessage(a2));
+          assertRaftClientReply(r, (double)n2);
+          r = client.send(new AssignmentMessage(b, new DoubleValue(half_n2)));
+          assertRaftClientReply(r, (double)half_n2);
+          r = client.sendReadOnly(Expression.Utils.toMessage(b2));
+          assertRaftClientReply(r, (double)half_n2*half_n2);
+          r = client.send(pythagorean);
+          assertRaftClientReply(r, (double)half_n2 + 1);
+
+          r = client.send(nullA);
+          assertRaftClientReply(r, null);
+          r = client.send(nullB);
+          assertRaftClientReply(r, null);
+          r = client.send(nullC);
+          assertRaftClientReply(r, null);
+        }
+      }
+    } finally {
+      // Shut the cluster down on failure as well, not only on success.
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Asserts that {@code reply} is successful and that the expression decoded
+   * from its message content evaluates to {@code expected}.
+   *
+   * @param reply the reply to check
+   * @param expected the expected value; {@code null} is passed for the replies
+   *                 to the null-value assignments
+   */
+  static void assertRaftClientReply(RaftClientReply reply, Double expected) {
+    Assert.assertTrue(reply.isSuccess());
+    final Expression e = Expression.Utils.bytes2Expression(
+        reply.getMessage().getContent().toByteArray(), 0);
+    Assert.assertEquals(expected, e.evaluate(null));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
new file mode 100644
index 0000000..a2d6e29
--- /dev/null
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/expression/TestExpression.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.examples.arithmetic.expression;
+
+
+import org.apache.ratis.examples.arithmetic.expression.BinaryExpression;
+import org.apache.ratis.examples.arithmetic.expression.DoubleValue;
+import org.apache.ratis.examples.arithmetic.expression.Expression;
+import org.apache.ratis.examples.arithmetic.expression.UnaryExpression;
+import org.apache.ratis.examples.arithmetic.expression.Variable;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Round-trip tests for the byte encoding of expressions and their operators. */
+public class TestExpression {
+  static final Logger LOG = LoggerFactory.getLogger(TestExpression.class);
+
+  /**
+   * Writes random int, long and double values into a buffer with
+   * {@code Expression.Utils} and checks that reading them back at the same
+   * offset yields the same value.
+   */
+  @Test
+  public void testArithmeticUtils() throws Exception {
+    final Random ran = ThreadLocalRandom.current();
+    final byte[] buf = new byte[1024];
+    int offset = 0;
+
+    // Each iteration advances the offset by 4 + 8 + 8 = 20 bytes, so the
+    // 10 iterations stay well inside the 1024-byte buffer.
+    for(int i = 0; i < 10; i++) {
+      {
+        final int n = ran.nextInt();
+        Expression.Utils.int2bytes(n, buf, offset);
+        final int m = Expression.Utils.bytes2int(buf, offset);
+        Assert.assertEquals(n, m);
+        offset += 4;
+      }
+      {
+        final long n = ran.nextLong();
+        Expression.Utils.long2bytes(n, buf, offset);
+        final long m = Expression.Utils.bytes2long(buf, offset);
+        Assert.assertEquals(n, m);
+        offset += 8;
+      }
+      {
+        final double n = ran.nextDouble();
+        Expression.Utils.double2bytes(n, buf, offset);
+        final double m = Expression.Utils.bytes2double(buf, offset);
+        // A zero delta demands an exact round trip and, unlike
+        // assertTrue(n == m), reports both values on failure.
+        Assert.assertEquals(n, m, 0.0);
+        offset += 8;
+      }
+    }
+  }
+
+  /** Checks that every binary and unary operator survives a byte-value round trip. */
+  @Test
+  public void testOp() throws Exception {
+    for(BinaryExpression.Op op : BinaryExpression.Op.values()) {
+      final byte b = op.byteValue();
+      Assert.assertEquals(op, BinaryExpression.Op.valueOf(b));
+    }
+    for(UnaryExpression.Op op : UnaryExpression.Op.values()) {
+      final byte b = op.byteValue();
+      Assert.assertEquals(op, UnaryExpression.Op.valueOf(b));
+    }
+  }
+
+  /**
+   * Serializes a {@link Variable} and a {@link DoubleValue} into a buffer,
+   * reads each back from the same offset and compares name or value and the
+   * reported lengths.
+   */
+  @Test
+  public void testExpression() throws Exception {
+    final byte[] buf = new byte[1024];
+    int offset = 0;
+
+    {
+      final Variable a = new Variable("pi");
+      LOG.info("var a: {}", a);
+      final int len = a.toBytes(buf, offset);
+      final Variable a2 = new Variable(buf, offset);
+      LOG.info("var a2: {}", a2);
+      Assert.assertEquals(a.getName(), a2.getName());
+      Assert.assertEquals(len, a.length());
+      Assert.assertEquals(len, a2.length());
+      offset += len;
+    }
+
+    {
+      final DoubleValue three = new DoubleValue(3);
+      LOG.info("double three: {}", three.evaluate(null));
+      final int len = three.toBytes(buf, offset);
+      final DoubleValue three2 = new DoubleValue(buf, offset);
+      LOG.info("double three2: {}", three2.evaluate(null));
+      // assertEquals reports both values on failure; it compares with equals().
+      Assert.assertEquals(three.evaluate(null), three2.evaluate(null));
+      Assert.assertEquals(len, three.length());
+      Assert.assertEquals(len, three2.length());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
new file mode 100644
index 0000000..1f885ea
--- /dev/null
+++ 
b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.statemachine;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.examples.RaftExamplesTestUtil;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.simulation.RequestHandler;
+import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.RaftUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Checks that an exception raised by the state machine while applying a
+ * transaction is reported to the client as a {@link StateMachineException}.
+ */
+@RunWith(Parameterized.class)
+public class TestRaftStateMachineException {
+  static {
+    RaftUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RequestHandler.LOG, Level.DEBUG);
+    RaftUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  /** A state machine whose {@code applyTransaction} always fails with "Fake Exception". */
+  protected static class StateMachineWithException extends SimpleStateMachine4Testing {
+    @Override
+    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+      CompletableFuture<Message> future = new CompletableFuture<>();
+      future.completeExceptionally(new StateMachineException("Fake Exception"));
+      return future;
+    }
+  }
+
+  /** Supplies the clusters to test, each configured with {@link StateMachineWithException}. */
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() throws IOException {
+    return RaftExamplesTestUtil.getMiniRaftClusters(
+        StateMachineWithException.class);
+  }
+
+  @Parameterized.Parameter
+  public MiniRaftCluster cluster;
+
+  /**
+   * Sends one message and expects the client call to fail with a
+   * {@link StateMachineException} whose message contains "Fake Exception".
+   */
+  @Test
+  public void testHandleStateMachineException() throws Exception {
+    cluster.start();
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+
+      final String leaderId = cluster.getLeader().getId();
+
+      try (final RaftClient client = cluster.createClient("client", leaderId)) {
+        client.send(new RaftTestUtil.SimpleMessage("m"));
+        fail("Exception expected");
+      } catch (StateMachineException e) {
+        e.printStackTrace();
+        Assert.assertTrue(e.getMessage().contains("Fake Exception"));
+      }
+    } finally {
+      // Shut the cluster down even when the expectation above fails, so its
+      // servers are not left running.
+      cluster.shutdown();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-examples/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/resources/log4j.properties b/ratis-examples/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ced0687
--- /dev/null
+++ b/ratis-examples/src/test/resources/log4j.properties
@@ -0,0 +1,18 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/pom.xml ---------------------------------------------------------------------- diff --git a/ratis-grpc/pom.xml b/ratis-grpc/pom.xml new file mode 100644 index 0000000..6a46be5 --- /dev/null +++ b/ratis-grpc/pom.xml @@ -0,0 +1,93 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>ratis-project-dist</artifactId> + <groupId>org.apache.ratis</groupId> + <version>1.0-SNAPSHOT</version> + <relativePath>../ratis-project-dist</relativePath> + </parent> + + <artifactId>ratis-grpc</artifactId> + <name>Ratis gRPC Support</name> + + <dependencies> + <dependency> + <artifactId>ratis-proto-shaded</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + <dependency> + <artifactId>ratis-common</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <artifactId>ratis-client</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + <dependency> + <artifactId>ratis-client</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>provided</scope> + </dependency> + <dependency> + <artifactId>ratis-server</artifactId> + <groupId>org.apache.ratis</groupId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + 
</dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java new file mode 100644 index 0000000..b61e70e --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.ratis.grpc;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.ratis.shaded.io.grpc.Server;
+import org.apache.ratis.shaded.io.grpc.ServerBuilder;
+import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.client.RaftClientProtocolService;
+import org.apache.ratis.grpc.server.RaftServerProtocolClient;
+import org.apache.ratis.grpc.server.RaftServerProtocolService;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_DEFAULT;
+import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * gRPC implementation of {@link RaftServerRpc}: hosts the server-to-server and
+ * client-facing gRPC services of one raft server and keeps one
+ * {@link RaftServerProtocolClient} per known peer.
+ */
+public class RaftGRpcService implements RaftServerRpc {
+  static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class);
+  /** Code-injection point executed before a vote request is sent. */
+  public static final String GRPC_SEND_SERVER_REQUEST =
+      RaftGRpcService.class.getSimpleName() + ".sendRequest";
+
+  private final Server server;
+  private final InetSocketAddress address;
+  /** Peer id to RPC client; a synchronized map. */
+  private final Map<String, RaftServerProtocolClient> peers =
+      Collections.synchronizedMap(new HashMap<>());
+  private final String selfId;
+
+  /**
+   * Builds the gRPC server and starts it right away.
+   *
+   * @param raftServer the raft server whose id is used and that backs both
+   *                   registered services
+   * @param properties source of the server port and maximum message size
+   */
+  public RaftGRpcService(RaftServer raftServer, RaftProperties properties) {
+    int port = properties.getInt(RAFT_GRPC_SERVER_PORT_KEY,
+        RAFT_GRPC_SERVER_PORT_DEFAULT);
+    int maxMessageSize = properties.getInt(
+        RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_KEY,
+        RaftGrpcConfigKeys.RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT);
+    selfId = raftServer.getId();
+    // Ask for the Netty builder directly instead of down-casting a raw
+    // ServerBuilder, which only works when the provider happens to be Netty.
+    server = NettyServerBuilder.forPort(port)
+        .maxMessageSize(maxMessageSize)
+        .addService(new RaftServerProtocolService(selfId, raftServer))
+        .addService(new RaftClientProtocolService(selfId, raftServer))
+        .build();
+
+    // start service to determine the port (in case port is configured as 0)
+    startService();
+    address = new InetSocketAddress(server.getPort());
+    LOG.info("Server started, listening on {}", address.getPort());
+  }
+
+  /** Does nothing: the server is already started by the constructor. */
+  @Override
+  public void start() {
+    // do nothing
+  }
+
+  /**
+   * Starts the gRPC server and registers a JVM shutdown hook that closes this
+   * service.
+   */
+  private void startService() {
+    try {
+      server.start();
+    } catch (IOException e) {
+      LOG.error("Failed to start Grpc server", e);
+      // NOTE(review): this terminates the whole JVM when the server cannot
+      // start; consider propagating the failure to the caller instead.
+      System.exit(1);
+    }
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        System.err.println("*** shutting down gRPC server since JVM is shutting down");
+        RaftGRpcService.this.close();
+        System.err.println("*** server shut down");
+      }
+    });
+  }
+
+  /** Shuts down the gRPC server and then every peer client. */
+  @Override
+  public void close() {
+    if (server != null) {
+      server.shutdown();
+    }
+    shutdownClients();
+  }
+
+  /**
+   * @return the address built from the port the server reported after it was
+   *         started
+   */
+  @Override
+  public InetSocketAddress getInetSocketAddress() {
+    return address;
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public AppendEntriesReplyProto appendEntries(
+      AppendEntriesRequestProto request) throws IOException {
+    throw new UnsupportedOperationException(
+        "Blocking AppendEntries call is not supported");
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(
+      InstallSnapshotRequestProto request) throws IOException {
+    throw new UnsupportedOperationException(
+        "Blocking InstallSnapshot call is not supported");
+  }
+
+  /**
+   * Sends a vote request through the client registered for the request's
+   * reply id.
+   *
+   * @throws NullPointerException if no client is registered for that id
+   */
+  @Override
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
+      throws IOException {
+    CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, selfId,
+        null, request);
+
+    RaftServerProtocolClient target = Preconditions.checkNotNull(
+        peers.get(request.getServerRequest().getReplyId()));
+    return target.requestVote(request);
+  }
+
+  /**
+   * Registers an RPC client for every peer that does not have one yet; peers
+   * that are already known keep their existing client.
+   */
+  @Override
+  public void addPeers(Iterable<RaftPeer> newPeers) {
+    for (RaftPeer p : newPeers) {
+      // One atomic step on the synchronized map: a separate containsKey()
+      // followed by put() lets two concurrent callers both create a client
+      // for the same peer, and the overwritten one would never be shut down.
+      peers.computeIfAbsent(p.getId(), id -> new RaftServerProtocolClient(p));
+    }
+  }
+
+  /** Shuts down every registered peer client. */
+  private void shutdownClients() {
+    peers.values().forEach(RaftServerProtocolClient::shutdown);
+  }
+
+  /**
+   * @return the client registered for {@code peer}, or {@code null} if the
+   *         peer was never added
+   */
+  public RaftServerProtocolClient getRpcClient(RaftPeer peer) {
+    return peers.get(peer.getId());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java
new file mode 100644
index 0000000..ffec8ff
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.ratis.grpc;

import org.apache.ratis.client.RaftClientConfigKeys;

/**
 * Configuration key names and default values for the gRPC-based Raft RPC
 * implementation. Keys under {@link #PREFIX} are gRPC specific; the
 * {@code raft.outputstream.*} keys carry their own fixed names.
 */
public interface RaftGrpcConfigKeys {
  /** Common prefix of all gRPC-specific keys; sub-keys are appended with a leading dot. */
  String PREFIX = "raft.grpc";

  String RAFT_GRPC_SERVER_PORT_KEY = PREFIX + ".server.port";
  int RAFT_GRPC_SERVER_PORT_DEFAULT = 0;

  String RAFT_GRPC_MESSAGE_MAXSIZE_KEY = PREFIX + ".message.maxsize";
  int RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT = 64 * 1024 * 1024; // 64 MB

  // The separator dot was missing here, which produced the malformed key
  // "raft.grpcleader.max.outstanding.appends".
  String RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY =
      PREFIX + ".leader.max.outstanding.appends";
  int RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT = 128;

  // Same fix as above: previously "raft.grpcclient.max.outstanding.appends".
  String RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY =
      PREFIX + ".client.max.outstanding.appends";
  int RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT = 128;

  String RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY = "raft.outputstream.buffer.size";
  int RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT = 64 * 1024;

  String RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY = "raft.outputstream.max.retry.times";
  int RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT = 5;

  String RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY = "raft.outputstream.retry.interval";
  // The default retry interval reuses the client RPC timeout default.
  long RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java new file mode 100644 index 0000000..52ed851 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ratis.grpc;

import org.apache.ratis.shaded.io.grpc.Metadata;
import org.apache.ratis.shaded.io.grpc.Status;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
import org.apache.ratis.util.RaftUtils;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Constructor;

/**
 * Helpers for carrying Java exceptions across a gRPC call: the server side
 * wraps a {@link Throwable} into a {@link StatusRuntimeException}, the client
 * side turns it back into an {@link IOException}.
 */
public class RaftGrpcUtil {
  /** Trailer key under which the class name of the original exception is sent. */
  public static final Metadata.Key<String> EXCEPTION_TYPE_KEY =
      Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);

  /**
   * @param e the throwable to render
   * @return the full stack trace of {@code e} as a string
   */
  public static String stringifyException(Throwable e) {
    StringWriter stm = new StringWriter();
    PrintWriter wrt = new PrintWriter(stm);
    e.printStackTrace(wrt);
    wrt.close();
    return stm.toString();
  }

  /**
   * Wraps {@code t} into a {@link StatusRuntimeException} with status
   * {@code INTERNAL}, the stack trace as description and the exception class
   * name in the trailers.
   *
   * @param t the exception to send to the remote side
   * @return the exception to hand to the gRPC response observer
   */
  public static StatusRuntimeException wrapException(Throwable t) {
    Metadata trailers = new Metadata();
    // Use the binary name, not the canonical name: getCanonicalName() is null
    // for anonymous/local classes and uses '.' for nested classes, which
    // Class.forName() in unwrapException cannot resolve.
    trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getName());
    return new StatusRuntimeException(
        Status.INTERNAL.withDescription(RaftGrpcUtil.stringifyException(t)),
        trailers);
  }

  /**
   * Reconstructs the exception described by the trailers of {@code se}.
   * If the trailers carry no class name, or the class cannot be loaded or
   * instantiated with a single {@code String} argument, {@code se} itself is
   * wrapped into a plain {@link IOException}.
   *
   * @param se the exception received from gRPC
   * @return an {@link IOException} representing the remote failure
   */
  public static IOException unwrapException(StatusRuntimeException se) {
    final Metadata trailers = se.getTrailers();
    final Status status = se.getStatus();
    if (trailers != null && status != null) {
      final String className = trailers.get(EXCEPTION_TYPE_KEY);
      if (className != null) {
        try {
          Class<?> clazz = Class.forName(className);
          final Exception unwrapped = instantiateException(
              clazz.asSubclass(Exception.class), status.getDescription(), se);
          return RaftUtils.asIOException(unwrapped);
        } catch (Exception e) {
          // best effort: fall back to the raw gRPC exception
          return new IOException(se);
        }
      }
    }
    return new IOException(se);
  }

  /**
   * Converts any throwable into an {@link IOException}, unwrapping it first
   * when it is a {@link StatusRuntimeException}.
   *
   * @param t the throwable reported by gRPC
   * @return the corresponding {@link IOException}
   */
  public static IOException unwrapIOException(Throwable t) {
    final IOException e;
    if (t instanceof StatusRuntimeException) {
      e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
    } else {
      e = RaftUtils.asIOException(t);
    }
    return e;
  }

  /**
   * Creates an instance of {@code cls} through its public
   * {@code (String)} constructor and sets {@code from} as its cause.
   */
  private static Exception instantiateException(Class<? extends Exception> cls,
      String message, Exception from) throws Exception {
    Constructor<? extends Exception> cn = cls.getConstructor(String.class);
    cn.setAccessible(true);
    Exception ex = cn.newInstance(message);
    ex.initCause(from);
    return ex;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java new file mode 100644 index 0000000..cb05b33 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ratis.grpc.client;

import com.google.common.base.Preconditions;

import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.RaftGrpcConfigKeys;
import org.apache.ratis.grpc.RaftGrpcUtil;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.RaftUtils;
import org.apache.ratis.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.ratis.client.impl.ClientProtoUtils.*;
import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT;
import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY;

/**
 * Streams client append requests to the current leader over a gRPC stream.
 * Requests wait in {@code dataQueue} until the {@link Sender} thread sends
 * them; sent-but-unacknowledged requests sit in {@code ackQueue}. On an error
 * or a NotLeaderException the unacknowledged requests are moved back to the
 * head of {@code dataQueue} and re-sent to the next leader candidate.
 *
 * All queue/state transitions are guarded by the monitor of this object.
 */
public class AppendStreamer implements Closeable {
  public static final Logger LOG = LoggerFactory.getLogger(AppendStreamer.class);

  enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR}

  /** Collects the per-peer exceptions and counts them against the retry limit. */
  private static class ExceptionAndRetry {
    private final Map<String, IOException> exceptionMap = new HashMap<>();
    private final AtomicInteger retryTimes = new AtomicInteger(0);
    private final int maxRetryTimes;
    private final long retryInterval;

    ExceptionAndRetry(RaftProperties prop) {
      maxRetryTimes = prop.getInt(
          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY,
          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT);
      retryInterval = prop.getTimeDuration(
          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY,
          RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT,
          TimeUnit.MILLISECONDS);
    }

    /** Records the latest exception of {@code peer} and counts one more retry. */
    void addException(String peer, IOException e) {
      exceptionMap.put(peer, e);
      retryTimes.incrementAndGet();
    }

    IOException getCombinedException() {
      return new IOException("Exceptions: " + exceptionMap);
    }

    boolean shouldRetry() {
      return retryTimes.get() <= maxRetryTimes;
    }
  }

  private final Deque<RaftClientRequestProto> dataQueue;
  private final Deque<RaftClientRequestProto> ackQueue;
  private final int maxPendingNum;

  private final PeerProxyMap<RaftClientProtocolProxy> proxyMap;
  private final Map<String, RaftPeer> peers;
  private String leaderId;
  private volatile RaftClientProtocolProxy leaderProxy;
  private final String clientId;

  private volatile RunningState running = RunningState.RUNNING;
  private final ExceptionAndRetry exceptionAndRetry;
  private final Sender senderThread;

  /**
   * Creates the streamer, selects the initial leader proxy and starts the
   * sender thread.
   *
   * @param prop     configuration (max outstanding appends, retry settings)
   * @param peers    the known raft peers
   * @param leaderId the suggested leader, or null to pick the first peer
   * @param clientId id put into every generated request
   */
  AppendStreamer(RaftProperties prop, Collection<RaftPeer> peers,
      String leaderId, String clientId) {
    this.clientId = clientId;
    maxPendingNum = prop.getInt(
        RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY,
        RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT);
    dataQueue = new ConcurrentLinkedDeque<>();
    ackQueue = new ConcurrentLinkedDeque<>();
    exceptionAndRetry = new ExceptionAndRetry(prop);

    this.peers = peers.stream().collect(
        Collectors.toMap(RaftPeer::getId, Function.identity()));
    proxyMap = new PeerProxyMap<>(
        raftPeer -> new RaftClientProtocolProxy(raftPeer, ResponseHandler::new));
    proxyMap.addPeers(peers);
    refreshLeaderProxy(leaderId, null);

    senderThread = new Sender();
    senderThread.setName(this.toString() + "-sender");
    senderThread.start();
  }

  /**
   * Chooses the new leader id (the suggestion if present, otherwise the first
   * peer or the peer following {@code oldLeader}), closes the session of the
   * previous proxy and switches {@code leaderProxy}.
   */
  private synchronized void refreshLeaderProxy(String suggested,
      String oldLeader) {
    if (suggested != null) {
      leaderId = suggested;
    } else {
      if (oldLeader == null) {
        leaderId = peers.keySet().iterator().next();
      } else {
        leaderId = StringUtils.next(oldLeader, peers.keySet());
      }
    }
    LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,
        oldLeader, leaderId, suggested);
    if (leaderProxy != null) {
      leaderProxy.closeCurrentSession();
    }
    try {
      leaderProxy = proxyMap.getProxy(leaderId);
    } catch (IOException e) {
      LOG.error("Should not hit IOException here", e);
      refreshLeader(null, leaderId);
    }
  }

  private boolean isRunning() {
    return running == RunningState.RUNNING ||
        running == RunningState.LOOK_FOR_LEADER;
  }

  private void checkState() throws IOException {
    if (!isRunning()) {
      throwException("The AppendStreamer has been closed");
    }
  }

  /**
   * Queues {@code content} for sending, blocking while {@code dataQueue}
   * already holds {@code maxPendingNum} requests.
   *
   * @throws IOException if the streamer is closed or in the error state
   */
  synchronized void write(ByteString content, long seqNum)
      throws IOException {
    checkState();
    while (isRunning() && dataQueue.size() >= maxPendingNum) {
      try {
        wait();
      } catch (InterruptedException ignored) {
      }
    }
    if (isRunning()) {
      // wrap the current buffer into a RaftClientRequestProto
      final RaftClientRequestProto request = genRaftClientRequestProto(
          clientId, leaderId, seqNum, content, false);
      dataQueue.offer(request);
      this.notifyAll();
    } else {
      throwException(this + " got closed.");
    }
  }

  /**
   * Blocks until both queues are empty.
   *
   * @throws IOException if the streamer stops running before that happens
   */
  synchronized void flush() throws IOException {
    checkState();
    if (dataQueue.isEmpty() && ackQueue.isEmpty()) {
      return;
    }
    // wait for the pending Q to become empty
    while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
      try {
        wait();
      } catch (InterruptedException ignored) {
      }
    }
    if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
      throwException(this + " got closed before finishing flush");
    }
  }

  /**
   * Flushes the pending requests, stops the sender thread and closes all
   * proxies. Does nothing if the streamer is no longer running.
   *
   * @throws IOException if the flush fails; the sender thread and the proxies
   *                     are released in that case as well
   */
  @Override
  public void close() throws IOException {
    if (!isRunning()) {
      return;
    }
    try {
      flush();
    } finally {
      // Release the sender thread and the proxies even when flush() failed.
      // Keep the ERROR state so that later calls still report the combined
      // exception.
      if (isRunning()) {
        running = RunningState.CLOSED;
      }
      senderThread.interrupt();
      try {
        senderThread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      proxyMap.close();
    }
  }

  @Override
  public String toString() {
    return this.getClass().getSimpleName() + "-" + clientId;
  }

  /** Moves requests from {@code dataQueue} to the leader and into {@code ackQueue}. */
  private class Sender extends Daemon {
    @Override
    public void run() {
      while (isRunning()) {

        synchronized (AppendStreamer.this) {
          while (isRunning() && shouldWait()) {
            try {
              AppendStreamer.this.wait();
            } catch (InterruptedException ignored) {
            }
          }
          if (running == RunningState.RUNNING) {
            RaftClientRequestProto next = dataQueue.poll();
            leaderProxy.onNext(next);
            ackQueue.offer(next);
          }
        }
      }
    }

    private boolean shouldWait() {
      // the sender should wait if any of the following is true
      // 1) there is no data to send
      // 2) there are too many outstanding pending requests
      // 3) Error/NotLeaderException just happened, we're still waiting for
      //    the first response to confirm the new leader
      return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum ||
          running == RunningState.LOOK_FOR_LEADER;
    }
  }

  /** the response handler for stream RPC */
  private class ResponseHandler implements
      RaftClientProtocolProxy.CloseableStreamObserver {
    private final String targetId;
    // once handled the first NotLeaderException or Error, the handler should
    // be inactive and should not make any further action.
    private volatile boolean active = true;

    ResponseHandler(RaftPeer target) {
      targetId = target.getId();
    }

    @Override
    public String toString() {
      return AppendStreamer.this + "-ResponseHandler-" + targetId;
    }

    @Override
    public void onNext(RaftClientReplyProto reply) {
      if (!active) {
        return;
      }
      synchronized (AppendStreamer.this) {
        RaftClientRequestProto pending = Preconditions.checkNotNull(
            ackQueue.peek());
        if (reply.getRpcReply().getSuccess()) {
          Preconditions.checkState(pending.getRpcRequest().getSeqNum() ==
              reply.getRpcReply().getSeqNum());
          ackQueue.poll();
          LOG.trace("{} received success ack for request {}", this,
              pending.getRpcRequest());
          // we've identified the correct leader
          if (running == RunningState.LOOK_FOR_LEADER) {
            running = RunningState.RUNNING;
          }
        } else {
          // this may be a NotLeaderException
          RaftClientReply r = toRaftClientReply(reply);
          if (r.isNotLeader()) {
            LOG.debug("{} received a NotLeaderException from {}", this,
                r.getReplierId());
            handleNotLeader(r.getNotLeaderException(), targetId);
          }
        }
        AppendStreamer.this.notifyAll();
      }
    }

    @Override
    public void onError(Throwable t) {
      if (active) {
        synchronized (AppendStreamer.this) {
          handleError(t, this);
          AppendStreamer.this.notifyAll();
        }
      }
    }

    @Override
    public void onCompleted() {
      LOG.info("{} onCompleted, pending requests #: {}", this,
          ackQueue.size());
    }

    @Override // called by handleError and handleNotLeader
    public void close() throws IOException {
      active = false;
    }
  }

  /** Throws the combined exception in the ERROR state, otherwise one with {@code msg}. */
  private void throwException(String msg) throws IOException {
    if (running == RunningState.ERROR) {
      throw exceptionAndRetry.getCombinedException();
    } else {
      throw new IOException(msg);
    }
  }

  private void handleNotLeader(NotLeaderException nle,
      String oldLeader) {
    Preconditions.checkState(Thread.holdsLock(AppendStreamer.this));
    // handle NotLeaderException: refresh leader and RaftConfiguration
    refreshPeers(nle.getPeers());

    // The exception may carry no suggestion; refreshLeaderProxy then falls
    // back to the peer following oldLeader.
    final RaftPeer suggested = nle.getSuggestedLeader();
    refreshLeader(suggested != null ? suggested.getId() : null, oldLeader);
  }

  private void handleError(Throwable t, ResponseHandler handler) {
    Preconditions.checkState(Thread.holdsLock(AppendStreamer.this));
    final IOException e = RaftGrpcUtil.unwrapIOException(t);

    exceptionAndRetry.addException(handler.targetId, e);
    LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.",
        handler, e, exceptionAndRetry.retryTimes.get(),
        exceptionAndRetry.maxRetryTimes);

    leaderProxy.onError();
    if (exceptionAndRetry.shouldRetry()) {
      refreshLeader(null, leaderId);
    } else {
      running = RunningState.ERROR;
    }
  }

  /**
   * Switches to the next leader candidate, re-queues the unacknowledged
   * requests and sends the first of them as a probe after the retry interval.
   */
  private void refreshLeader(String suggestedLeader, String oldLeader) {
    running = RunningState.LOOK_FOR_LEADER;
    refreshLeaderProxy(suggestedLeader, oldLeader);
    reQueuePendingRequests(leaderId);

    final RaftClientRequestProto request = Preconditions.checkNotNull(
        dataQueue.poll());
    ackQueue.offer(request);
    try {
      Thread.sleep(exceptionAndRetry.retryInterval);
    } catch (InterruptedException ignored) {
    }
    leaderProxy.onNext(request);
  }

  private void reQueuePendingRequests(String newLeader) {
    if (isRunning()) {
      // resend all the pending requests
      while (!ackQueue.isEmpty()) {
        // take from the tail and push to the head so the original order is kept
        RaftClientRequestProto oldRequest = ackQueue.pollLast();
        RaftRpcRequestProto r = oldRequest.getRpcRequest();
        RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder()
            .setMessage(oldRequest.getMessage())
            .setReadOnly(oldRequest.getReadOnly())
            .setRpcRequest(toRaftRpcRequestProtoBuilder(
                clientId, newLeader, r.getSeqNum()))
            .build();
        dataQueue.offerFirst(newRequest);
      }
    }
  }

  private void refreshPeers(RaftPeer[] newPeers) {
    if (newPeers != null && newPeers.length > 0) {
      // we only add new peers, we do not remove any peer even if it no longer
      // belongs to the current raft conf
      Arrays.stream(newPeers).forEach(peer -> {
        peers.putIfAbsent(peer.getId(), peer);
        proxyMap.putIfAbsent(peer);
      });

      LOG.debug("refreshed peers: {}", peers);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java new file mode 100644 index 0000000..74fb253 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.ratis.grpc.client;

import org.apache.ratis.shaded.io.grpc.ManagedChannel;
import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
import org.apache.ratis.grpc.RaftGrpcUtil;
import org.apache.ratis.protocol.RaftPeer;

import java.io.Closeable;
import java.io.IOException;

/**
 * gRPC client for the raft client protocol of a single peer. It owns one
 * plaintext channel and exposes a blocking call for setConfiguration and a
 * streaming call for append.
 */
public class RaftClientProtocolClient implements Closeable {
  private final RaftPeer target;
  private final ManagedChannel channel;
  private final RaftClientProtocolServiceBlockingStub blockingStub;
  private final RaftClientProtocolServiceStub asyncStub;

  /**
   * Opens a plaintext channel to the address of {@code target} and creates
   * the blocking and asynchronous stubs on top of it.
   */
  public RaftClientProtocolClient(RaftPeer target) {
    this.target = target;
    this.channel = ManagedChannelBuilder
        .forTarget(target.getAddress())
        .usePlaintext(true)
        .build();
    this.blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(this.channel);
    this.asyncStub = RaftClientProtocolServiceGrpc.newStub(this.channel);
  }

  /** Shuts the underlying channel down immediately. */
  @Override
  public void close() {
    channel.shutdownNow();
  }

  /**
   * Sends a setConfiguration request and waits for the reply.
   *
   * @throws IOException the unwrapped remote exception if the call fails with
   *                     a {@link StatusRuntimeException}
   */
  public RaftClientReplyProto setConfiguration(
      SetConfigurationRequestProto request) throws IOException {
    final RaftClientReplyProto reply;
    try {
      reply = blockingStub.setConfiguration(request);
    } catch (StatusRuntimeException sre) {
      // translate the gRPC status back into the original exception
      throw RaftGrpcUtil.unwrapException(sre);
    }
    return reply;
  }

  /**
   * Starts an append stream.
   *
   * @param responseHandler receives the replies of the peer
   * @return the observer through which requests are sent
   */
  StreamObserver<RaftClientRequestProto> append(
      StreamObserver<RaftClientReplyProto> responseHandler) {
    return asyncStub.append(responseHandler);
  }

  /** @return the peer this client talks to */
  public RaftPeer getTarget() {
    return target;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java new file mode 100644 index 0000000..6892c71 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.ratis.grpc.client;

import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.RaftPeer;

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Function;

/**
 * Wraps a {@link RaftClientProtocolClient} and manages at most one append
 * stream ("session") at a time. A session is created lazily on the first
 * request and dropped by {@link #closeCurrentSession()}.
 */
public class RaftClientProtocolProxy implements Closeable {
  private final RaftClientProtocolClient proxy;
  private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation;
  private RpcSession currentSession;

  /**
   * @param target                  the peer to connect to
   * @param responseHandlerCreation factory producing the reply handler of
   *                                each new session
   */
  public RaftClientProtocolProxy(RaftPeer target,
      Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation) {
    this.proxy = new RaftClientProtocolClient(target);
    this.responseHandlerCreation = responseHandlerCreation;
  }

  /** Closes the current session, if any, and then the underlying client. */
  @Override
  public void close() throws IOException {
    closeCurrentSession();
    proxy.close();
  }

  @Override
  public String toString() {
    return "ProxyTo:" + proxy.getTarget();
  }

  /** Closes and forgets the current session; a no-op when there is none. */
  public void closeCurrentSession() {
    final RpcSession session = currentSession;
    if (session == null) {
      return;
    }
    session.close();
    currentSession = null;
  }

  /** Sends {@code request} on the current session, opening one if needed. */
  public void onNext(RaftClientRequestProto request) {
    if (currentSession == null) {
      final CloseableStreamObserver handler =
          responseHandlerCreation.apply(proxy.getTarget());
      currentSession = new RpcSession(handler);
    }
    currentSession.requestObserver.onNext(request);
  }

  /** Marks the current session, if any, as failed. */
  public void onError() {
    final RpcSession session = currentSession;
    if (session != null) {
      session.onError();
    }
  }

  /** A reply observer that can additionally be closed. */
  public interface CloseableStreamObserver
      extends StreamObserver<RaftClientReplyProto>, Closeable {
  }

  /** One append stream: the request observer plus its reply handler. */
  class RpcSession implements Closeable {
    private final StreamObserver<RaftClientRequestProto> requestObserver;
    private final CloseableStreamObserver responseHandler;
    private boolean hasError = false;

    RpcSession(CloseableStreamObserver responseHandler) {
      this.responseHandler = responseHandler;
      this.requestObserver = proxy.append(responseHandler);
    }

    void onError() {
      hasError = true;
    }

    /**
     * Completes the request stream unless an error was recorded, then closes
     * the reply handler. Failures of either step are ignored.
     */
    @Override
    public void close() {
      if (!hasError) {
        try {
          requestObserver.onCompleted();
        } catch (Exception ignored) {
        }
      }
      try {
        responseHandler.close();
      } catch (IOException ignored) {
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java new file mode 100644 index 0000000..bb212e1 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.ratis.grpc.client;

import com.google.common.base.Preconditions;

import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.grpc.RaftGrpcUtil;
import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
import org.apache.ratis.protocol.RaftClientReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.CompletableFuture;

/**
 * Server-side gRPC service of the raft client protocol. setConfiguration is
 * a unary call; append is a stream whose replies are sent back in the order
 * of the request sequence numbers.
 */
public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase {
  static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class);

  /** A request of an append stream together with its reply once available. */
  private static class PendingAppend implements Comparable<PendingAppend> {
    private final long seqNum;
    private volatile RaftClientReply reply;

    PendingAppend(long seqNum) {
      this.seqNum = seqNum;
    }

    /** The COMPLETED marker is always ready; a request is ready once replied. */
    boolean isReady() {
      return this == COMPLETED || reply != null;
    }

    void setReply(RaftClientReply reply) {
      this.reply = reply;
    }

    @Override
    public int compareTo(PendingAppend p) {
      return Long.compare(seqNum, p.seqNum);
    }

    @Override
    public String toString() {
      return seqNum + ", reply:" + String.valueOf(reply);
    }
  }

  /** Marker queued behind outstanding requests when the client completes the stream. */
  private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE);

  private final String id;
  private final RaftClientAsynchronousProtocol client;

  public RaftClientProtocolService(String id, RaftClientAsynchronousProtocol client) {
    this.id = id;
    this.client = client;
  }

  /**
   * Submits the configuration change asynchronously and reports either the
   * reply or the wrapped exception to {@code responseObserver}.
   */
  @Override
  public void setConfiguration(SetConfigurationRequestProto request,
      StreamObserver<RaftClientReplyProto> responseObserver) {
    try {
      final CompletableFuture<RaftClientReply> pendingReply =
          client.setConfigurationAsync(
              ClientProtoUtils.toSetConfigurationRequest(request));
      pendingReply.whenCompleteAsync((reply, exception) -> {
        if (exception != null) {
          responseObserver.onError(RaftGrpcUtil.wrapException(exception));
          return;
        }
        responseObserver.onNext(ClientProtoUtils.toRaftClientReplyProto(reply));
        responseObserver.onCompleted();
      });
    } catch (Exception e) {
      responseObserver.onError(RaftGrpcUtil.wrapException(e));
    }
  }

  @Override
  public StreamObserver<RaftClientRequestProto> append(
      StreamObserver<RaftClientReplyProto> responseObserver) {
    return new AppendRequestStreamObserver(responseObserver);
  }

  /** Handles the requests of one append stream. */
  private class AppendRequestStreamObserver implements
      StreamObserver<RaftClientRequestProto> {
    private final List<PendingAppend> pendingList = new LinkedList<>();
    private final StreamObserver<RaftClientReplyProto> responseObserver;

    AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
      this.responseObserver = ro;
    }

    /**
     * Registers the request as pending, submits it and, when its reply
     * arrives, records the reply and flushes all ready replies at the head
     * of the pending list.
     */
    @Override
    public void onNext(RaftClientRequestProto request) {
      try {
        final PendingAppend entry =
            new PendingAppend(request.getRpcRequest().getSeqNum());
        synchronized (pendingList) {
          pendingList.add(entry);
        }

        final CompletableFuture<RaftClientReply> pendingReply =
            client.submitClientRequestAsync(
                ClientProtoUtils.toRaftClientRequest(request));
        pendingReply.whenCompleteAsync((reply, exception) -> {
          if (exception != null) {
            // TODO: the exception may be from either raft or state machine.
            // Currently we skip all the following responses when getting an
            // exception from the state machine.
            responseObserver.onError(RaftGrpcUtil.wrapException(exception));
            return;
          }
          recordReply(reply);
        });
      } catch (Throwable e) {
        LOG.info("{} got exception when handling client append request {}: {}",
            id, request.getRpcRequest(), e);
        responseObserver.onError(RaftGrpcUtil.wrapException(e));
      }
    }

    /**
     * Stores {@code reply} in its pending entry; if that entry is the head of
     * the list, sends every ready entry from the head onwards.
     */
    private void recordReply(RaftClientReply reply) {
      final long replySeq = reply.getSeqNum();
      synchronized (pendingList) {
        Preconditions.checkState(!pendingList.isEmpty(),
            "PendingList is empty when handling onNext for seqNum %s",
            replySeq);
        final long headSeqNum = pendingList.get(0).seqNum;
        // we assume the seqNum is consecutive for a stream RPC call
        final int offset = (int) (replySeq - headSeqNum);
        final PendingAppend pendingForReply = pendingList.get(offset);
        Preconditions.checkState(pendingForReply != null &&
                pendingForReply.seqNum == replySeq,
            "pending for reply is: %s, the pending list: %s",
            pendingForReply, pendingList);
        pendingForReply.setReply(reply);

        if (headSeqNum != replySeq) {
          return;
        }
        // this is the head: drain the leading run of ready entries
        final Collection<PendingAppend> readySet = new ArrayList<>();
        final Iterator<PendingAppend> iter = pendingList.iterator();
        while (iter.hasNext()) {
          final PendingAppend candidate = iter.next();
          if (!candidate.isReady()) {
            break;
          }
          readySet.add(candidate);
          iter.remove();
        }
        sendReadyReplies(readySet);
      }
    }

    private void sendReadyReplies(Collection<PendingAppend> readySet) {
      for (PendingAppend ready : readySet) {
        Preconditions.checkState(ready.isReady());
        if (ready == COMPLETED) {
          responseObserver.onCompleted();
        } else {
          responseObserver.onNext(
              ClientProtoUtils.toRaftClientReplyProto(ready.reply));
        }
      }
    }

    @Override
    public void onError(Throwable t) {
      // for now we just log a msg
      LOG.warn("{} onError: client Append cancelled", id, t);
      synchronized (pendingList) {
        pendingList.clear();
      }
    }

    /**
     * Completes the response stream right away when nothing is pending,
     * otherwise queues the COMPLETED marker behind the pending requests.
     */
    @Override
    public void onCompleted() {
      synchronized (pendingList) {
        if (pendingList.isEmpty()) {
          responseObserver.onCompleted();
        } else {
          pendingList.add(COMPLETED);
        }
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java new file mode 100644 index 0000000..6d9e11f --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientSenderWithGrpc.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.grpc.client; + +import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; +import org.apache.ratis.client.RaftClientRequestSender; +import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.SetConfigurationRequest; +import org.apache.ratis.util.PeerProxyMap; +import org.apache.ratis.util.RaftUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.ratis.client.impl.ClientProtoUtils.*; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class RaftClientSenderWithGrpc implements RaftClientRequestSender { + public static final Logger LOG = LoggerFactory.getLogger(RaftClientSenderWithGrpc.class); + + private final PeerProxyMap<RaftClientProtocolClient> proxies + = new PeerProxyMap<>(RaftClientProtocolClient::new); + + public RaftClientSenderWithGrpc(Collection<RaftPeer> peers) { + addServers(peers); + } + + @Override + public RaftClientReply sendRequest(RaftClientRequest request) + throws IOException { + final String serverId = request.getReplierId(); + final RaftClientProtocolClient proxy = proxies.getProxy(serverId); + if (request instanceof SetConfigurationRequest) { + SetConfigurationRequestProto setConf = + toSetConfigurationRequestProto((SetConfigurationRequest) request); + return toRaftClientReply(proxy.setConfiguration(setConf)); + } else { + RaftClientRequestProto requestProto = 
toRaftClientRequestProto(request); + CompletableFuture<RaftClientReplyProto> replyFuture = + new CompletableFuture<>(); + final StreamObserver<RaftClientRequestProto> requestObserver = + proxy.append(new StreamObserver<RaftClientReplyProto>() { + @Override + public void onNext(RaftClientReplyProto value) { + replyFuture.complete(value); + } + + @Override + public void onError(Throwable t) { + // This implementation is used as RaftClientRequestSender. Retry + // logic on Exception is in RaftClient. + final IOException e; + if (t instanceof StatusRuntimeException) { + e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t); + } else { + e = RaftUtils.asIOException(t); + } + replyFuture.completeExceptionally(e); + } + + @Override + public void onCompleted() { + if (!replyFuture.isDone()) { + replyFuture.completeExceptionally( + new IOException("No reply for request " + request)); + } + } + }); + requestObserver.onNext(requestProto); + requestObserver.onCompleted(); + + // TODO: timeout support + try { + return toRaftClientReply(replyFuture.get()); + } catch (InterruptedException e) { + throw new InterruptedIOException( + "Interrupted while waiting for response of request " + request); + } catch (ExecutionException e) { + throw RaftUtils.toIOException(e); + } + } + } + + @Override + public void addServers(Iterable<RaftPeer> servers) { + proxies.addPeers(servers); + } + + @Override + public void close() throws IOException { + proxies.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java new file mode 100644 index 0000000..a3905f8 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java @@ 
-0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.client; + +import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT; +import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collection; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.util.ProtoUtils; + +public class RaftOutputStream extends OutputStream { + /** internal buffer */ + private final byte buf[]; + private int count; + private long seqNum = 0; + private final String clientId; + private final AppendStreamer streamer; + + private boolean closed = false; + + public RaftOutputStream(RaftProperties prop, String clientId, + Collection<RaftPeer> peers, String leaderId) { + final int bufferSize = prop.getInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, + RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT); + buf = new byte[bufferSize]; + count = 0; + this.clientId = clientId; + streamer = new AppendStreamer(prop, peers, leaderId, clientId); + } + + @Override + public void write(int b) throws IOException { + checkClosed(); 
+ buf[count++] = (byte)b; + flushIfNecessary(); + } + + private void flushIfNecessary() throws IOException { + if(count == buf.length) { + flushToStreamer(); + } + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + checkClosed(); + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + int total = 0; + while (total < len) { + int toWrite = Math.min(len - total, buf.length - count); + System.arraycopy(b, off + total, buf, count, toWrite); + count += toWrite; + total += toWrite; + flushIfNecessary(); + } + } + + private void flushToStreamer() throws IOException { + if (count > 0) { + streamer.write(ProtoUtils.toByteString(buf, 0, count), seqNum++); + count = 0; + } + } + + @Override + public void flush() throws IOException { + checkClosed(); + flushToStreamer(); + streamer.flush(); + } + + @Override + public void close() throws IOException { + flushToStreamer(); + streamer.close(); // streamer will flush + this.closed = true; + } + + @Override + public String toString() { + return "RaftOutputStream-" + clientId; + } + + private void checkClosed() throws IOException { + if (closed) { + throw new IOException(this.toString() + " was closed."); + } + } +}