Renamed `grpc::Channel` to `grpc::client::Connection`. The rename avoids a name conflict between the gRPC library's `::grpc::Channel` and libprocess' own wrapper class `process::grpc::Channel`. Since the wrapper is only used on the client side, it is also moved into the `client` namespace.
Review: https://reviews.apache.org/r/67156 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ddfea093 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ddfea093 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ddfea093 Branch: refs/heads/master Commit: ddfea093f576cdd22d4fa221326406eaf8e59b49 Parents: 3bac270 Author: Chun-Hung Hsiao <chhs...@mesosphere.io> Authored: Tue May 15 17:54:24 2018 -0700 Committer: Chun-Hung Hsiao <chhs...@mesosphere.io> Committed: Wed May 23 16:31:12 2018 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/grpc.hpp | 70 +++++++++++------------ 3rdparty/libprocess/src/tests/grpc_tests.cpp | 57 +++++++++--------- 2 files changed, 60 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ddfea093/3rdparty/libprocess/include/process/grpc.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp index 68bdeb1..cf165f0 100644 --- a/3rdparty/libprocess/include/process/grpc.hpp +++ b/3rdparty/libprocess/include/process/grpc.hpp @@ -34,11 +34,11 @@ #include <stout/try.hpp> -// This file provides libprocess "support" for using gRPC. In -// particular, it defines two wrapper classes: `Channel` (representing a -// connection to a gRPC server) and `client::Runtime`, which integrates -// an event loop waiting for gRPC responses, and provides the `call` -// interface to create an asynchrous gRPC call and return a `Future`. +// This file provides libprocess "support" for using gRPC. 
In particular, it +// defines two wrapper classes: `client::Connection` which represents a +// connection to a gRPC server, and `client::Runtime` which integrates an event +// loop waiting for gRPC responses and provides the `call` interface to create +// an asynchronous gRPC call and return a `Future`. #define GRPC_CLIENT_METHOD(service, rpc) \ @@ -47,34 +47,6 @@ namespace process { namespace grpc { -// Forward declarations. -namespace client { class Runtime; } - - -/** - * A copyable interface to manage a connection to a gRPC server. - * All `Channel` copies share the same connection. Note that the - * connection is established lazily by the gRPC runtime library: the - * actual connection is delayed till an RPC call is made. - */ -class Channel -{ -public: - Channel(const std::string& uri, - const std::shared_ptr<::grpc::ChannelCredentials>& credentials = - ::grpc::InsecureChannelCredentials()) - : channel(::grpc::CreateChannel(uri, credentials)) {} - - explicit Channel(std::shared_ptr<::grpc::Channel> _channel) - : channel(std::move(_channel)) {} - -private: - std::shared_ptr<::grpc::Channel> channel; - - friend class client::Runtime; -}; - - /** * Represents errors caused by non-OK gRPC statuses. See: * https://grpc.io/grpc/cpp/classgrpc_1_1_status.html @@ -117,6 +89,28 @@ struct MethodTraits< /** + * A copyable interface to manage a connection to a gRPC server. All + * `Connection` copies share the same gRPC channel which is thread safe. Note + * that the actual connection is established lazily by the gRPC library at the + * time an RPC is made to the channel. 
+ */ +class Connection +{ +public: + Connection( + const std::string& uri, + const std::shared_ptr<::grpc::ChannelCredentials>& credentials = + ::grpc::InsecureChannelCredentials()) + : channel(::grpc::CreateChannel(uri, credentials)) {} + + explicit Connection(std::shared_ptr<::grpc::Channel> _channel) + : channel(std::move(_channel)) {} + + const std::shared_ptr<::grpc::Channel> channel; +}; + + +/** * A copyable interface to manage an internal gRPC runtime instance for * asynchronous gRPC calls. A gRPC runtime instance includes a gRPC * `CompletionQueue` to manage outstanding requests, a looper thread to @@ -143,7 +137,7 @@ public: * future never fails; it will return a `StatusError` if a non-OK status is * returned for the call, so the caller can handle the error programmatically. * - * @param channel A connection to a gRPC server. + * @param connection A connection to a gRPC server. * @param rpc The asynchronous gRPC call to make. This can be obtained * by the `GRPC_CLIENT_METHOD(service, rpc)` macro. * @param request The request protobuf for the gRPC call. 
@@ -156,7 +150,7 @@ public: typename Response = typename internal::MethodTraits<Method>::response_type> Future<Try<Response, StatusError>> call( - const Channel& channel, + const Connection& connection, Method&& method, const Request& request) { @@ -193,9 +187,9 @@ public: std::shared_ptr<Response> response(new Response()); std::shared_ptr<::grpc::Status> status(new ::grpc::Status()); - std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader( - (typename internal::MethodTraits<Method>::stub_type( - channel.channel).*method)(context.get(), request, &data->queue)); + std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader = + (typename internal::MethodTraits<Method>::stub_type( + connection.channel).*method)(context.get(), request, &data->queue); reader->StartCall(); reader->Finish( http://git-wip-us.apache.org/repos/asf/mesos/blob/ddfea093/3rdparty/libprocess/src/tests/grpc_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp index f1cdb5e..eb9621b 100644 --- a/3rdparty/libprocess/src/tests/grpc_tests.cpp +++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp @@ -46,7 +46,6 @@ using ::grpc::Status; using process::Future; using process::Promise; -using process::grpc::Channel; using process::grpc::StatusError; using testing::_; @@ -67,7 +66,7 @@ public: MOCK_METHOD3(Send, Status(ServerContext*, const Ping* ping, Pong* pong)); - Try<Channel> startup(const Option<string>& address = None()) + Try<client::Connection> startup(const Option<string>& address = None()) { ServerBuilder builder; @@ -83,8 +82,8 @@ public: } return address.isSome() - ? Channel(address.get()) - : Channel(server->InProcessChannel(ChannelArguments())); + ? 
client::Connection(address.get()) + : client::Connection(server->InProcessChannel(ChannelArguments())); } Try<Nothing> shutdown() @@ -104,7 +103,7 @@ class GRPCClientTest : public TemporaryDirectoryTest { protected: // TODO(chhsiao): Consider removing this once we have a way to get a - // channel before the server starts on Windows. See the + // connection before the server starts on Windows. See the // `DiscardedBeforeServerStarted` test below. string server_address() const { @@ -118,13 +117,13 @@ protected: TEST_F(GRPCClientTest, Success) { PingPongServer server; - Try<Channel> channel = server.startup(); - ASSERT_SOME(channel); + Try<client::Connection> connection = server.startup(); + ASSERT_SOME(connection); client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); + runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); AWAIT_ASSERT_READY(pong); EXPECT_SOME(pong.get()); @@ -140,8 +139,8 @@ TEST_F(GRPCClientTest, Success) TEST_F(GRPCClientTest, ConcurrentRPCs) { PingPongServer server; - Try<Channel> channel = server.startup(); - ASSERT_SOME(channel); + Try<client::Connection> connection = server.startup(); + ASSERT_SOME(connection); shared_ptr<Promise<Nothing>> processed1(new Promise<Nothing>()); shared_ptr<Promise<Nothing>> processed2(new Promise<Nothing>()); @@ -171,13 +170,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs) client::Runtime runtime; Future<Try<Pong, StatusError>> pong1 = - runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); + runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); Future<Try<Pong, StatusError>> pong2 = - runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); + runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); Future<Try<Pong, StatusError>> pong3 = - runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); + 
runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); AWAIT_READY(processed1->future()); AWAIT_READY(processed2->future()); @@ -210,13 +209,13 @@ TEST_F(GRPCClientTest, StatusError) EXPECT_CALL(server, Send(_, _, _)) .WillOnce(Return(Status::CANCELLED)); - Try<Channel> channel = server.startup(); - ASSERT_SOME(channel); + Try<client::Connection> connection = server.startup(); + ASSERT_SOME(connection); client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); + runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); AWAIT_ASSERT_READY(pong); EXPECT_ERROR(pong.get()); @@ -238,11 +237,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted) EXPECT_CALL(server, Send(_, _, _)) .Times(0); - Channel channel(server_address()); + client::Connection connection(server_address()); client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping()); + runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping()); pong.discard(); @@ -274,13 +273,13 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing) }), Return(Status::OK))); - Try<Channel> channel = server.startup(); - ASSERT_SOME(channel); + Try<client::Connection> connection = server.startup(); + ASSERT_SOME(connection); client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); + runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); AWAIT_READY(processed->future()); @@ -313,13 +312,13 @@ TEST_F(GRPCClientTest, ClientShutdown) }), Return(Status::OK))); - Try<Channel> channel = server.startup(); - ASSERT_SOME(channel); + Try<client::Connection> connection = server.startup(); + ASSERT_SOME(connection); client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel.get(), 
GRPC_CLIENT_METHOD(PingPong, Send), Ping()); + runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); AWAIT_READY(processed->future()); @@ -343,11 +342,11 @@ TEST_F(GRPCClientTest, ClientShutdown) // to connect to the server. TEST_F(GRPCClientTest, ServerUnreachable) { - Channel channel("nosuchhost"); + client::Connection connection("nosuchhost"); client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel, GRPC_CLIENT_METHOD(PingPong, Send), Ping()); + runtime.call(connection, GRPC_CLIENT_METHOD(PingPong, Send), Ping()); runtime.terminate(); AWAIT_ASSERT_READY(runtime.wait()); @@ -375,13 +374,13 @@ TEST_F(GRPCClientTest, ServerTimeout) }), Return(Status::OK))); - Try<Channel> channel = server.startup(); - ASSERT_SOME(channel); + Try<client::Connection> connection = server.startup(); + ASSERT_SOME(connection); client::Runtime runtime; Future<Try<Pong, StatusError>> pong = - runtime.call(channel.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); + runtime.call(connection.get(), GRPC_CLIENT_METHOD(PingPong, Send), Ping()); // TODO(chhsiao): The gRPC library returns a failure after the default timeout // (5 seconds) is passed. The timeout should be lowered once we support it.