Repository: mesos Updated Branches: refs/heads/master 21305ab47 -> 3d2a1fd49
Updated the CSI client to use the new libprocess gRPC interface. This patch makes the following changes: 1. Replace `GRPC_RPC` with `GRPC_CLIENT_METHOD`. 2. Replace `process::grpc::Channel` with `process::grpc::client::Connection`. 3. Make the CSI client metods return a `Future` of a `Try`. Review: https://reviews.apache.org/r/67158 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3d2a1fd4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3d2a1fd4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3d2a1fd4 Branch: refs/heads/master Commit: 3d2a1fd49dfe0325f4ca94e3abbab97366bcebd0 Parents: ddfea09 Author: Chun-Hung Hsiao <[email protected]> Authored: Wed May 16 12:10:10 2018 -0700 Committer: Chun-Hung Hsiao <[email protected]> Committed: Wed May 23 16:31:12 2018 -0700 ---------------------------------------------------------------------- src/csi/client.cpp | 187 +++++++++++++----------------------- src/csi/client.hpp | 6 +- src/tests/csi_client_tests.cpp | 19 ++-- src/tests/mock_csi_plugin.cpp | 8 +- src/tests/mock_csi_plugin.hpp | 2 +- 5 files changed, 84 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/csi/client.cpp ---------------------------------------------------------------------- diff --git a/src/csi/client.cpp b/src/csi/client.cpp index 559e805..a4ba1f1 100644 --- a/src/csi/client.cpp +++ b/src/csi/client.cpp @@ -19,7 +19,7 @@ using process::Failure; using process::Future; -using process::grpc::RpcResult; +using process::grpc::StatusError; namespace mesos { namespace csi { @@ -29,14 +29,10 @@ Future<GetPluginInfoResponse> Client::GetPluginInfo( const GetPluginInfoRequest& request) { return runtime - .call(channel, GRPC_RPC(Identity, GetPluginInfo), request) - .then([](const RpcResult<GetPluginInfoResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Identity, GetPluginInfo), request) + .then([](const Try<GetPluginInfoResponse, StatusError>& result) -> Future<GetPluginInfoResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -45,14 +41,13 @@ Future<GetPluginCapabilitiesResponse> Client::GetPluginCapabilities( const GetPluginCapabilitiesRequest& request) { return runtime - .call(channel, GRPC_RPC(Identity, GetPluginCapabilities), request) - .then([](const RpcResult<GetPluginCapabilitiesResponse>& result) + .call( + connection, + GRPC_CLIENT_METHOD(Identity, GetPluginCapabilities), + request) + .then([](const Try<GetPluginCapabilitiesResponse, StatusError>& result) -> Future<GetPluginCapabilitiesResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -61,14 +56,10 @@ Future<ProbeResponse> Client::Probe( const ProbeRequest& request) { return runtime - .call(channel, GRPC_RPC(Identity, Probe), request) - .then([](const RpcResult<ProbeResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Identity, Probe), request) + .then([](const Try<ProbeResponse, StatusError>& result) -> Future<ProbeResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -77,14 +68,10 @@ Future<CreateVolumeResponse> Client::CreateVolume( const CreateVolumeRequest& request) { return runtime - .call(channel, GRPC_RPC(Controller, CreateVolume), request) - .then([](const RpcResult<CreateVolumeResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Controller, CreateVolume), request) + .then([](const Try<CreateVolumeResponse, StatusError>& result) -> Future<CreateVolumeResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -93,14 +80,10 @@ Future<DeleteVolumeResponse> Client::DeleteVolume( const DeleteVolumeRequest& request) { return runtime - .call(channel, GRPC_RPC(Controller, DeleteVolume), request) - .then([](const RpcResult<DeleteVolumeResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Controller, DeleteVolume), request) + .then([](const Try<DeleteVolumeResponse, StatusError>& result) -> Future<DeleteVolumeResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -109,14 +92,13 @@ Future<ControllerPublishVolumeResponse> Client::ControllerPublishVolume( const ControllerPublishVolumeRequest& request) { return runtime - .call(channel, GRPC_RPC(Controller, ControllerPublishVolume), request) - .then([](const RpcResult<ControllerPublishVolumeResponse>& result) + .call( + connection, + GRPC_CLIENT_METHOD(Controller, ControllerPublishVolume), + request) + .then([](const Try<ControllerPublishVolumeResponse, StatusError>& result) -> Future<ControllerPublishVolumeResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -125,14 +107,13 @@ Future<ControllerUnpublishVolumeResponse> Client::ControllerUnpublishVolume( const ControllerUnpublishVolumeRequest& request) { return runtime - .call(channel, GRPC_RPC(Controller, ControllerUnpublishVolume), request) - .then([](const RpcResult<ControllerUnpublishVolumeResponse>& result) + .call( + connection, + GRPC_CLIENT_METHOD(Controller, ControllerUnpublishVolume), + request) + .then([](const Try<ControllerUnpublishVolumeResponse, StatusError>& result) -> Future<ControllerUnpublishVolumeResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -141,14 +122,13 @@ Future<ValidateVolumeCapabilitiesResponse> Client::ValidateVolumeCapabilities( const ValidateVolumeCapabilitiesRequest& request) { return runtime - .call(channel, GRPC_RPC(Controller, ValidateVolumeCapabilities), request) - .then([](const RpcResult<ValidateVolumeCapabilitiesResponse>& result) + .call( + connection, + GRPC_CLIENT_METHOD(Controller, ValidateVolumeCapabilities), + request) + .then([](const Try<ValidateVolumeCapabilitiesResponse, StatusError>& result) -> Future<ValidateVolumeCapabilitiesResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -157,14 +137,10 @@ Future<ListVolumesResponse> Client::ListVolumes( const ListVolumesRequest& request) { return runtime - .call(channel, GRPC_RPC(Controller, ListVolumes), request) - .then([](const RpcResult<ListVolumesResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Controller, ListVolumes), request) + .then([](const Try<ListVolumesResponse, StatusError>& result) -> Future<ListVolumesResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -173,14 +149,10 @@ Future<GetCapacityResponse> Client::GetCapacity( const GetCapacityRequest& request) { return runtime - .call(channel, GRPC_RPC(Controller, GetCapacity), request) - .then([](const RpcResult<GetCapacityResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Controller, GetCapacity), request) + .then([](const Try<GetCapacityResponse, StatusError>& result) -> Future<GetCapacityResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -189,14 +161,13 @@ Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities( const ControllerGetCapabilitiesRequest& request) { return runtime - .call(channel, GRPC_RPC(Controller, ControllerGetCapabilities), request) - .then([](const RpcResult<ControllerGetCapabilitiesResponse>& result) + .call( + connection, + GRPC_CLIENT_METHOD(Controller, ControllerGetCapabilities), + request) + .then([](const Try<ControllerGetCapabilitiesResponse, StatusError>& result) -> Future<ControllerGetCapabilitiesResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -205,14 +176,10 @@ Future<NodeStageVolumeResponse> Client::NodeStageVolume( const NodeStageVolumeRequest& request) { return runtime - .call(channel, GRPC_RPC(Node, NodeStageVolume), request) - .then([](const RpcResult<NodeStageVolumeResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Node, NodeStageVolume), request) + .then([](const Try<NodeStageVolumeResponse, StatusError>& result) -> Future<NodeStageVolumeResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -221,14 +188,10 @@ Future<NodeUnstageVolumeResponse> Client::NodeUnstageVolume( const NodeUnstageVolumeRequest& request) { return runtime - .call(channel, GRPC_RPC(Node, NodeUnstageVolume), request) - .then([](const RpcResult<NodeUnstageVolumeResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Node, NodeUnstageVolume), request) + .then([](const Try<NodeUnstageVolumeResponse, StatusError>& result) -> Future<NodeUnstageVolumeResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -237,14 +200,10 @@ Future<NodePublishVolumeResponse> Client::NodePublishVolume( const NodePublishVolumeRequest& request) { return runtime - .call(channel, GRPC_RPC(Node, NodePublishVolume), request) - .then([](const RpcResult<NodePublishVolumeResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Node, NodePublishVolume), request) + .then([](const Try<NodePublishVolumeResponse, StatusError>& result) -> Future<NodePublishVolumeResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -253,14 +212,10 @@ Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume( const NodeUnpublishVolumeRequest& request) { return runtime - .call(channel, GRPC_RPC(Node, NodeUnpublishVolume), request) - .then([](const RpcResult<NodeUnpublishVolumeResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Node, NodeUnpublishVolume), request) + .then([](const Try<NodeUnpublishVolumeResponse, StatusError>& result) -> Future<NodeUnpublishVolumeResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -269,14 +224,10 @@ Future<NodeGetIdResponse> Client::NodeGetId( const NodeGetIdRequest& request) { return runtime - .call(channel, GRPC_RPC(Node, NodeGetId), request) - .then([](const RpcResult<NodeGetIdResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Node, NodeGetId), request) + .then([](const Try<NodeGetIdResponse, StatusError>& result) -> Future<NodeGetIdResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } @@ -285,14 +236,10 @@ Future<NodeGetCapabilitiesResponse> Client::NodeGetCapabilities( const NodeGetCapabilitiesRequest& request) { return runtime - .call(channel, GRPC_RPC(Node, NodeGetCapabilities), request) - .then([](const RpcResult<NodeGetCapabilitiesResponse>& result) + .call(connection, GRPC_CLIENT_METHOD(Node, NodeGetCapabilities), request) + .then([](const Try<NodeGetCapabilitiesResponse, StatusError>& result) -> Future<NodeGetCapabilitiesResponse> { - if (result.status.ok()) { - return result.response; - } else { - return Failure(result.status.error_message()); - } + return result; }); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/csi/client.hpp ---------------------------------------------------------------------- diff --git a/src/csi/client.hpp b/src/csi/client.hpp index 5d84674..9d7019a 100644 --- a/src/csi/client.hpp +++ b/src/csi/client.hpp @@ -30,9 +30,9 @@ namespace v0 { class Client { public: - Client(const process::grpc::Channel& _channel, + Client(const process::grpc::client::Connection& _connection, const process::grpc::client::Runtime& _runtime) - : channel(_channel), runtime(_runtime) {} + : connection(_connection), runtime(_runtime) {} // RPCs for the Identity service. process::Future<GetPluginInfoResponse> @@ -90,7 +90,7 @@ public: NodeGetCapabilities(const NodeGetCapabilitiesRequest& request); private: - process::grpc::Channel channel; + process::grpc::client::Connection connection; process::grpc::client::Runtime runtime; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/csi_client_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp index f5b9eac..d5993d6 100644 --- a/src/tests/csi_client_tests.cpp +++ b/src/tests/csi_client_tests.cpp @@ -32,8 +32,7 @@ using mesos::csi::v0::Client; using process::Future; -using process::grpc::Channel; - +using process::grpc::client::Connection; using process::grpc::client::Runtime; using testing::TestParamInfo; @@ -57,13 +56,13 @@ struct RPCParam template <typename Request, typename Response> RPCParam(const string& _name, Future<Response>(Client::*rpc)(const Request&)) : name(_name), - call([=](const Channel& channel, const Runtime runtime) { - return (Client(channel, runtime).*rpc)(Request()) + call([=](const Connection& connection, const Runtime runtime) { + return (Client(connection, runtime).*rpc)(Request()) .then([] { return Nothing(); }); }) {} string name; - lambda::function<Future<Nothing>(const Channel&, const Runtime&)> call; + lambda::function<Future<Nothing>(const Connection&, const Runtime&)> call; }; @@ -76,10 +75,10 @@ protected: { TemporaryDirectoryTest::SetUp(); - Try<Channel> _channel = plugin.startup(); - ASSERT_SOME(_channel); + Try<Connection> _connection = plugin.startup(); + ASSERT_SOME(_connection); - channel = _channel.get(); + connection = _connection.get(); } virtual void TearDown() override @@ -91,7 +90,7 @@ protected: } MockCSIPlugin plugin; - Option<process::grpc::Channel> channel; + Option<process::grpc::client::Connection> connection; process::grpc::client::Runtime runtime; }; @@ -139,7 +138,7 @@ INSTANTIATE_TEST_CASE_P( // This test verifies that the all methods of CSI clients work. TEST_P(CSIClientTest, Call) { - Future<Nothing> call = GetParam().call(channel.get(), runtime); + Future<Nothing> call = GetParam().call(connection.get(), runtime); AWAIT_EXPECT_READY(call); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/mock_csi_plugin.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mock_csi_plugin.cpp b/src/tests/mock_csi_plugin.cpp index 6983b84..17c6335 100644 --- a/src/tests/mock_csi_plugin.cpp +++ b/src/tests/mock_csi_plugin.cpp @@ -30,7 +30,7 @@ using mesos::csi::v0::Controller; using mesos::csi::v0::Identity; using mesos::csi::v0::Node; -using process::grpc::Channel; +using process::grpc::client::Connection; using testing::_; using testing::Return; @@ -49,7 +49,7 @@ MockCSIPlugin::MockCSIPlugin() } -Try<Channel> MockCSIPlugin::startup(const Option<string>& address) +Try<Connection> MockCSIPlugin::startup(const Option<string>& address) { ServerBuilder builder; @@ -67,8 +67,8 @@ Try<Channel> MockCSIPlugin::startup(const Option<string>& address) } return address.isSome() - ? Channel(address.get()) - : Channel(server->InProcessChannel(ChannelArguments())); + ? Connection(address.get()) + : Connection(server->InProcessChannel(ChannelArguments())); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3d2a1fd4/src/tests/mock_csi_plugin.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mock_csi_plugin.hpp b/src/tests/mock_csi_plugin.hpp index 6f7a5ab..4642326 100644 --- a/src/tests/mock_csi_plugin.hpp +++ b/src/tests/mock_csi_plugin.hpp @@ -72,7 +72,7 @@ public: CSI_METHOD_FOREACH(DECLARE_MOCK_CSI_METHOD) - Try<process::grpc::Channel> startup( + Try<process::grpc::client::Connection> startup( const Option<std::string>& address = None()); Try<Nothing> shutdown();
