Replaced `RpcResult<Response>` with `Try<Response, StatusError>`.
The `process::grpc::client::Runtime::call` method currently returns a `RpcResult<Response>`, which contains both a `::grpc::Status` object and the resulting response protobuf. However, if the `::grpc::Status` represents a non-OK status, the gRPC library does not guarantee that the response protobuf is valid. This patch replaces `RpcResult` with `Try` to provide better type safety. Review: https://reviews.apache.org/r/67154 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9a94eb69 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9a94eb69 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9a94eb69 Branch: refs/heads/master Commit: 9a94eb697f8a7b38955cdf2f21b81d3fda3e2845 Parents: f8829f8 Author: Chun-Hung Hsiao <[email protected]> Authored: Mon May 14 22:00:35 2018 -0700 Committer: Chun-Hung Hsiao <[email protected]> Committed: Wed May 23 16:31:12 2018 -0700 ---------------------------------------------------------------------- 3rdparty/libprocess/include/process/grpc.hpp | 42 +++++++++------ 3rdparty/libprocess/src/tests/grpc_tests.cpp | 66 +++++++++++++---------- 2 files changed, 63 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/9a94eb69/3rdparty/libprocess/include/process/grpc.hpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp index 321a46e..f7f3171 100644 --- a/3rdparty/libprocess/include/process/grpc.hpp +++ b/3rdparty/libprocess/include/process/grpc.hpp @@ -18,6 +18,7 @@ #include <memory> #include <thread> #include <type_traits> +#include <utility> #include <google/protobuf/message.h> @@ -75,20 +76,22 @@ private: /** - * The response of a RPC call. It includes the gRPC `Status` - * (https://grpc.io/grpc/cpp/classgrpc_1_1_status.html), and - * the actual protobuf response body. + * Represents errors caused by non-OK gRPC statuses. See: + * https://grpc.io/grpc/cpp/classgrpc_1_1_status.html */ -template <typename T> -struct RpcResult +class StatusError : public Error { - RpcResult(const ::grpc::Status& _status, const T& _response) - : status(_status), response(_response) {} +public: + StatusError(::grpc::Status _status) + : Error(_status.error_message()), status(std::move(_status)) + { + CHECK(!status.ok()); + } - ::grpc::Status status; - T response; + const ::grpc::Status status; }; + namespace client { /** @@ -112,14 +115,20 @@ public: /** * Sends an asynchronous gRPC call. * + * This function returns a `Future` of a `Try` such that the response protobuf + * is returned only if the gRPC call returns an OK status to ensure type + * safety (see https://github.com/grpc/grpc/issues/12824). Note that the + * future never fails; it will return a `StatusError` if a non-OK status is + * returned for the call, so the caller can handle the error programmatically. + * * @param channel A connection to a gRPC server. * @param rpc The asynchronous gRPC call to make. This can be obtained * by the `GRPC_RPC(Service, RPC)` macro. * @param request The request protobuf for the gRPC call. - * @return a `Future` waiting for a response protobuf. + * @return a `Future` of `Try` waiting for a response protobuf or an error. */ template <typename Stub, typename Request, typename Response> - Future<RpcResult<Response>> call( + Future<Try<Response, StatusError>> call( const Channel& channel, std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)( ::grpc::ClientContext*, @@ -152,8 +161,8 @@ public: // an asynchronous gRPC call through the `CompletionQueue` // managed by `data`. The `Promise` will be set by the callback // upon server response. - std::shared_ptr<Promise<RpcResult<Response>>> promise( - new Promise<RpcResult<Response>>); + std::shared_ptr<Promise<Try<Response, StatusError>>> promise( + new Promise<Try<Response, StatusError>>); promise->future().onDiscard([=] { context->TryCancel(); }); @@ -175,10 +184,11 @@ public: CHECK(promise->future().isPending()); if (promise->future().hasDiscard()) { promise->discard(); - return; + } else { + promise->set(status->ok() + ? std::move(*response) + : Try<Response, StatusError>::error(std::move(*status))); } - - promise->set(RpcResult<Response>(*status, *response)); })); return promise->future(); http://git-wip-us.apache.org/repos/asf/mesos/blob/9a94eb69/3rdparty/libprocess/src/tests/grpc_tests.cpp ---------------------------------------------------------------------- diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp index 38cd6c6..07c2f3e 100644 --- a/3rdparty/libprocess/src/tests/grpc_tests.cpp +++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp @@ -47,7 +47,7 @@ using process::Future; using process::Promise; using process::grpc::Channel; -using process::grpc::RpcResult; +using process::grpc::StatusError; using testing::_; using testing::DoAll; @@ -123,11 +123,11 @@ TEST_F(GRPCClientTest, Success) client::Runtime runtime; - Future<RpcResult<Pong>> pong = + Future<Try<Pong, StatusError>> pong = runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); AWAIT_ASSERT_READY(pong); - EXPECT_TRUE(pong->status.ok()); + EXPECT_SOME(pong.get()); runtime.terminate(); AWAIT_ASSERT_READY(runtime.wait()); @@ -170,13 +170,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs) client::Runtime runtime; - Future<RpcResult<Pong>> pong1 = + Future<Try<Pong, StatusError>> pong1 = runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); - Future<RpcResult<Pong>> pong2 = + Future<Try<Pong, StatusError>> pong2 = runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); - Future<RpcResult<Pong>> pong3 = + Future<Try<Pong, StatusError>> pong3 = runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); AWAIT_READY(processed1->future()); @@ -186,13 +186,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs) pinged->set(Nothing()); AWAIT_ASSERT_READY(pong1); - EXPECT_TRUE(pong1->status.ok()); + EXPECT_SOME(pong1.get()); AWAIT_ASSERT_READY(pong2); - EXPECT_TRUE(pong2->status.ok()); + EXPECT_SOME(pong2.get()); AWAIT_ASSERT_READY(pong3); - EXPECT_TRUE(pong3->status.ok()); + EXPECT_SOME(pong3.get()); runtime.terminate(); AWAIT_ASSERT_READY(runtime.wait()); @@ -201,9 +201,9 @@ TEST_F(GRPCClientTest, ConcurrentRPCs) } -// This test verifies that a gRPC future fails when the server responds -// with a status other than OK for the given call. -TEST_F(GRPCClientTest, Failed) +// This test verifies that a gRPC future is set with an error when the server +// responds with a status other than OK for the given call. +TEST_F(GRPCClientTest, StatusError) { PingPongServer server; @@ -215,11 +215,12 @@ TEST_F(GRPCClientTest, Failed) client::Runtime runtime; - Future<RpcResult<Pong>> pong = + Future<Try<Pong, StatusError>> pong = runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); AWAIT_ASSERT_READY(pong); - EXPECT_FALSE(pong->status.ok()); + EXPECT_ERROR(pong.get()); + EXPECT_EQ(::grpc::CANCELLED, pong->error().status.error_code()); runtime.terminate(); AWAIT_ASSERT_READY(runtime.wait()); @@ -240,7 +241,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, DiscardedBeforeServerStarted) Channel channel(server_address()); client::Runtime runtime; - Future<RpcResult<Pong>> pong = + Future<Try<Pong, StatusError>> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping()); pong.discard(); @@ -278,7 +279,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing) client::Runtime runtime; - Future<RpcResult<Pong>> pong = + Future<Try<Pong, StatusError>> pong = runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); AWAIT_READY(processed->future()); @@ -295,7 +296,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing) } -// This test verifies that a gRPC future fails properly when the runtime +// This test verifies that a gRPC future is set with an error when the runtime // is shut down before the server responds. TEST_F(GRPCClientTest, ClientShutdown) { @@ -317,7 +318,7 @@ TEST_F(GRPCClientTest, ClientShutdown) client::Runtime runtime; - Future<RpcResult<Pong>> pong = + Future<Try<Pong, StatusError>> pong = runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); AWAIT_READY(processed->future()); @@ -327,32 +328,39 @@ TEST_F(GRPCClientTest, ClientShutdown) shutdown->set(Nothing()); + // TODO(chhsiao): The gRPC library returns a failure after the default + // timeout (5 seconds) is passed, no matter when the `CompletionQueue` + // is shut down. The timeout should be lowered once we support it. AWAIT_ASSERT_READY(pong); - EXPECT_FALSE(pong->status.ok()); + EXPECT_ERROR(pong.get()); + EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code()); ASSERT_SOME(server.shutdown()); } -// This test verifies that a gRPC future fails when it is unable to -// connect to the server. +// This test verifies that a gRPC future is set with an error when it is unable +// to connect to the server. TEST_F(GRPCClientTest, ServerUnreachable) { Channel channel("nosuchhost"); client::Runtime runtime; - Future<RpcResult<Pong>> pong = + Future<Try<Pong, StatusError>> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping()); runtime.terminate(); AWAIT_ASSERT_READY(runtime.wait()); + // TODO(chhsiao): The gRPC library returns a failure after the default timeout + // (5 seconds) is passed. The timeout should be lowered once we support it. AWAIT_ASSERT_READY(pong); - EXPECT_FALSE(pong->status.ok()); + EXPECT_ERROR(pong.get()); + EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code()); } -// This test verifies that a gRPC future fails properly when the server +// This test verifies that a gRPC future is set with an error when the server // hangs when processing the given call. TEST_F(GRPCClientTest, ServerTimeout) { @@ -372,14 +380,14 @@ TEST_F(GRPCClientTest, ServerTimeout) client::Runtime runtime; - Future<RpcResult<Pong>> pong = + Future<Try<Pong, StatusError>> pong = runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping()); - // TODO(chhsiao): The gRPC library returns a failure after the default - // timeout (5 seconds) is passed, no matter when the `CompletionQueue` - // is shut down. The timeout should be lowered once we support it. + // TODO(chhsiao): The gRPC library returns a failure after the default timeout + // (5 seconds) is passed. The timeout should be lowered once we support it. AWAIT_ASSERT_READY(pong); - EXPECT_FALSE(pong->status.ok()); + EXPECT_ERROR(pong.get()); + EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code()); done->set(Nothing());
