RATIS-324. Rename grpc classes. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ed8e60da Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ed8e60da Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ed8e60da Branch: refs/heads/master Commit: ed8e60dad83ae733fe6c6c3752e82f1fee72c12e Parents: 974919e Author: Lokesh Jain <[email protected]> Authored: Thu Sep 20 10:29:48 2018 +0530 Committer: Lokesh Jain <[email protected]> Committed: Thu Sep 20 10:29:48 2018 +0530 ---------------------------------------------------------------------- .../ratis/examples/ParameterizedBaseTest.java | 6 +- .../filestore/TestFileStoreAsyncWithGrpc.java | 6 +- .../filestore/TestFileStoreWithGrpc.java | 6 +- .../java/org/apache/ratis/grpc/GrpcFactory.java | 9 +- .../java/org/apache/ratis/grpc/GrpcUtil.java | 131 ++++++ .../org/apache/ratis/grpc/RaftGRpcService.java | 154 ------- .../org/apache/ratis/grpc/RaftGrpcUtil.java | 131 ------ .../ratis/grpc/client/AppendStreamer.java | 390 ----------------- .../grpc/client/GrpcClientProtocolClient.java | 234 ++++++++++ .../grpc/client/GrpcClientProtocolProxy.java | 107 +++++ .../grpc/client/GrpcClientProtocolService.java | 195 +++++++++ .../apache/ratis/grpc/client/GrpcClientRpc.java | 14 +- .../ratis/grpc/client/GrpcClientStreamer.java | 390 +++++++++++++++++ .../ratis/grpc/client/GrpcOutputStream.java | 112 +++++ .../grpc/client/RaftClientProtocolClient.java | 234 ---------- .../grpc/client/RaftClientProtocolProxy.java | 107 ----- .../grpc/client/RaftClientProtocolService.java | 195 --------- .../ratis/grpc/client/RaftOutputStream.java | 114 ----- .../ratis/grpc/server/AdminProtocolService.java | 53 --- .../ratis/grpc/server/GRpcLogAppender.java | 438 ------------------- .../grpc/server/GrpcAdminProtocolService.java | 53 +++ .../ratis/grpc/server/GrpcLogAppender.java | 437 ++++++++++++++++++ .../grpc/server/GrpcServerProtocolClient.java | 75 ++++ .../grpc/server/GrpcServerProtocolService.java | 134 ++++++ .../apache/ratis/grpc/server/GrpcService.java | 152 +++++++ .../grpc/server/RaftServerProtocolClient.java | 75 ---- .../grpc/server/RaftServerProtocolService.java | 134 ------ .../ratis/grpc/MiniRaftClusterWithGRpc.java | 72 --- .../ratis/grpc/MiniRaftClusterWithGrpc.java | 72 +++ .../ratis/grpc/TestGroupManagementWithGrpc.java | 2 +- .../ratis/grpc/TestLeaderElectionWithGrpc.java | 6 +- .../ratis/grpc/TestRaftAsyncWithGrpc.java | 4 +- .../ratis/grpc/TestRaftExceptionWithGrpc.java | 4 +- .../grpc/TestRaftReconfigurationWithGRpc.java | 36 -- .../grpc/TestRaftReconfigurationWithGrpc.java | 36 ++ .../ratis/grpc/TestRaftServerWithGrpc.java | 4 +- .../ratis/grpc/TestRaftSnapshotWithGrpc.java | 5 +- .../TestRaftStateMachineExceptionWithGrpc.java | 4 +- .../org/apache/ratis/grpc/TestRaftStream.java | 24 +- .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 8 +- .../ratis/grpc/TestRetryCacheWithGrpc.java | 6 +- .../grpc/TestServerInformationWithGrpc.java | 4 +- ratis-proto-shaded/src/main/proto/GRpc.proto | 54 --- ratis-proto-shaded/src/main/proto/Grpc.proto | 54 +++ 44 files changed, 2237 insertions(+), 2244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java index 057c73a..03f60ec 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java @@ -21,7 +21,7 @@ import org.apache.ratis.BaseTest; import org.apache.ratis.MiniRaftCluster; import org.apache.ratis.RaftTestUtil; import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.MiniRaftClusterWithGRpc; +import org.apache.ratis.grpc.MiniRaftClusterWithGrpc; import org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc; import org.apache.ratis.netty.MiniRaftClusterWithNetty; import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; @@ -101,8 +101,8 @@ public abstract class ParameterizedBaseTest extends BaseTest { if (isAll || classes.contains(MiniRaftClusterWithSimulatedRpc.class)) { add(clusters, MiniRaftClusterWithSimulatedRpc.FACTORY, ids.next(), prop); } - if (isAll || classes.contains(MiniRaftClusterWithGRpc.class)) { - add(clusters, MiniRaftClusterWithGRpc.FACTORY, ids.next(), prop); + if (isAll || classes.contains(MiniRaftClusterWithGrpc.class)) { + add(clusters, MiniRaftClusterWithGrpc.FACTORY, ids.next(), prop); } if (isAll || classes.contains(MiniRaftClusterWithNetty.class)) { add(clusters, MiniRaftClusterWithNetty.FACTORY, ids.next(), prop); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java index 02bd2b0..d25c85e 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java @@ -17,9 +17,9 @@ */ package org.apache.ratis.examples.filestore; -import org.apache.ratis.grpc.MiniRaftClusterWithGRpc; +import org.apache.ratis.grpc.MiniRaftClusterWithGrpc; public class TestFileStoreAsyncWithGrpc - extends FileStoreAsyncBaseTest<MiniRaftClusterWithGRpc> - implements MiniRaftClusterWithGRpc.FactoryGet { + extends FileStoreAsyncBaseTest<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java index 71ae294..6e46b6e 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java @@ -17,9 +17,9 @@ */ package org.apache.ratis.examples.filestore; -import org.apache.ratis.grpc.MiniRaftClusterWithGRpc; +import org.apache.ratis.grpc.MiniRaftClusterWithGrpc; public class TestFileStoreWithGrpc - extends FileStoreBaseTest<MiniRaftClusterWithGRpc> - implements MiniRaftClusterWithGRpc.FactoryGet { + extends FileStoreBaseTest<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java index 4f2612e..836ee1c 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java @@ -21,7 +21,8 @@ import org.apache.ratis.client.ClientFactory; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.client.GrpcClientRpc; -import org.apache.ratis.grpc.server.GRpcLogAppender; +import org.apache.ratis.grpc.server.GrpcLogAppender; +import org.apache.ratis.grpc.server.GrpcService; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; @@ -38,12 +39,12 @@ public class GrpcFactory implements ServerFactory, ClientFactory { @Override public LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f) { - return new GRpcLogAppender(server, state, f); + return new GrpcLogAppender(server, state, f); } @Override - public RaftGRpcService newRaftServerRpc(RaftServer server) { - return RaftGRpcService.newBuilder() + public GrpcService newRaftServerRpc(RaftServer server) { + return GrpcService.newBuilder() .setServer(server) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java new file mode 100644 index 0000000..84f01c8 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.ServerNotReadyException; +import org.apache.ratis.shaded.io.grpc.Metadata; +import org.apache.ratis.shaded.io.grpc.Status; +import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.util.*; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.function.Supplier; + +public interface GrpcUtil { + Metadata.Key<String> EXCEPTION_TYPE_KEY = + Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER); + Metadata.Key<String> CALL_ID = + Metadata.Key.of("call-id", Metadata.ASCII_STRING_MARSHALLER); + + static StatusRuntimeException wrapException(Throwable t) { + return wrapException(t, -1); + } + + static StatusRuntimeException wrapException(Throwable t, long callId) { + t = JavaUtils.unwrapCompletionException(t); + + Metadata trailers = new Metadata(); + trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName()); + if (callId > 0) { + trailers.put(CALL_ID, String.valueOf(callId)); + } + return new StatusRuntimeException( + Status.INTERNAL.withCause(t).withDescription(t.getMessage()), trailers); + } + + static Throwable unwrapThrowable(Throwable t) { + if (t instanceof StatusRuntimeException) { + final IOException ioe = tryUnwrapException((StatusRuntimeException)t); + if (ioe != null) { + return ioe; + } + } + return t; + } + + static IOException unwrapException(StatusRuntimeException se) { + final IOException ioe = tryUnwrapException(se); + return ioe != null? ioe: new IOException(se); + } + + static IOException tryUnwrapException(StatusRuntimeException se) { + final Metadata trailers = se.getTrailers(); + final Status status = se.getStatus(); + if (trailers != null && status != null) { + final String className = trailers.get(EXCEPTION_TYPE_KEY); + if (className != null) { + try { + Class<?> clazz = Class.forName(className); + final Exception unwrapped = ReflectionUtils.instantiateException( + clazz.asSubclass(Exception.class), status.getDescription(), se); + return IOUtils.asIOException(unwrapped); + } catch (Exception e) { + se.addSuppressed(e); + return new IOException(se); + } + } + } + return null; + } + + static long getCallId(Throwable t) { + if (t instanceof StatusRuntimeException) { + final Metadata trailers = ((StatusRuntimeException)t).getTrailers(); + String callId = trailers.get(CALL_ID); + return callId != null ? Integer.parseInt(callId) : -1; + } + return -1; + } + + static IOException unwrapIOException(Throwable t) { + final IOException e; + if (t instanceof StatusRuntimeException) { + e = GrpcUtil.unwrapException((StatusRuntimeException) t); + } else { + e = IOUtils.asIOException(t); + } + return e; + } + + static <REPLY extends RaftClientReply, REPLY_PROTO> void asyncCall( + StreamObserver<REPLY_PROTO> responseObserver, + CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier, + Function<REPLY, REPLY_PROTO> toProto) { + try { + supplier.get().whenCompleteAsync((reply, exception) -> { + if (exception != null) { + responseObserver.onError(GrpcUtil.wrapException(exception)); + } else { + responseObserver.onNext(toProto.apply(reply)); + responseObserver.onCompleted(); + } + }); + } catch (Exception e) { + responseObserver.onError(GrpcUtil.wrapException(e)); + } + } + + static void warn(Logger log, Supplier<String> message, Throwable t) { + LogUtils.warn(log, message, unwrapThrowable(t), StatusRuntimeException.class, ServerNotReadyException.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java deleted file mode 100644 index d638a45..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc; - -import org.apache.ratis.grpc.client.RaftClientProtocolService; -import org.apache.ratis.grpc.server.AdminProtocolService; -import org.apache.ratis.grpc.server.RaftServerProtocolClient; -import org.apache.ratis.grpc.server.RaftServerProtocolService; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.server.RaftServer; -import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.RaftServerRpc; -import org.apache.ratis.server.impl.RaftServerRpcWithProxy; -import org.apache.ratis.shaded.io.grpc.Server; -import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; -import org.apache.ratis.shaded.proto.RaftProtos.*; -import org.apache.ratis.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.function.Supplier; - -/** A grpc implementation of {@link RaftServerRpc}. */ -public class RaftGRpcService extends RaftServerRpcWithProxy<RaftServerProtocolClient, PeerProxyMap<RaftServerProtocolClient>> { - static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class); - public static final String GRPC_SEND_SERVER_REQUEST = - RaftGRpcService.class.getSimpleName() + ".sendRequest"; - - public static class Builder extends RaftServerRpc.Builder<Builder,RaftGRpcService> { - private Builder() {} - - @Override - public Builder getThis() { - return this; - } - - @Override - public RaftGRpcService build() { - return new RaftGRpcService(getServer()); - } - } - - public static Builder newBuilder() { - return new Builder(); - } - - private final Server server; - private final Supplier<InetSocketAddress> addressSupplier; - - private RaftGRpcService(RaftServer server) { - this(server, server::getId, - GrpcConfigKeys.Server.port(server.getProperties()), - GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info), - RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()), - GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info), - RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties())); - } - private RaftGRpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, int port, - SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize, - SizeInBytes flowControlWindow, TimeDuration requestTimeoutDuration) { - super(idSupplier, id -> new PeerProxyMap<>(id.toString(), - p -> new RaftServerProtocolClient(p, flowControlWindow.getSizeInt(), requestTimeoutDuration))); - if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) { - throw new IllegalArgumentException("Illegal configuration: " - + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize - + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax); - } - - server = NettyServerBuilder.forPort(port) - .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt()) - .flowControlWindow(flowControlWindow.getSizeInt()) - .addService(new RaftServerProtocolService(idSupplier, raftServer)) - .addService(new RaftClientProtocolService(idSupplier, raftServer)) - .addService(new AdminProtocolService(raftServer)) - .build(); - addressSupplier = JavaUtils.memoize(() -> new InetSocketAddress(port != 0? port: server.getPort())); - } - - @Override - public SupportedRpcType getRpcType() { - return SupportedRpcType.GRPC; - } - - @Override - public void startImpl() { - try { - server.start(); - } catch (IOException e) { - ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG); - } - LOG.info("{}: {} started, listening on {}", getId(), getClass().getSimpleName(), getInetSocketAddress()); - } - - @Override - public void closeImpl() throws IOException { - final String name = getId() + ": shutdown server with port " + server.getPort(); - LOG.info("{} now", name); - final Server s = server.shutdownNow(); - super.closeImpl(); - try { - s.awaitTermination(); - } catch(InterruptedException e) { - throw IOUtils.toInterruptedIOException(name + " failed", e); - } - LOG.info("{} successfully", name); - } - - @Override - public InetSocketAddress getInetSocketAddress() { - return addressSupplier.get(); - } - - @Override - public AppendEntriesReplyProto appendEntries( - AppendEntriesRequestProto request) throws IOException { - throw new UnsupportedOperationException( - "Blocking AppendEntries call is not supported"); - } - - @Override - public InstallSnapshotReplyProto installSnapshot( - InstallSnapshotRequestProto request) throws IOException { - throw new UnsupportedOperationException( - "Blocking InstallSnapshot call is not supported"); - } - - @Override - public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) - throws IOException { - CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(), - null, request); - - final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId()); - return getProxies().getProxy(target).requestVote(request); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java deleted file mode 100644 index ecbbf44..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc; - -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.ServerNotReadyException; -import org.apache.ratis.shaded.io.grpc.Metadata; -import org.apache.ratis.shaded.io.grpc.Status; -import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.apache.ratis.util.*; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; -import java.util.function.Supplier; - -public interface RaftGrpcUtil { - Metadata.Key<String> EXCEPTION_TYPE_KEY = - Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER); - Metadata.Key<String> CALL_ID = - Metadata.Key.of("call-id", Metadata.ASCII_STRING_MARSHALLER); - - static StatusRuntimeException wrapException(Throwable t) { - return wrapException(t, -1); - } - - static StatusRuntimeException wrapException(Throwable t, long callId) { - t = JavaUtils.unwrapCompletionException(t); - - Metadata trailers = new Metadata(); - trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName()); - if (callId > 0) { - trailers.put(CALL_ID, String.valueOf(callId)); - } - return new StatusRuntimeException( - Status.INTERNAL.withCause(t).withDescription(t.getMessage()), trailers); - } - - static Throwable unwrapThrowable(Throwable t) { - if (t instanceof StatusRuntimeException) { - final IOException ioe = tryUnwrapException((StatusRuntimeException)t); - if (ioe != null) { - return ioe; - } - } - return t; - } - - static IOException unwrapException(StatusRuntimeException se) { - final IOException ioe = tryUnwrapException(se); - return ioe != null? ioe: new IOException(se); - } - - static IOException tryUnwrapException(StatusRuntimeException se) { - final Metadata trailers = se.getTrailers(); - final Status status = se.getStatus(); - if (trailers != null && status != null) { - final String className = trailers.get(EXCEPTION_TYPE_KEY); - if (className != null) { - try { - Class<?> clazz = Class.forName(className); - final Exception unwrapped = ReflectionUtils.instantiateException( - clazz.asSubclass(Exception.class), status.getDescription(), se); - return IOUtils.asIOException(unwrapped); - } catch (Exception e) { - se.addSuppressed(e); - return new IOException(se); - } - } - } - return null; - } - - static long getCallId(Throwable t) { - if (t instanceof StatusRuntimeException) { - final Metadata trailers = ((StatusRuntimeException)t).getTrailers(); - String callId = trailers.get(CALL_ID); - return callId != null ? Integer.parseInt(callId) : -1; - } - return -1; - } - - static IOException unwrapIOException(Throwable t) { - final IOException e; - if (t instanceof StatusRuntimeException) { - e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t); - } else { - e = IOUtils.asIOException(t); - } - return e; - } - - static <REPLY extends RaftClientReply, REPLY_PROTO> void asyncCall( - StreamObserver<REPLY_PROTO> responseObserver, - CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier, - Function<REPLY, REPLY_PROTO> toProto) { - try { - supplier.get().whenCompleteAsync((reply, exception) -> { - if (exception != null) { - responseObserver.onError(RaftGrpcUtil.wrapException(exception)); - } else { - responseObserver.onNext(toProto.apply(reply)); - responseObserver.onCompleted(); - } - }); - } catch (Exception e) { - responseObserver.onError(RaftGrpcUtil.wrapException(e)); - } - } - - static void warn(Logger log, Supplier<String> message, Throwable t) { - LogUtils.warn(log, message, unwrapThrowable(t), StatusRuntimeException.class, ServerNotReadyException.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java deleted file mode 100644 index 3068751..0000000 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java +++ /dev/null @@ -1,390 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.grpc.client; - -import org.apache.ratis.client.impl.ClientProtoUtils; -import org.apache.ratis.conf.RaftProperties; -import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.protocol.*; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; -import org.apache.ratis.util.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Closeable; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import java.util.stream.Collectors; - -public class AppendStreamer implements Closeable { - public static final Logger LOG = LoggerFactory.getLogger(AppendStreamer.class); - - enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR} - - private static class ExceptionAndRetry { - private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>(); - private final AtomicInteger retryTimes = new AtomicInteger(0); - private final int maxRetryTimes; - private final TimeDuration retryInterval; - - ExceptionAndRetry(RaftProperties prop) { - maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(prop); - retryInterval = GrpcConfigKeys.OutputStream.retryInterval(prop); - } - - void addException(RaftPeerId peer, IOException e) { - exceptionMap.put(peer, e); - retryTimes.incrementAndGet(); - } - - IOException getCombinedException() { - return new IOException("Exceptions: " + exceptionMap); - } - - boolean shouldRetry() { - return retryTimes.get() <= maxRetryTimes; - } - } - - private final Deque<RaftClientRequestProto> dataQueue; - private final Deque<RaftClientRequestProto> ackQueue; - private final int maxPendingNum; - private final SizeInBytes maxMessageSize; - - private final PeerProxyMap<RaftClientProtocolProxy> proxyMap; - private final Map<RaftPeerId, RaftPeer> peers; - private RaftPeerId leaderId; - private volatile RaftClientProtocolProxy leaderProxy; - private final ClientId clientId; - - private volatile RunningState running = RunningState.RUNNING; - private final ExceptionAndRetry exceptionAndRetry; - private final Sender senderThread; - private final RaftGroupId groupId; - - AppendStreamer(RaftProperties prop, RaftGroup group, - RaftPeerId leaderId, ClientId clientId) { - this.clientId = clientId; - maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop); - maxMessageSize = GrpcConfigKeys.messageSizeMax(prop, LOG::debug); - dataQueue = new ConcurrentLinkedDeque<>(); - ackQueue = new ConcurrentLinkedDeque<>(); - exceptionAndRetry = new ExceptionAndRetry(prop); - - this.groupId = group.getGroupId(); - this.peers = group.getPeers().stream().collect( - Collectors.toMap(RaftPeer::getId, Function.identity())); - proxyMap = new PeerProxyMap<>(clientId.toString(), - raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new, - prop)); - proxyMap.addPeers(group.getPeers()); - refreshLeaderProxy(leaderId, null); - - senderThread = new Sender(); - senderThread.setName(this.toString() + "-sender"); - senderThread.start(); - } - - private synchronized void refreshLeaderProxy(RaftPeerId suggested, - RaftPeerId oldLeader) { - if (suggested != null) { - leaderId = suggested; - } else { - if (oldLeader == null) { - leaderId = peers.keySet().iterator().next(); - } else { - leaderId = CollectionUtils.random(oldLeader, peers.keySet()); - if (leaderId == null) { - leaderId = oldLeader; - } - } - } - LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this, - oldLeader, leaderId, suggested); - if (leaderProxy != null) { - leaderProxy.closeCurrentSession(); - } - try { - leaderProxy = proxyMap.getProxy(leaderId); - } catch (IOException e) { - LOG.error("Should not hit IOException here", e); - refreshLeader(null, leaderId); - } - } - - private boolean isRunning() { - return running == RunningState.RUNNING || - running == RunningState.LOOK_FOR_LEADER; - } - - private void checkState() throws IOException { - if (!isRunning()) { - throwException("The AppendStreamer has been closed"); - } - } - - synchronized void write(ByteString content, long seqNum) - throws IOException { - checkState(); - while (isRunning() && dataQueue.size() >= maxPendingNum) { - try { - wait(); - } catch (InterruptedException ignored) { - } - } - if (isRunning()) { - // wrap the current buffer into a RaftClientRequestProto - final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto( - clientId, leaderId, groupId, seqNum, seqNum, content); - if (request.getSerializedSize() > maxMessageSize.getSizeInt()) { - throw new IOException("msg size:" + request.getSerializedSize() + - " exceeds maximum:" + maxMessageSize.getSizeInt()); - } - dataQueue.offer(request); - this.notifyAll(); - } else { - throwException(this + " got closed."); - } - } - - synchronized void flush() throws IOException { - checkState(); - if (dataQueue.isEmpty() && ackQueue.isEmpty()) { - return; - } - // wait for the pending Q to become empty - while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) { - try { - wait(); - } catch (InterruptedException ignored) { - } - } - if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) { - throwException(this + " got closed before finishing flush"); - } - } - - @Override - public void close() throws IOException { - if (!isRunning()) { - return; - } - flush(); - - running = RunningState.CLOSED; - senderThread.interrupt(); - try { - senderThread.join(); - } catch (InterruptedException ignored) { - } - proxyMap.close(); - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + "-" + clientId; - } - - private class Sender extends Daemon { - @Override - public void run() { - while (isRunning()) { - - synchronized (AppendStreamer.this) { - while (isRunning() && shouldWait()) { - try { - AppendStreamer.this.wait(); - } catch (InterruptedException ignored) { - } - } - if (running == RunningState.RUNNING) { - Preconditions.assertTrue(!dataQueue.isEmpty(), "dataQueue is empty"); - RaftClientRequestProto next = dataQueue.poll(); - leaderProxy.onNext(next); - ackQueue.offer(next); - } - } - } - } - - private boolean shouldWait() { - // the sender should wait if any of the following is true - // 1) there is no data to send - // 2) there are too many outstanding pending requests - // 3) Error/NotLeaderException just happened, we're still waiting for - // the first response to confirm the new leader - return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum || - running == RunningState.LOOK_FOR_LEADER; - } - } - - /** the response handler for stream RPC */ - private class ResponseHandler implements - RaftClientProtocolProxy.CloseableStreamObserver { - private final RaftPeerId targetId; - // once handled the first NotLeaderException or Error, the handler should - // be inactive and should not make any further action. - private volatile boolean active = true; - - ResponseHandler(RaftPeer target) { - targetId = target.getId(); - } - - @Override - public String toString() { - return AppendStreamer.this + "-ResponseHandler-" + targetId; - } - - @Override - public void onNext(RaftClientReplyProto reply) { - if (!active) { - return; - } - synchronized (AppendStreamer.this) { - RaftClientRequestProto pending = Objects.requireNonNull(ackQueue.peek()); - if (reply.getRpcReply().getSuccess()) { - Preconditions.assertTrue(pending.getRpcRequest().getCallId() == reply.getRpcReply().getCallId(), - () -> "pending=" + ClientProtoUtils.toString(pending) + " but reply=" + ClientProtoUtils.toString(reply)); - ackQueue.poll(); - if (LOG.isTraceEnabled()) { - LOG.trace("{} received success ack for {}", this, ClientProtoUtils.toString(pending)); - } - // we've identified the correct leader - if (running == RunningState.LOOK_FOR_LEADER) { - running = RunningState.RUNNING; - } - } else { - // this may be a NotLeaderException - RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply); - final NotLeaderException nle = r.getNotLeaderException(); - if (nle != null) { - LOG.debug("{} received a NotLeaderException from {}", this, - r.getServerId()); - handleNotLeader(nle, targetId); - } - } - AppendStreamer.this.notifyAll(); - } - } - - @Override - public void onError(Throwable t) { - LOG.warn(this + " onError", t); - if (active) { - synchronized (AppendStreamer.this) { - handleError(t, this); - AppendStreamer.this.notifyAll(); - } - } - } - - @Override - public void onCompleted() { - LOG.info("{} onCompleted, pending requests #: {}", this, - ackQueue.size()); - } - - @Override // called by handleError and handleNotLeader - public void close() throws IOException { - active = false; - } - } - - private void throwException(String msg) throws IOException { - if (running == RunningState.ERROR) { - throw exceptionAndRetry.getCombinedException(); - } else { - throw new IOException(msg); - } - } - - private void handleNotLeader(NotLeaderException nle, - RaftPeerId oldLeader) { - Preconditions.assertTrue(Thread.holdsLock(AppendStreamer.this)); - // handle NotLeaderException: refresh leader and RaftConfiguration - refreshPeers(nle.getPeers()); - - refreshLeader(nle.getSuggestedLeader().getId(), oldLeader); - } - - private void handleError(Throwable t, ResponseHandler handler) { - Preconditions.assertTrue(Thread.holdsLock(AppendStreamer.this)); - final IOException e = RaftGrpcUtil.unwrapIOException(t); - - exceptionAndRetry.addException(handler.targetId, e); - LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.", - handler, e, exceptionAndRetry.retryTimes.get(), - exceptionAndRetry.maxRetryTimes); - - leaderProxy.onError(); - if (exceptionAndRetry.shouldRetry()) { - refreshLeader(null, leaderId); - } else { - running = RunningState.ERROR; - } - } - - private void refreshLeader(RaftPeerId suggestedLeader, RaftPeerId oldLeader) { - running = RunningState.LOOK_FOR_LEADER; - refreshLeaderProxy(suggestedLeader, oldLeader); - reQueuePendingRequests(leaderId); - - final RaftClientRequestProto request = Objects.requireNonNull( - dataQueue.poll()); - ackQueue.offer(request); - try { - exceptionAndRetry.retryInterval.sleep(); - } catch (InterruptedException ignored) { - } - leaderProxy.onNext(request); - } - - private void reQueuePendingRequests(RaftPeerId newLeader) { - if (isRunning()) { - // resend all the pending requests - while (!ackQueue.isEmpty()) { - final RaftClientRequestProto oldRequest = ackQueue.pollLast(); - final RaftRpcRequestProto.Builder newRpc = RaftRpcRequestProto.newBuilder(oldRequest.getRpcRequest()) - .setReplyId(newLeader.toByteString()); - final RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder(oldRequest) - .setRpcRequest(newRpc).build(); - dataQueue.offerFirst(newRequest); - } - } - } - - private void refreshPeers(RaftPeer[] newPeers) { - if (newPeers != null && newPeers.length > 0) { - // we only add new peers, we do not remove any peer even if it no longer - // belongs to the current raft conf - Arrays.stream(newPeers).forEach(peer -> { - peers.putIfAbsent(peer.getId(), peer); - proxyMap.computeIfAbsent(peer); - }); - - LOG.debug("refreshed peers: {}", peers); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java new file mode 100644 index 0000000..a2e53bf --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.client; + +import org.apache.ratis.client.RaftClientConfigKeys; +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.protocol.*; +import org.apache.ratis.util.TimeoutScheduler; +import org.apache.ratis.shaded.io.grpc.ManagedChannel; +import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; +import org.apache.ratis.shaded.io.grpc.netty.NegotiationType; +import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.*; +import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc; +import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub; +import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc; +import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub; +import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub; +import org.apache.ratis.util.CheckedSupplier; +import org.apache.ratis.util.CollectionUtils; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public class GrpcClientProtocolClient implements Closeable { + public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolClient.class); + + private final Supplier<String> name; + private final RaftPeer target; + private final ManagedChannel channel; + + private final TimeDuration requestTimeoutDuration; + private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1); + + private final RaftClientProtocolServiceBlockingStub blockingStub; + private final RaftClientProtocolServiceStub asyncStub; + private final AdminProtocolServiceBlockingStub adminBlockingStub; + + private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>(); + + public GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties) { + this.name = JavaUtils.memoize(() -> id + "->" + target.getId()); + this.target = target; + + final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug); + final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug); + channel = NettyChannelBuilder.forTarget(target.getAddress()) + .negotiationType(NegotiationType.PLAINTEXT) + .flowControlWindow(flowControlWindow.getSizeInt()) + .maxInboundMessageSize(maxMessageSize.getSizeInt()) + .build(); + blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel); + asyncStub = RaftClientProtocolServiceGrpc.newStub(channel); + adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel); + this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties); + } + + String getName() { + return name.get(); + } + + @Override + public void close() { + final AsyncStreamObservers observers = appendStreamObservers.get(); + if (observers != null) { + observers.close(); + } + channel.shutdownNow(); + } + + RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException { + return blockingCall(() -> adminBlockingStub + .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .groupManagement(request)); + } + + ServerInformationReplyProto serverInformation(ServerInformationRequestProto request) { + return adminBlockingStub + .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .serverInformation(request); + } + + RaftClientReplyProto setConfiguration( + SetConfigurationRequestProto request) throws IOException { + return blockingCall(() -> blockingStub + .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .setConfiguration(request)); + } + + private static RaftClientReplyProto blockingCall( + CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier + ) throws IOException { + try { + return supplier.get(); + } catch (StatusRuntimeException e) { + throw GrpcUtil.unwrapException(e); + } + } + + StreamObserver<RaftClientRequestProto> append( + StreamObserver<RaftClientReplyProto> responseHandler) { + return asyncStub.append(responseHandler); + } + + StreamObserver<RaftClientRequestProto> appendWithTimeout( + StreamObserver<RaftClientReplyProto> responseHandler) { + return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit()) + .append(responseHandler); + } + + AsyncStreamObservers getAppendStreamObservers() { + return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers()); + } + + public RaftPeer getTarget() { + return target; + } + + class AsyncStreamObservers implements Closeable { + /** Request map: callId -> future */ + private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>()); + private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() { + @Override + public void onNext(RaftClientReplyProto proto) { + final long callId = proto.getRpcReply().getCallId(); + try { + final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto); + final NotLeaderException nle = reply.getNotLeaderException(); + if (nle != null) { + completeReplyExceptionally(nle, NotLeaderException.class.getName()); + return; + } + handleReplyFuture(callId, f -> f.complete(reply)); + } catch (Throwable t) { + handleReplyFuture(callId, f -> f.completeExceptionally(t)); + } + } + + @Override + public void onError(Throwable t) { + final IOException ioe = GrpcUtil.unwrapIOException(t); + completeReplyExceptionally(ioe, "onError"); + } + + @Override + public void onCompleted() { + completeReplyExceptionally(null, "completed"); + } + }; + private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver); + + CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) { + final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get(); + if (map == null) { + return JavaUtils.completeExceptionally(new IOException("Already closed.")); + } + final CompletableFuture<RaftClientReply> f = new CompletableFuture<>(); + CollectionUtils.putNew(request.getCallId(), f, map, + () -> getName() + ":" + getClass().getSimpleName()); + try { + requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request)); + scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG, + () -> "Timeout check failed for client request: " + request); + } catch(Throwable t) { + handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t)); + } + return f; + } + + private void timeoutCheck(RaftClientRequest request) { + handleReplyFuture(request.getCallId(), f -> f.completeExceptionally( + new IOException("Request timeout " + requestTimeoutDuration + ": " + request))); + } + + private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) { + Optional.ofNullable(replies.get()) + .map(replyMap -> replyMap.remove(callId)) + .ifPresent(handler); + } + + @Override + public void close() { + requestStreamObserver.onCompleted(); + completeReplyExceptionally(null, "close"); + } + + private void completeReplyExceptionally(Throwable t, String event) { + appendStreamObservers.compareAndSet(this, null); + final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null); + if (map == null) { + return; + } + for (Map.Entry<Long, CompletableFuture<RaftClientReply>> entry : map.entrySet()) { + final CompletableFuture<RaftClientReply> f = entry.getValue(); + if (!f.isDone()) { + f.completeExceptionally(t != null? t + : new IOException(getName() + ": Stream " + event + + ": no reply for async request cid=" + entry.getKey())); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java new file mode 100644 index 0000000..156e6c3 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.client; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.protocol.RaftPeer; + +import java.io.Closeable; +import java.io.IOException; +import java.util.function.Function; + +public class GrpcClientProtocolProxy implements Closeable { + private final GrpcClientProtocolClient proxy; + private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation; + private RpcSession currentSession; + + public GrpcClientProtocolProxy(ClientId clientId, RaftPeer target, + Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation, + RaftProperties properties) { + proxy = new GrpcClientProtocolClient(clientId, target, properties); + this.responseHandlerCreation = responseHandlerCreation; + } + + @Override + public void close() throws IOException { + closeCurrentSession(); + proxy.close(); + } + + @Override + public String toString() { + return "ProxyTo:" + proxy.getTarget(); + } + + public void closeCurrentSession() { + if (currentSession != null) { + currentSession.close(); + currentSession = null; + } + } + + public void onNext(RaftClientRequestProto request) { + if (currentSession == null) { + currentSession = new RpcSession( + responseHandlerCreation.apply(proxy.getTarget())); + } + currentSession.requestObserver.onNext(request); + } + + public void onError() { + if (currentSession != null) { + currentSession.onError(); + } + } + + public interface CloseableStreamObserver + extends StreamObserver<RaftClientReplyProto>, Closeable { + } + + class RpcSession implements Closeable { + private final StreamObserver<RaftClientRequestProto> requestObserver; + private final CloseableStreamObserver responseHandler; + private boolean hasError = false; + + RpcSession(CloseableStreamObserver responseHandler) { + this.responseHandler = responseHandler; + this.requestObserver = proxy.append(responseHandler); + } + + void onError() { + hasError = true; + } + + @Override + public void close() { + if (!hasError) { + try { + requestObserver.onCompleted(); + } catch (Exception ignored) { + } + } + try { + responseHandler.close(); + } catch (IOException ignored) { + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java new file mode 100644 index 0000000..22f7f56 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.client; + +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.protocol.*; +import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto; +import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.Preconditions; +import org.apache.ratis.util.SlidingWindow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase { + public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class); + + private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> { + private final RaftClientRequest request; + private volatile RaftClientReply reply; + + PendingAppend(RaftClientRequest request) { + this.request = request; + } + + @Override + public boolean hasReply() { + return reply != null || this == COMPLETED; + } + + @Override + public void setReply(RaftClientReply reply) { + this.reply = reply; + } + + RaftClientReply getReply() { + return reply; + } + + RaftClientRequest getRequest() { + return request; + } + + @Override + public long getSeqNum() { + return request != null? request.getSeqNum(): Long.MAX_VALUE; + } + + @Override + public String toString() { + return request != null? getSeqNum() + ":" + reply: "COMPLETED"; + } + } + private static final PendingAppend COMPLETED = new PendingAppend(null); + + private final Supplier<RaftPeerId> idSupplier; + private final RaftClientAsynchronousProtocol protocol; + + public GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol) { + this.idSupplier = idSupplier; + this.protocol = protocol; + } + + RaftPeerId getId() { + return idSupplier.get(); + } + + @Override + public void setConfiguration(SetConfigurationRequestProto proto, + StreamObserver<RaftClientReplyProto> responseObserver) { + final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto); + GrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request), + ClientProtoUtils::toRaftClientReplyProto); + } + + @Override + public StreamObserver<RaftClientRequestProto> append( + StreamObserver<RaftClientReplyProto> responseObserver) { + return new AppendRequestStreamObserver(responseObserver); + } + + private final AtomicInteger streamCount = new AtomicInteger(); + + private class AppendRequestStreamObserver implements + StreamObserver<RaftClientRequestProto> { + private final String name = getId() + "-" + streamCount.getAndIncrement(); + private final StreamObserver<RaftClientReplyProto> responseObserver; + private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow + = new SlidingWindow.Server<>(name, COMPLETED); + private final AtomicBoolean isClosed; + + AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) { + LOG.debug("new AppendRequestStreamObserver {}", name); + this.responseObserver = ro; + this.isClosed = new AtomicBoolean(false); + } + + void processClientRequestAsync(PendingAppend pending) { + try { + protocol.submitClientRequestAsync(pending.getRequest() + ).thenAcceptAsync(reply -> slidingWindow.receiveReply( + pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync) + ).exceptionally(exception -> { + // TODO: the exception may be from either raft or state machine. + // Currently we skip all the following responses when getting an + // exception from the state machine. + responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest()); + return null; + }); + } catch (IOException e) { + throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e); + } + } + + @Override + public void onNext(RaftClientRequestProto request) { + try { + final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request); + final PendingAppend p = new PendingAppend(r); + slidingWindow.receivedRequest(p, this::processClientRequestAsync); + } catch (Throwable e) { + responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request)); + } + } + + private void sendReply(PendingAppend ready) { + Preconditions.assertTrue(ready.hasReply()); + if (ready == COMPLETED) { + close(); + } else { + LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply()); + responseObserver.onNext( + ClientProtoUtils.toRaftClientReplyProto(ready.getReply())); + } + } + + @Override + public void onError(Throwable t) { + // for now we just log a msg + GrpcUtil.warn(LOG, () -> name + ": onError", t); + slidingWindow.close(); + } + + @Override + public void onCompleted() { + if (slidingWindow.endOfRequests()) { + close(); + } + } + + private void close() { + if (isClosed.compareAndSet(false, true)) { + LOG.debug("{}: close", name); + responseObserver.onCompleted(); + slidingWindow.close(); + } + } + + void responseError(Throwable t, Supplier<String> message) { + if (isClosed.compareAndSet(false, true)) { + t = JavaUtils.unwrapCompletionException(t); + if (LOG.isDebugEnabled()) { + LOG.debug(name + ": Failed " + message.get(), t); + } + responseObserver.onError(GrpcUtil.wrapException(t)); + slidingWindow.close(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java index 48ab95d..47264e7 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java @@ -21,7 +21,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.client.impl.RaftClientRpcWithProxy; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; -import org.apache.ratis.grpc.RaftGrpcUtil; +import org.apache.ratis.grpc.GrpcUtil; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.GroupManagementRequest; import org.apache.ratis.protocol.RaftClientReply; @@ -46,14 +46,14 @@ import java.io.InterruptedIOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> { +public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClient> { public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class); private final ClientId clientId; private final int maxMessageSize; public GrpcClientRpc(ClientId clientId, RaftProperties properties) { - super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p, properties))); + super(new PeerProxyMap<>(clientId.toString(), p -> new GrpcClientProtocolClient(clientId, p, properties))); this.clientId = clientId; this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt(); } @@ -63,7 +63,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie RaftClientRequest request) { final RaftPeerId serverId = request.getServerId(); try { - final RaftClientProtocolClient proxy = getProxies().getProxy(serverId); + final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId); // Reuse the same grpc stream for all async calls. return proxy.getAppendStreamObservers().onNext(request); } catch (IOException e) { @@ -75,7 +75,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { final RaftPeerId serverId = request.getServerId(); - final RaftClientProtocolClient proxy = getProxies().getProxy(serverId); + final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId); if (request instanceof GroupManagementRequest) { final GroupManagementRequestProto proto = ClientProtoUtils.toGroupManagementRequestProto((GroupManagementRequest)request); return ClientProtoUtils.toRaftClientReply(proxy.groupAdd(proto)); @@ -102,7 +102,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie } private CompletableFuture<RaftClientReply> sendRequest( - RaftClientRequest request, RaftClientProtocolClient proxy) throws IOException { + RaftClientRequest request, GrpcClientProtocolClient proxy) throws IOException { final RaftClientRequestProto requestProto = toRaftClientRequestProto(request); final CompletableFuture<RaftClientReplyProto> replyFuture = @@ -117,7 +117,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie @Override public void onError(Throwable t) { - replyFuture.completeExceptionally(RaftGrpcUtil.unwrapIOException(t)); + replyFuture.completeExceptionally(GrpcUtil.unwrapIOException(t)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java new file mode 100644 index 0000000..5e5b941 --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.client; + +import org.apache.ratis.client.impl.ClientProtoUtils; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.grpc.GrpcUtil; +import org.apache.ratis.protocol.*; +import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto; +import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto; +import org.apache.ratis.util.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class GrpcClientStreamer implements Closeable { + public static final Logger LOG = LoggerFactory.getLogger(GrpcClientStreamer.class); + + enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR} + + private static class ExceptionAndRetry { + private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>(); + private final AtomicInteger retryTimes = new AtomicInteger(0); + private final int maxRetryTimes; + private final TimeDuration retryInterval; + + ExceptionAndRetry(RaftProperties prop) { + maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(prop); + retryInterval = GrpcConfigKeys.OutputStream.retryInterval(prop); + } + + void addException(RaftPeerId peer, IOException e) { + exceptionMap.put(peer, e); + retryTimes.incrementAndGet(); + } + + IOException getCombinedException() { + return new IOException("Exceptions: " + exceptionMap); + } + + boolean shouldRetry() { + return retryTimes.get() <= maxRetryTimes; + } + } + + private final Deque<RaftClientRequestProto> dataQueue; + private final Deque<RaftClientRequestProto> ackQueue; + private final int maxPendingNum; + private final SizeInBytes maxMessageSize; + + private final PeerProxyMap<GrpcClientProtocolProxy> proxyMap; + private final Map<RaftPeerId, RaftPeer> peers; + private RaftPeerId leaderId; + private volatile GrpcClientProtocolProxy leaderProxy; + private final ClientId clientId; + + private volatile RunningState running = RunningState.RUNNING; + private final ExceptionAndRetry exceptionAndRetry; + private final Sender senderThread; + private final RaftGroupId groupId; + + GrpcClientStreamer(RaftProperties prop, RaftGroup group, + RaftPeerId leaderId, ClientId clientId) { + this.clientId = clientId; + maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop); + maxMessageSize = GrpcConfigKeys.messageSizeMax(prop, LOG::debug); + dataQueue = new ConcurrentLinkedDeque<>(); + ackQueue = new ConcurrentLinkedDeque<>(); + exceptionAndRetry = new ExceptionAndRetry(prop); + + this.groupId = group.getGroupId(); + this.peers = group.getPeers().stream().collect( + Collectors.toMap(RaftPeer::getId, Function.identity())); + proxyMap = new PeerProxyMap<>(clientId.toString(), + raftPeer -> new GrpcClientProtocolProxy(clientId, raftPeer, ResponseHandler::new, + prop)); + proxyMap.addPeers(group.getPeers()); + refreshLeaderProxy(leaderId, null); + + senderThread = new Sender(); + senderThread.setName(this.toString() + "-sender"); + senderThread.start(); + } + + private synchronized void refreshLeaderProxy(RaftPeerId suggested, + RaftPeerId oldLeader) { + if (suggested != null) { + leaderId = suggested; + } else { + if (oldLeader == null) { + leaderId = peers.keySet().iterator().next(); + } else { + leaderId = CollectionUtils.random(oldLeader, peers.keySet()); + if (leaderId == null) { + leaderId = oldLeader; + } + } + } + LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this, + oldLeader, leaderId, suggested); + if (leaderProxy != null) { + leaderProxy.closeCurrentSession(); + } + try { + leaderProxy = proxyMap.getProxy(leaderId); + } catch (IOException e) { + LOG.error("Should not hit IOException here", e); + refreshLeader(null, leaderId); + } + } + + private boolean isRunning() { + return running == RunningState.RUNNING || + running == RunningState.LOOK_FOR_LEADER; + } + + private void checkState() throws IOException { + if (!isRunning()) { + throwException("The GrpcClientStreamer has been closed"); + } + } + + synchronized void write(ByteString content, long seqNum) + throws IOException { + checkState(); + while (isRunning() && dataQueue.size() >= maxPendingNum) { + try { + wait(); + } catch (InterruptedException ignored) { + } + } + if (isRunning()) { + // wrap the current buffer into a RaftClientRequestProto + final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto( + clientId, leaderId, groupId, seqNum, seqNum, content); + if (request.getSerializedSize() > maxMessageSize.getSizeInt()) { + throw new IOException("msg size:" + request.getSerializedSize() + + " exceeds maximum:" + maxMessageSize.getSizeInt()); + } + dataQueue.offer(request); + this.notifyAll(); + } else { + throwException(this + " got closed."); + } + } + + synchronized void flush() throws IOException { + checkState(); + if (dataQueue.isEmpty() && ackQueue.isEmpty()) { + return; + } + // wait for the pending Q to become empty + while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) { + try { + wait(); + } catch (InterruptedException ignored) { + } + } + if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) { + throwException(this + " got closed before finishing flush"); + } + } + + @Override + public void close() throws IOException { + if (!isRunning()) { + return; + } + flush(); + + running = RunningState.CLOSED; + senderThread.interrupt(); + try { + senderThread.join(); + } catch (InterruptedException ignored) { + } + proxyMap.close(); + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + "-" + clientId; + } + + private class Sender extends Daemon { + @Override + public void run() { + while (isRunning()) { + + synchronized (GrpcClientStreamer.this) { + while (isRunning() && shouldWait()) { + try { + GrpcClientStreamer.this.wait(); + } catch (InterruptedException ignored) { + } + } + if (running == RunningState.RUNNING) { + Preconditions.assertTrue(!dataQueue.isEmpty(), "dataQueue is empty"); + RaftClientRequestProto next = dataQueue.poll(); + leaderProxy.onNext(next); + ackQueue.offer(next); + } + } + } + } + + private boolean shouldWait() { + // the sender should wait if any of the following is true + // 1) there is no data to send + // 2) there are too many outstanding pending requests + // 3) Error/NotLeaderException just happened, we're still waiting for + // the first response to confirm the new leader + return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum || + running == RunningState.LOOK_FOR_LEADER; + } + } + + /** the response handler for stream RPC */ + private class ResponseHandler implements + GrpcClientProtocolProxy.CloseableStreamObserver { + private final RaftPeerId targetId; + // once handled the first NotLeaderException or Error, the handler should + // be inactive and should not make any further action. + private volatile boolean active = true; + + ResponseHandler(RaftPeer target) { + targetId = target.getId(); + } + + @Override + public String toString() { + return GrpcClientStreamer.this + "-ResponseHandler-" + targetId; + } + + @Override + public void onNext(RaftClientReplyProto reply) { + if (!active) { + return; + } + synchronized (GrpcClientStreamer.this) { + RaftClientRequestProto pending = Objects.requireNonNull(ackQueue.peek()); + if (reply.getRpcReply().getSuccess()) { + Preconditions.assertTrue(pending.getRpcRequest().getCallId() == reply.getRpcReply().getCallId(), + () -> "pending=" + ClientProtoUtils.toString(pending) + " but reply=" + ClientProtoUtils.toString(reply)); + ackQueue.poll(); + if (LOG.isTraceEnabled()) { + LOG.trace("{} received success ack for {}", this, ClientProtoUtils.toString(pending)); + } + // we've identified the correct leader + if (running == RunningState.LOOK_FOR_LEADER) { + running = RunningState.RUNNING; + } + } else { + // this may be a NotLeaderException + RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply); + final NotLeaderException nle = r.getNotLeaderException(); + if (nle != null) { + LOG.debug("{} received a NotLeaderException from {}", this, + r.getServerId()); + handleNotLeader(nle, targetId); + } + } + GrpcClientStreamer.this.notifyAll(); + } + } + + @Override + public void onError(Throwable t) { + LOG.warn(this + " onError", t); + if (active) { + synchronized (GrpcClientStreamer.this) { + handleError(t, this); + GrpcClientStreamer.this.notifyAll(); + } + } + } + + @Override + public void onCompleted() { + LOG.info("{} onCompleted, pending requests #: {}", this, + ackQueue.size()); + } + + @Override // called by handleError and handleNotLeader + public void close() throws IOException { + active = false; + } + } + + private void throwException(String msg) throws IOException { + if (running == RunningState.ERROR) { + throw exceptionAndRetry.getCombinedException(); + } else { + throw new IOException(msg); + } + } + + private void handleNotLeader(NotLeaderException nle, + RaftPeerId oldLeader) { + Preconditions.assertTrue(Thread.holdsLock(GrpcClientStreamer.this)); + // handle NotLeaderException: refresh leader and RaftConfiguration + refreshPeers(nle.getPeers()); + + refreshLeader(nle.getSuggestedLeader().getId(), oldLeader); + } + + private void handleError(Throwable t, ResponseHandler handler) { + Preconditions.assertTrue(Thread.holdsLock(GrpcClientStreamer.this)); + final IOException e = GrpcUtil.unwrapIOException(t); + + exceptionAndRetry.addException(handler.targetId, e); + LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.", + handler, e, exceptionAndRetry.retryTimes.get(), + exceptionAndRetry.maxRetryTimes); + + leaderProxy.onError(); + if (exceptionAndRetry.shouldRetry()) { + refreshLeader(null, leaderId); + } else { + running = RunningState.ERROR; + } + } + + private void refreshLeader(RaftPeerId suggestedLeader, RaftPeerId oldLeader) { + running = RunningState.LOOK_FOR_LEADER; + refreshLeaderProxy(suggestedLeader, oldLeader); + reQueuePendingRequests(leaderId); + + final RaftClientRequestProto request = Objects.requireNonNull( + dataQueue.poll()); + ackQueue.offer(request); + try { + exceptionAndRetry.retryInterval.sleep(); + } catch (InterruptedException ignored) { + } + leaderProxy.onNext(request); + } + + private void reQueuePendingRequests(RaftPeerId newLeader) { + if (isRunning()) { + // resend all the pending requests + while (!ackQueue.isEmpty()) { + final RaftClientRequestProto oldRequest = ackQueue.pollLast(); + final RaftRpcRequestProto.Builder newRpc = RaftRpcRequestProto.newBuilder(oldRequest.getRpcRequest()) + .setReplyId(newLeader.toByteString()); + final RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder(oldRequest) + .setRpcRequest(newRpc).build(); + dataQueue.offerFirst(newRequest); + } + } + } + + private void refreshPeers(RaftPeer[] newPeers) { + if (newPeers != null && newPeers.length > 0) { + // we only add new peers, we do not remove any peer even if it no longer + // belongs to the current raft conf + Arrays.stream(newPeers).forEach(peer -> { + peers.putIfAbsent(peer.getId(), peer); + proxyMap.computeIfAbsent(peer); + }); + + LOG.debug("refreshed peers: {}", peers); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java new file mode 100644 index 0000000..e857aaf --- /dev/null +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc.client; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.util.ProtoUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicLong; + +public class GrpcOutputStream extends OutputStream { + /** internal buffer */ + private final byte buf[]; + private int count; + private final AtomicLong seqNum = new AtomicLong(); + private final ClientId clientId; + private final GrpcClientStreamer streamer; + + private boolean closed = false; + + public GrpcOutputStream(RaftProperties prop, ClientId clientId, + RaftGroup group, RaftPeerId leaderId) { + final int bufferSize = GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt(); + buf = new byte[bufferSize]; + count = 0; + this.clientId = clientId; + streamer = new GrpcClientStreamer(prop, group, leaderId, clientId); + } + + @Override + public void write(int b) throws IOException { + checkClosed(); + buf[count++] = (byte)b; + flushIfNecessary(); + } + + private void flushIfNecessary() throws IOException { + if(count == buf.length) { + flushToStreamer(); + } + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + checkClosed(); + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + int total = 0; + while (total < len) { + int toWrite = Math.min(len - total, buf.length - count); + System.arraycopy(b, off + total, buf, count, toWrite); + count += toWrite; + total += toWrite; + flushIfNecessary(); + } + } + + private void flushToStreamer() throws IOException { + if (count > 0) { + streamer.write(ProtoUtils.toByteString(buf, 0, count), + seqNum.getAndIncrement()); + count = 0; + } + } + + @Override + public void flush() throws IOException { + checkClosed(); + flushToStreamer(); + streamer.flush(); + } + + @Override + public void close() throws IOException { + flushToStreamer(); + streamer.close(); // streamer will flush + this.closed = true; + } + + @Override + public String toString() { + return "GrpcOutputStream-" + clientId; + } + + private void checkClosed() throws IOException { + if (closed) { + throw new IOException(this.toString() + " was closed."); + } + } +}
