Repository: mesos
Updated Branches:
  refs/heads/master 21305ab47 -> 3d2a1fd49


Updated the CSI client to use the new libprocess gRPC interface.

This patch makes the following changes:
1. Replace `GRPC_RPC` with `GRPC_CLIENT_METHOD`.
2. Replace `process::grpc::Channel` with
   `process::grpc::client::Connection`.
3. Make the CSI client metods return a `Future` of a `Try`.

Review: https://reviews.apache.org/r/67158


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3d2a1fd4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3d2a1fd4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3d2a1fd4

Branch: refs/heads/master
Commit: 3d2a1fd49dfe0325f4ca94e3abbab97366bcebd0
Parents: ddfea09
Author: Chun-Hung Hsiao <[email protected]>
Authored: Wed May 16 12:10:10 2018 -0700
Committer: Chun-Hung Hsiao <[email protected]>
Committed: Wed May 23 16:31:12 2018 -0700

----------------------------------------------------------------------
 src/csi/client.cpp             | 187 +++++++++++++-----------------------
 src/csi/client.hpp             |   6 +-
 src/tests/csi_client_tests.cpp |  19 ++--
 src/tests/mock_csi_plugin.cpp  |   8 +-
 src/tests/mock_csi_plugin.hpp  |   2 +-
 5 files changed, 84 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/csi/client.cpp
----------------------------------------------------------------------
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index 559e805..a4ba1f1 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -19,7 +19,7 @@
 using process::Failure;
 using process::Future;
 
-using process::grpc::RpcResult;
+using process::grpc::StatusError;
 
 namespace mesos {
 namespace csi {
@@ -29,14 +29,10 @@ Future<GetPluginInfoResponse> Client::GetPluginInfo(
     const GetPluginInfoRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Identity, GetPluginInfo), request)
-    .then([](const RpcResult<GetPluginInfoResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Identity, GetPluginInfo), request)
+    .then([](const Try<GetPluginInfoResponse, StatusError>& result)
         -> Future<GetPluginInfoResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -45,14 +41,13 @@ Future<GetPluginCapabilitiesResponse> 
Client::GetPluginCapabilities(
     const GetPluginCapabilitiesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Identity, GetPluginCapabilities), request)
-    .then([](const RpcResult<GetPluginCapabilitiesResponse>& result)
+    .call(
+        connection,
+        GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities),
+        request)
+    .then([](const Try<GetPluginCapabilitiesResponse, StatusError>& result)
         -> Future<GetPluginCapabilitiesResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -61,14 +56,10 @@ Future<ProbeResponse> Client::Probe(
     const ProbeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Identity, Probe), request)
-    .then([](const RpcResult<ProbeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Identity, Probe), request)
+    .then([](const Try<ProbeResponse, StatusError>& result)
         -> Future<ProbeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -77,14 +68,10 @@ Future<CreateVolumeResponse> Client::CreateVolume(
     const CreateVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, CreateVolume), request)
-    .then([](const RpcResult<CreateVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Controller, CreateVolume), request)
+    .then([](const Try<CreateVolumeResponse, StatusError>& result)
         -> Future<CreateVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -93,14 +80,10 @@ Future<DeleteVolumeResponse> Client::DeleteVolume(
     const DeleteVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, DeleteVolume), request)
-    .then([](const RpcResult<DeleteVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Controller, DeleteVolume), request)
+    .then([](const Try<DeleteVolumeResponse, StatusError>& result)
         -> Future<DeleteVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -109,14 +92,13 @@ Future<ControllerPublishVolumeResponse> 
Client::ControllerPublishVolume(
     const ControllerPublishVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ControllerPublishVolume), request)
-    .then([](const RpcResult<ControllerPublishVolumeResponse>& result)
+    .call(
+        connection,
+        GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume),
+        request)
+    .then([](const Try<ControllerPublishVolumeResponse, StatusError>& result)
         -> Future<ControllerPublishVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -125,14 +107,13 @@ Future<ControllerUnpublishVolumeResponse> 
Client::ControllerUnpublishVolume(
     const ControllerUnpublishVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ControllerUnpublishVolume), request)
-    .then([](const RpcResult<ControllerUnpublishVolumeResponse>& result)
+    .call(
+        connection,
+        GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume),
+        request)
+    .then([](const Try<ControllerUnpublishVolumeResponse, StatusError>& result)
         -> Future<ControllerUnpublishVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -141,14 +122,13 @@ Future<ValidateVolumeCapabilitiesResponse> 
Client::ValidateVolumeCapabilities(
     const ValidateVolumeCapabilitiesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ValidateVolumeCapabilities), request)
-    .then([](const RpcResult<ValidateVolumeCapabilitiesResponse>& result)
+    .call(
+        connection,
+        GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities),
+        request)
+    .then([](const Try<ValidateVolumeCapabilitiesResponse, StatusError>& 
result)
         -> Future<ValidateVolumeCapabilitiesResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -157,14 +137,10 @@ Future<ListVolumesResponse> Client::ListVolumes(
     const ListVolumesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ListVolumes), request)
-    .then([](const RpcResult<ListVolumesResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Controller, ListVolumes), request)
+    .then([](const Try<ListVolumesResponse, StatusError>& result)
         -> Future<ListVolumesResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -173,14 +149,10 @@ Future<GetCapacityResponse> Client::GetCapacity(
     const GetCapacityRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, GetCapacity), request)
-    .then([](const RpcResult<GetCapacityResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Controller, GetCapacity), request)
+    .then([](const Try<GetCapacityResponse, StatusError>& result)
         -> Future<GetCapacityResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -189,14 +161,13 @@ Future<ControllerGetCapabilitiesResponse> 
Client::ControllerGetCapabilities(
     const ControllerGetCapabilitiesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Controller, ControllerGetCapabilities), request)
-    .then([](const RpcResult<ControllerGetCapabilitiesResponse>& result)
+    .call(
+        connection,
+        GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities),
+        request)
+    .then([](const Try<ControllerGetCapabilitiesResponse, StatusError>& result)
         -> Future<ControllerGetCapabilitiesResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -205,14 +176,10 @@ Future<NodeStageVolumeResponse> Client::NodeStageVolume(
     const NodeStageVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeStageVolume), request)
-    .then([](const RpcResult<NodeStageVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodeStageVolume), request)
+    .then([](const Try<NodeStageVolumeResponse, StatusError>& result)
         -> Future<NodeStageVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -221,14 +188,10 @@ Future<NodeUnstageVolumeResponse> 
Client::NodeUnstageVolume(
     const NodeUnstageVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeUnstageVolume), request)
-    .then([](const RpcResult<NodeUnstageVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodeUnstageVolume), request)
+    .then([](const Try<NodeUnstageVolumeResponse, StatusError>& result)
         -> Future<NodeUnstageVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -237,14 +200,10 @@ Future<NodePublishVolumeResponse> 
Client::NodePublishVolume(
     const NodePublishVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodePublishVolume), request)
-    .then([](const RpcResult<NodePublishVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodePublishVolume), request)
+    .then([](const Try<NodePublishVolumeResponse, StatusError>& result)
         -> Future<NodePublishVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -253,14 +212,10 @@ Future<NodeUnpublishVolumeResponse> 
Client::NodeUnpublishVolume(
     const NodeUnpublishVolumeRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeUnpublishVolume), request)
-    .then([](const RpcResult<NodeUnpublishVolumeResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume), request)
+    .then([](const Try<NodeUnpublishVolumeResponse, StatusError>& result)
         -> Future<NodeUnpublishVolumeResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -269,14 +224,10 @@ Future<NodeGetIdResponse> Client::NodeGetId(
     const NodeGetIdRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeGetId), request)
-    .then([](const RpcResult<NodeGetIdResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodeGetId), request)
+    .then([](const Try<NodeGetIdResponse, StatusError>& result)
         -> Future<NodeGetIdResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 
@@ -285,14 +236,10 @@ Future<NodeGetCapabilitiesResponse> 
Client::NodeGetCapabilities(
     const NodeGetCapabilitiesRequest& request)
 {
   return runtime
-    .call(channel, GRPC_RPC(Node, NodeGetCapabilities), request)
-    .then([](const RpcResult<NodeGetCapabilitiesResponse>& result)
+    .call(connection, GRPC_CLIENT_METHOD(Node, NodeGetCapabilities), request)
+    .then([](const Try<NodeGetCapabilitiesResponse, StatusError>& result)
         -> Future<NodeGetCapabilitiesResponse> {
-      if (result.status.ok()) {
-        return result.response;
-      } else {
-        return Failure(result.status.error_message());
-      }
+      return result;
     });
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/csi/client.hpp
----------------------------------------------------------------------
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index 5d84674..9d7019a 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -30,9 +30,9 @@ namespace v0 {
 class Client
 {
 public:
-  Client(const process::grpc::Channel& _channel,
+  Client(const process::grpc::client::Connection& _connection,
          const process::grpc::client::Runtime& _runtime)
-    : channel(_channel), runtime(_runtime) {}
+    : connection(_connection), runtime(_runtime) {}
 
   // RPCs for the Identity service.
   process::Future<GetPluginInfoResponse>
@@ -90,7 +90,7 @@ public:
     NodeGetCapabilities(const NodeGetCapabilitiesRequest& request);
 
 private:
-  process::grpc::Channel channel;
+  process::grpc::client::Connection connection;
   process::grpc::client::Runtime runtime;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/csi_client_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index f5b9eac..d5993d6 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -32,8 +32,7 @@ using mesos::csi::v0::Client;
 
 using process::Future;
 
-using process::grpc::Channel;
-
+using process::grpc::client::Connection;
 using process::grpc::client::Runtime;
 
 using testing::TestParamInfo;
@@ -57,13 +56,13 @@ struct RPCParam
   template <typename Request, typename Response>
   RPCParam(const string& _name, Future<Response>(Client::*rpc)(const Request&))
     : name(_name),
-      call([=](const Channel& channel, const Runtime runtime) {
-        return (Client(channel, runtime).*rpc)(Request())
+      call([=](const Connection& connection, const Runtime runtime) {
+        return (Client(connection, runtime).*rpc)(Request())
           .then([] { return Nothing(); });
       }) {}
 
   string name;
-  lambda::function<Future<Nothing>(const Channel&, const Runtime&)> call;
+  lambda::function<Future<Nothing>(const Connection&, const Runtime&)> call;
 };
 
 
@@ -76,10 +75,10 @@ protected:
   {
     TemporaryDirectoryTest::SetUp();
 
-    Try<Channel> _channel = plugin.startup();
-    ASSERT_SOME(_channel);
+    Try<Connection> _connection = plugin.startup();
+    ASSERT_SOME(_connection);
 
-    channel = _channel.get();
+    connection = _connection.get();
   }
 
   virtual void TearDown() override
@@ -91,7 +90,7 @@ protected:
   }
 
   MockCSIPlugin plugin;
-  Option<process::grpc::Channel> channel;
+  Option<process::grpc::client::Connection> connection;
   process::grpc::client::Runtime runtime;
 };
 
@@ -139,7 +138,7 @@ INSTANTIATE_TEST_CASE_P(
 // This test verifies that the all methods of CSI clients work.
 TEST_P(CSIClientTest, Call)
 {
-  Future<Nothing> call = GetParam().call(channel.get(), runtime);
+  Future<Nothing> call = GetParam().call(connection.get(), runtime);
   AWAIT_EXPECT_READY(call);
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/mock_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_csi_plugin.cpp b/src/tests/mock_csi_plugin.cpp
index 6983b84..17c6335 100644
--- a/src/tests/mock_csi_plugin.cpp
+++ b/src/tests/mock_csi_plugin.cpp
@@ -30,7 +30,7 @@ using mesos::csi::v0::Controller;
 using mesos::csi::v0::Identity;
 using mesos::csi::v0::Node;
 
-using process::grpc::Channel;
+using process::grpc::client::Connection;
 
 using testing::_;
 using testing::Return;
@@ -49,7 +49,7 @@ MockCSIPlugin::MockCSIPlugin()
 }
 
 
-Try<Channel> MockCSIPlugin::startup(const Option<string>& address)
+Try<Connection> MockCSIPlugin::startup(const Option<string>& address)
 {
   ServerBuilder builder;
 
@@ -67,8 +67,8 @@ Try<Channel> MockCSIPlugin::startup(const Option<string>& 
address)
   }
 
   return address.isSome()
-    ? Channel(address.get())
-    : Channel(server->InProcessChannel(ChannelArguments()));
+    ? Connection(address.get())
+    : Connection(server->InProcessChannel(ChannelArguments()));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/mock_csi_plugin.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_csi_plugin.hpp b/src/tests/mock_csi_plugin.hpp
index 6f7a5ab..4642326 100644
--- a/src/tests/mock_csi_plugin.hpp
+++ b/src/tests/mock_csi_plugin.hpp
@@ -72,7 +72,7 @@ public:
 
   CSI_METHOD_FOREACH(DECLARE_MOCK_CSI_METHOD)
 
-  Try<process::grpc::Channel> startup(
+  Try<process::grpc::client::Connection> startup(
       const Option<std::string>& address = None());
   Try<Nothing> shutdown();
 

Reply via email to