Replaced `RpcResult<Response>` with `Try<Response, StatusError>`.

The `process::grpc::client::Runtime::call` method currently returns a
`RpcResult<Response>`, which contains both a `::grpc::Status` object
and the resulting response protobuf. However, if the `::grpc::Status`
represents a non-OK status, the gRPC library does not guarantee that
the response protobuf is valid. This patch replaces `RpcResult` with
`Try` to provide better type safety.

Review: https://reviews.apache.org/r/67154


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9a94eb69
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9a94eb69
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9a94eb69

Branch: refs/heads/master
Commit: 9a94eb697f8a7b38955cdf2f21b81d3fda3e2845
Parents: f8829f8
Author: Chun-Hung Hsiao <[email protected]>
Authored: Mon May 14 22:00:35 2018 -0700
Committer: Chun-Hung Hsiao <[email protected]>
Committed: Wed May 23 16:31:12 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 42 +++++++++------
 3rdparty/libprocess/src/tests/grpc_tests.cpp | 66 +++++++++++++----------
 2 files changed, 63 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9a94eb69/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp 
b/3rdparty/libprocess/include/process/grpc.hpp
index 321a46e..f7f3171 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -18,6 +18,7 @@
 #include <memory>
 #include <thread>
 #include <type_traits>
+#include <utility>
 
 #include <google/protobuf/message.h>
 
@@ -75,20 +76,22 @@ private:
 
 
 /**
- * The response of a RPC call. It includes the gRPC `Status`
- * (https://grpc.io/grpc/cpp/classgrpc_1_1_status.html), and
- * the actual protobuf response body.
+ * Represents errors caused by non-OK gRPC statuses. See:
+ * https://grpc.io/grpc/cpp/classgrpc_1_1_status.html
  */
-template <typename T>
-struct RpcResult
+class StatusError : public Error
 {
-  RpcResult(const ::grpc::Status& _status, const T& _response)
-    : status(_status), response(_response) {}
+public:
+  StatusError(::grpc::Status _status)
+    : Error(_status.error_message()), status(std::move(_status))
+  {
+    CHECK(!status.ok());
+  }
 
-  ::grpc::Status status;
-  T response;
+  const ::grpc::Status status;
 };
 
+
 namespace client {
 
 /**
@@ -112,14 +115,20 @@ public:
   /**
    * Sends an asynchronous gRPC call.
    *
+   * This function returns a `Future` of a `Try` such that the response 
protobuf
+   * is returned only if the gRPC call returns an OK status to ensure type
+   * safety (see https://github.com/grpc/grpc/issues/12824). Note that the
+   * future never fails; it will return a `StatusError` if a non-OK status is
+   * returned for the call, so the caller can handle the error 
programmatically.
+   *
    * @param channel A connection to a gRPC server.
    * @param rpc The asynchronous gRPC call to make. This can be obtained
    *     by the `GRPC_RPC(Service, RPC)` macro.
    * @param request The request protobuf for the gRPC call.
-   * @return a `Future` waiting for a response protobuf.
+   * @return a `Future` of `Try` waiting for a response protobuf or an error.
    */
   template <typename Stub, typename Request, typename Response>
-  Future<RpcResult<Response>> call(
+  Future<Try<Response, StatusError>> call(
       const Channel& channel,
       std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
           ::grpc::ClientContext*,
@@ -152,8 +161,8 @@ public:
       // an asynchronous gRPC call through the `CompletionQueue`
       // managed by `data`. The `Promise` will be set by the callback
       // upon server response.
-      std::shared_ptr<Promise<RpcResult<Response>>> promise(
-          new Promise<RpcResult<Response>>);
+      std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
+          new Promise<Try<Response, StatusError>>);
 
       promise->future().onDiscard([=] { context->TryCancel(); });
 
@@ -175,10 +184,11 @@ public:
                 CHECK(promise->future().isPending());
                 if (promise->future().hasDiscard()) {
                   promise->discard();
-                  return;
+                } else {
+                  promise->set(status->ok()
+                    ? std::move(*response)
+                    : Try<Response, StatusError>::error(std::move(*status)));
                 }
-
-                promise->set(RpcResult<Response>(*status, *response));
               }));
 
       return promise->future();

http://git-wip-us.apache.org/repos/asf/mesos/blob/9a94eb69/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp 
b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index 38cd6c6..07c2f3e 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -47,7 +47,7 @@ using process::Future;
 using process::Promise;
 
 using process::grpc::Channel;
-using process::grpc::RpcResult;
+using process::grpc::StatusError;
 
 using testing::_;
 using testing::DoAll;
@@ -123,11 +123,11 @@ TEST_F(GRPCClientTest, Success)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
-  EXPECT_TRUE(pong->status.ok());
+  EXPECT_SOME(pong.get());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -170,13 +170,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong1 =
+  Future<Try<Pong, StatusError>> pong1 =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
-  Future<RpcResult<Pong>> pong2 =
+  Future<Try<Pong, StatusError>> pong2 =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
-  Future<RpcResult<Pong>> pong3 =
+  Future<Try<Pong, StatusError>> pong3 =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_READY(processed1->future());
@@ -186,13 +186,13 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
   pinged->set(Nothing());
 
   AWAIT_ASSERT_READY(pong1);
-  EXPECT_TRUE(pong1->status.ok());
+  EXPECT_SOME(pong1.get());
 
   AWAIT_ASSERT_READY(pong2);
-  EXPECT_TRUE(pong2->status.ok());
+  EXPECT_SOME(pong2.get());
 
   AWAIT_ASSERT_READY(pong3);
-  EXPECT_TRUE(pong3->status.ok());
+  EXPECT_SOME(pong3.get());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -201,9 +201,9 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
 }
 
 
-// This test verifies that a gRPC future fails when the server responds
-// with a status other than OK for the given call.
-TEST_F(GRPCClientTest, Failed)
+// This test verifies that a gRPC future is set with an error when the server
+// responds with a status other than OK for the given call.
+TEST_F(GRPCClientTest, StatusError)
 {
   PingPongServer server;
 
@@ -215,11 +215,12 @@ TEST_F(GRPCClientTest, Failed)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_ASSERT_READY(pong);
-  EXPECT_FALSE(pong->status.ok());
+  EXPECT_ERROR(pong.get());
+  EXPECT_EQ(::grpc::CANCELLED, pong->error().status.error_code());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -240,7 +241,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(GRPCClientTest, 
DiscardedBeforeServerStarted)
   Channel channel(server_address());
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   pong.discard();
@@ -278,7 +279,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
@@ -295,7 +296,7 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
 }
 
 
-// This test verifies that a gRPC future fails properly when the runtime
+// This test verifies that a gRPC future is set with an error when the runtime
 // is shut down before the server responds.
 TEST_F(GRPCClientTest, ClientShutdown)
 {
@@ -317,7 +318,7 @@ TEST_F(GRPCClientTest, ClientShutdown)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
@@ -327,32 +328,39 @@ TEST_F(GRPCClientTest, ClientShutdown)
 
   shutdown->set(Nothing());
 
+  // TODO(chhsiao): The gRPC library returns a failure after the default
+  // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
+  // is shut down. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
-  EXPECT_FALSE(pong->status.ok());
+  EXPECT_ERROR(pong.get());
+  EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
 
   ASSERT_SOME(server.shutdown());
 }
 
 
-// This test verifies that a gRPC future fails when it is unable to
-// connect to the server.
+// This test verifies that a gRPC future is set with an error when it is unable
+// to connect to the server.
 TEST_F(GRPCClientTest, ServerUnreachable)
 {
   Channel channel("nosuchhost");
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
 
+  // TODO(chhsiao): The gRPC library returns a failure after the default 
timeout
+  // (5 seconds) is passed. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
-  EXPECT_FALSE(pong->status.ok());
+  EXPECT_ERROR(pong.get());
+  EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
 }
 
 
-// This test verifies that a gRPC future fails properly when the server
+// This test verifies that a gRPC future is set with an error when the server
 // hangs when processing the given call.
 TEST_F(GRPCClientTest, ServerTimeout)
 {
@@ -372,14 +380,14 @@ TEST_F(GRPCClientTest, ServerTimeout)
 
   client::Runtime runtime;
 
-  Future<RpcResult<Pong>> pong =
+  Future<Try<Pong, StatusError>> pong =
     runtime.call(channel.get(), GRPC_RPC(PingPong, Send), Ping());
 
-  // TODO(chhsiao): The gRPC library returns a failure after the default
-  // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
-  // is shut down. The timeout should be lowered once we support it.
+  // TODO(chhsiao): The gRPC library returns a failure after the default 
timeout
+  // (5 seconds) is passed. The timeout should be lowered once we support it.
   AWAIT_ASSERT_READY(pong);
-  EXPECT_FALSE(pong->status.ok());
+  EXPECT_ERROR(pong.get());
+  EXPECT_EQ(::grpc::DEADLINE_EXCEEDED, pong->error().status.error_code());
 
   done->set(Nothing());
 

Reply via email to