This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit f728ce3b5014993db54d5bf6472fcecb3c098934 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Mon Mar 25 21:34:02 2019 -0700 Cleaned up `mesos::csi::v0::Client` interface. Since per-CSI-call metrics are no longer implemented, there is very little value in keeping the `mesos::csi::v0::RPC` enum, which is tightly coupled with `mesos::csi::v0::Client`. Therefore, the enum and its header are removed. The header and implementation files for `mesos::csi::v0::Client` are also renamed for future CSI v1 support. Review: https://reviews.apache.org/r/70321 --- src/CMakeLists.txt | 3 +- src/Makefile.am | 6 +- src/csi/client.hpp | 142 -------------- src/csi/rpc.cpp | 105 ---------- src/csi/rpc.hpp | 212 --------------------- src/csi/service_manager.cpp | 8 +- src/csi/{client.cpp => v0_client.cpp} | 107 ++++------- src/csi/v0_client.hpp | 101 ++++++++++ src/csi/v0_volume_manager.cpp | 131 +++++++------ src/csi/v0_volume_manager_process.hpp | 30 +-- src/tests/csi_client_tests.cpp | 88 ++++----- .../storage_local_resource_provider_tests.cpp | 27 ++- 12 files changed, 280 insertions(+), 680 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6ef0c8c..af3715a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -232,13 +232,12 @@ set(COMMON_SRC common/values.cpp) set(CSI_SRC - csi/client.cpp csi/types.cpp csi/metrics.cpp csi/paths.cpp - csi/rpc.cpp csi/service_manager.cpp csi/v0.cpp + csi/v0_client.cpp csi/v0_utils.cpp csi/v0_volume_manager.cpp csi/volume_manager.cpp) diff --git a/src/Makefile.am b/src/Makefile.am index 61ded56..a800444 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1560,20 +1560,18 @@ libmesos_no_3rdparty_la_LIBADD += libbuild.la # Convenience library for build the CSI client. 
noinst_LTLIBRARIES += libcsi.la libcsi_la_SOURCES = \ - csi/client.cpp \ - csi/client.hpp \ csi/types.cpp \ csi/metrics.cpp \ csi/metrics.hpp \ csi/paths.cpp \ csi/paths.hpp \ - csi/rpc.cpp \ - csi/rpc.hpp \ csi/service_manager.cpp \ csi/service_manager.hpp \ csi/state.hpp \ csi/state.proto \ csi/v0.cpp \ + csi/v0_client.cpp \ + csi/v0_client.hpp \ csi/v0_utils.cpp \ csi/v0_utils.hpp \ csi/v0_volume_manager.cpp \ diff --git a/src/csi/client.hpp b/src/csi/client.hpp deleted file mode 100644 index 1429526..0000000 --- a/src/csi/client.hpp +++ /dev/null @@ -1,142 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -#ifndef __CSI_CLIENT_HPP__ -#define __CSI_CLIENT_HPP__ - -#include <mesos/csi/v0.hpp> - -#include <process/grpc.hpp> - -#include "csi/rpc.hpp" - -namespace mesos { -namespace csi { -namespace v0 { - -class Client -{ -public: - Client(const process::grpc::client::Connection& _connection, - const process::grpc::client::Runtime& _runtime) - : connection(_connection), runtime(_runtime) {} - - template <RPC rpc> - process::Future<Try<Response<rpc>, process::grpc::StatusError>> call( - Request<rpc> request); - -private: - process::grpc::client::Connection connection; - process::grpc::client::Runtime runtime; -}; - - -template <> -process::Future<Try<GetPluginInfoResponse, process::grpc::StatusError>> -Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request); - - -template <> -process::Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>> -Client::call<GET_PLUGIN_CAPABILITIES>(GetPluginCapabilitiesRequest request); - - -template <> -process::Future<Try<ProbeResponse, process::grpc::StatusError>> -Client::call<PROBE>(ProbeRequest request); - - -template <> -process::Future<Try<CreateVolumeResponse, process::grpc::StatusError>> -Client::call<CREATE_VOLUME>(CreateVolumeRequest request); - - -template <> -process::Future<Try<DeleteVolumeResponse, process::grpc::StatusError>> -Client::call<DELETE_VOLUME>(DeleteVolumeRequest request); - - -template <> -process::Future< - Try<ControllerPublishVolumeResponse, process::grpc::StatusError>> -Client::call<CONTROLLER_PUBLISH_VOLUME>(ControllerPublishVolumeRequest request); - - -template <> -process::Future< - Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>> -Client::call<CONTROLLER_UNPUBLISH_VOLUME>( - ControllerUnpublishVolumeRequest request); - - -template <> -process::Future< - Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>> -Client::call<VALIDATE_VOLUME_CAPABILITIES>( - ValidateVolumeCapabilitiesRequest request); - - -template <> 
-process::Future<Try<ListVolumesResponse, process::grpc::StatusError>> -Client::call<LIST_VOLUMES>(ListVolumesRequest request); - - -template <> -process::Future<Try<GetCapacityResponse, process::grpc::StatusError>> -Client::call<GET_CAPACITY>(GetCapacityRequest request); - - -template <> -process::Future< - Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>> -Client::call<CONTROLLER_GET_CAPABILITIES>( - ControllerGetCapabilitiesRequest request); - - -template <> -process::Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>> -Client::call<NODE_STAGE_VOLUME>(NodeStageVolumeRequest request); - - -template <> -process::Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>> -Client::call<NODE_UNSTAGE_VOLUME>(NodeUnstageVolumeRequest request); - - -template <> -process::Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>> -Client::call<NODE_PUBLISH_VOLUME>(NodePublishVolumeRequest request); - - -template <> -process::Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>> -Client::call<NODE_UNPUBLISH_VOLUME>(NodeUnpublishVolumeRequest request); - - -template <> -process::Future<Try<NodeGetIdResponse, process::grpc::StatusError>> -Client::call<NODE_GET_ID>(NodeGetIdRequest request); - - -template <> -process::Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>> -Client::call<NODE_GET_CAPABILITIES>(NodeGetCapabilitiesRequest request); - -} // namespace v0 { -} // namespace csi { -} // namespace mesos { - -#endif // __CSI_CLIENT_HPP__ diff --git a/src/csi/rpc.cpp b/src/csi/rpc.cpp deleted file mode 100644 index e8a2770..0000000 --- a/src/csi/rpc.cpp +++ /dev/null @@ -1,105 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "csi/rpc.hpp" - -#include <stout/unreachable.hpp> - -using std::ostream; - -namespace mesos { -namespace csi { -namespace v0 { - -ostream& operator<<(ostream& stream, const RPC& rpc) -{ - switch (rpc) { - case GET_PLUGIN_INFO: - return stream - << Identity::service_full_name() - << ".GetPluginInfo"; - case GET_PLUGIN_CAPABILITIES: - return stream - << Identity::service_full_name() - << ".GetPluginCapabilities"; - case PROBE: - return stream - << Identity::service_full_name() - << ".Probe"; - case CREATE_VOLUME: - return stream - << Controller::service_full_name() - << ".CreateVolume"; - case DELETE_VOLUME: - return stream - << Controller::service_full_name() - << ".DeleteVolume"; - case CONTROLLER_PUBLISH_VOLUME: - return stream - << Controller::service_full_name() - << ".ControllerPublishVolume"; - case CONTROLLER_UNPUBLISH_VOLUME: - return stream - << Controller::service_full_name() - << ".ControllerUnpublishVolume"; - case VALIDATE_VOLUME_CAPABILITIES: - return stream - << Controller::service_full_name() - << ".ValidateVolumeCapabilities"; - case LIST_VOLUMES: - return stream - << Controller::service_full_name() - << ".ListVolumes"; - case GET_CAPACITY: - return stream - << Controller::service_full_name() - << ".GetCapacity"; - case CONTROLLER_GET_CAPABILITIES: - return stream - << Controller::service_full_name() - << ".ControllerGetCapabilities"; - case NODE_STAGE_VOLUME: - 
return stream - << Node::service_full_name() - << ".NodeStageVolume"; - case NODE_UNSTAGE_VOLUME: - return stream - << Node::service_full_name() - << ".NodeUnstageVolume"; - case NODE_PUBLISH_VOLUME: - return stream - << Node::service_full_name() - << ".NodePublishVolume"; - case NODE_UNPUBLISH_VOLUME: - return stream - << Node::service_full_name() - << ".NodeUnpublishVolume"; - case NODE_GET_ID: - return stream - << Node::service_full_name() - << ".NodeGetId"; - case NODE_GET_CAPABILITIES: - return stream - << Node::service_full_name() - << ".NodeGetCapabilities"; - } - - UNREACHABLE(); -} - -} // namespace v0 { -} // namespace csi { -} // namespace mesos { diff --git a/src/csi/rpc.hpp b/src/csi/rpc.hpp deleted file mode 100644 index 4d0ce49..0000000 --- a/src/csi/rpc.hpp +++ /dev/null @@ -1,212 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef __CSI_RPC_HPP__ -#define __CSI_RPC_HPP__ - -#include <ostream> - -#include <mesos/csi/v0.hpp> - -namespace mesos { -namespace csi { -namespace v0 { - -enum RPC -{ - // RPCs for the Identity service. - GET_PLUGIN_INFO, - GET_PLUGIN_CAPABILITIES, - PROBE, - - // RPCs for the Controller service. 
- CREATE_VOLUME, - DELETE_VOLUME, - CONTROLLER_PUBLISH_VOLUME, - CONTROLLER_UNPUBLISH_VOLUME, - VALIDATE_VOLUME_CAPABILITIES, - LIST_VOLUMES, - GET_CAPACITY, - CONTROLLER_GET_CAPABILITIES, - - // RPCs for the Node service. - NODE_STAGE_VOLUME, - NODE_UNSTAGE_VOLUME, - NODE_PUBLISH_VOLUME, - NODE_UNPUBLISH_VOLUME, - NODE_GET_ID, - NODE_GET_CAPABILITIES -}; - - -namespace internal { - -template <RPC> -struct RPCTraits; - - -template <> -struct RPCTraits<GET_PLUGIN_INFO> -{ - typedef GetPluginInfoRequest request_type; - typedef GetPluginInfoResponse response_type; -}; - - -template <> -struct RPCTraits<GET_PLUGIN_CAPABILITIES> -{ - typedef GetPluginCapabilitiesRequest request_type; - typedef GetPluginCapabilitiesResponse response_type; -}; - - -template <> -struct RPCTraits<PROBE> -{ - typedef ProbeRequest request_type; - typedef ProbeResponse response_type; -}; - - -template <> -struct RPCTraits<CREATE_VOLUME> -{ - typedef CreateVolumeRequest request_type; - typedef CreateVolumeResponse response_type; -}; - - -template <> -struct RPCTraits<DELETE_VOLUME> -{ - typedef DeleteVolumeRequest request_type; - typedef DeleteVolumeResponse response_type; -}; - - -template <> -struct RPCTraits<CONTROLLER_PUBLISH_VOLUME> -{ - typedef ControllerPublishVolumeRequest request_type; - typedef ControllerPublishVolumeResponse response_type; -}; - - -template <> -struct RPCTraits<CONTROLLER_UNPUBLISH_VOLUME> -{ - typedef ControllerUnpublishVolumeRequest request_type; - typedef ControllerUnpublishVolumeResponse response_type; -}; - - -template <> -struct RPCTraits<VALIDATE_VOLUME_CAPABILITIES> -{ - typedef ValidateVolumeCapabilitiesRequest request_type; - typedef ValidateVolumeCapabilitiesResponse response_type; -}; - - -template <> -struct RPCTraits<LIST_VOLUMES> -{ - typedef ListVolumesRequest request_type; - typedef ListVolumesResponse response_type; -}; - - -template <> -struct RPCTraits<GET_CAPACITY> -{ - typedef GetCapacityRequest request_type; - typedef GetCapacityResponse 
response_type; -}; - - -template <> -struct RPCTraits<CONTROLLER_GET_CAPABILITIES> -{ - typedef ControllerGetCapabilitiesRequest request_type; - typedef ControllerGetCapabilitiesResponse response_type; -}; - - -template <> -struct RPCTraits<NODE_STAGE_VOLUME> -{ - typedef NodeStageVolumeRequest request_type; - typedef NodeStageVolumeResponse response_type; -}; - - -template <> -struct RPCTraits<NODE_UNSTAGE_VOLUME> -{ - typedef NodeUnstageVolumeRequest request_type; - typedef NodeUnstageVolumeResponse response_type; -}; - - -template <> -struct RPCTraits<NODE_PUBLISH_VOLUME> -{ - typedef NodePublishVolumeRequest request_type; - typedef NodePublishVolumeResponse response_type; -}; - - -template <> -struct RPCTraits<NODE_UNPUBLISH_VOLUME> -{ - typedef NodeUnpublishVolumeRequest request_type; - typedef NodeUnpublishVolumeResponse response_type; -}; - - -template <> -struct RPCTraits<NODE_GET_ID> -{ - typedef NodeGetIdRequest request_type; - typedef NodeGetIdResponse response_type; -}; - - -template <> -struct RPCTraits<NODE_GET_CAPABILITIES> -{ - typedef NodeGetCapabilitiesRequest request_type; - typedef NodeGetCapabilitiesResponse response_type; -}; - -} // namespace internal { - - -template <RPC rpc> -using Request = typename internal::RPCTraits<rpc>::request_type; - -template <RPC rpc> -using Response = typename internal::RPCTraits<rpc>::response_type; - - -std::ostream& operator<<(std::ostream& stream, const RPC& rpc); - -} // namespace v0 { -} // namespace csi { -} // namespace mesos { - -#endif // __CSI_RPC_HPP__ diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp index f8a42f6..0a3663c 100644 --- a/src/csi/service_manager.cpp +++ b/src/csi/service_manager.cpp @@ -49,8 +49,8 @@ #include "common/http.hpp" -#include "csi/client.hpp" #include "csi/paths.hpp" +#include "csi/v0_client.hpp" #include "csi/v0_utils.hpp" #include "internal/devolve.hpp" @@ -493,11 +493,9 @@ Future<Nothing> ServiceManagerProcess::waitEndpoint(const string& endpoint) // 
`Probe` calls to support CSI v1 in a backward compatible way. ++metrics->csi_plugin_rpcs_pending; - return v0::Client(endpoint, runtime) - .call<v0::PROBE>(v0::ProbeRequest()) + return v0::Client(endpoint, runtime).probe(v0::ProbeRequest()) .then(process::defer(self(), [=]( - const Try<v0::ProbeResponse, StatusError>& result) - -> Future<Nothing> { + const v0::RPCResult<v0::ProbeResponse>& result) -> Future<Nothing> { if (result.isError()) { return Failure( "Failed to probe endpoint '" + endpoint + diff --git a/src/csi/client.cpp b/src/csi/v0_client.cpp similarity index 57% rename from src/csi/client.cpp rename to src/csi/v0_client.cpp index 9e17f5b..02ee0bc 100644 --- a/src/csi/client.cpp +++ b/src/csi/v0_client.cpp @@ -14,24 +14,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include <utility> +#include "csi/v0_client.hpp" -#include "csi/client.hpp" +#include <utility> -using process::Failure; using process::Future; -using process::grpc::StatusError; - using process::grpc::client::CallOptions; namespace mesos { namespace csi { namespace v0 { -template <> -Future<Try<GetPluginInfoResponse, process::grpc::StatusError>> -Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request) +Future<RPCResult<GetPluginInfoResponse>> +Client::getPluginInfo(GetPluginInfoRequest request) { return runtime.call( connection, @@ -41,10 +37,8 @@ Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request) } -template <> -Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>> -Client::call<GET_PLUGIN_CAPABILITIES>( - GetPluginCapabilitiesRequest request) +Future<RPCResult<GetPluginCapabilitiesResponse>> +Client::getPluginCapabilities(GetPluginCapabilitiesRequest request) { return runtime.call( connection, @@ -54,10 +48,7 @@ Client::call<GET_PLUGIN_CAPABILITIES>( } -template <> -Future<Try<ProbeResponse, process::grpc::StatusError>> -Client::call<PROBE>( - ProbeRequest request) 
+Future<RPCResult<ProbeResponse>> Client::probe(ProbeRequest request) { return runtime.call( connection, @@ -67,10 +58,8 @@ Client::call<PROBE>( } -template <> -Future<Try<CreateVolumeResponse, process::grpc::StatusError>> -Client::call<CREATE_VOLUME>( - CreateVolumeRequest request) +Future<RPCResult<CreateVolumeResponse>> +Client::createVolume(CreateVolumeRequest request) { return runtime.call( connection, @@ -80,10 +69,8 @@ Client::call<CREATE_VOLUME>( } -template <> -Future<Try<DeleteVolumeResponse, process::grpc::StatusError>> -Client::call<DELETE_VOLUME>( - DeleteVolumeRequest request) +Future<RPCResult<DeleteVolumeResponse>> +Client::deleteVolume(DeleteVolumeRequest request) { return runtime.call( connection, @@ -93,10 +80,8 @@ Client::call<DELETE_VOLUME>( } -template <> -Future<Try<ControllerPublishVolumeResponse, process::grpc::StatusError>> -Client::call<CONTROLLER_PUBLISH_VOLUME>( - ControllerPublishVolumeRequest request) +Future<RPCResult<ControllerPublishVolumeResponse>> +Client::controllerPublishVolume(ControllerPublishVolumeRequest request) { return runtime.call( connection, @@ -106,10 +91,8 @@ Client::call<CONTROLLER_PUBLISH_VOLUME>( } -template <> -Future<Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>> -Client::call<CONTROLLER_UNPUBLISH_VOLUME>( - ControllerUnpublishVolumeRequest request) +Future<RPCResult<ControllerUnpublishVolumeResponse>> +Client::controllerUnpublishVolume(ControllerUnpublishVolumeRequest request) { return runtime.call( connection, @@ -119,10 +102,8 @@ Client::call<CONTROLLER_UNPUBLISH_VOLUME>( } -template <> -Future<Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>> -Client::call<VALIDATE_VOLUME_CAPABILITIES>( - ValidateVolumeCapabilitiesRequest request) +Future<RPCResult<ValidateVolumeCapabilitiesResponse>> +Client::validateVolumeCapabilities(ValidateVolumeCapabilitiesRequest request) { return runtime.call( connection, @@ -132,10 +113,8 @@ Client::call<VALIDATE_VOLUME_CAPABILITIES>( } 
-template <> -Future<Try<ListVolumesResponse, process::grpc::StatusError>> -Client::call<LIST_VOLUMES>( - ListVolumesRequest request) +Future<RPCResult<ListVolumesResponse>> +Client::listVolumes(ListVolumesRequest request) { return runtime.call( connection, @@ -145,10 +124,8 @@ Client::call<LIST_VOLUMES>( } -template <> -Future<Try<GetCapacityResponse, process::grpc::StatusError>> -Client::call<GET_CAPACITY>( - GetCapacityRequest request) +Future<RPCResult<GetCapacityResponse>> +Client::getCapacity(GetCapacityRequest request) { return runtime.call( connection, @@ -158,10 +135,8 @@ Client::call<GET_CAPACITY>( } -template <> -Future<Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>> -Client::call<CONTROLLER_GET_CAPABILITIES>( - ControllerGetCapabilitiesRequest request) +Future<RPCResult<ControllerGetCapabilitiesResponse>> +Client::controllerGetCapabilities(ControllerGetCapabilitiesRequest request) { return runtime.call( connection, @@ -171,10 +146,8 @@ Client::call<CONTROLLER_GET_CAPABILITIES>( } -template <> -Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>> -Client::call<NODE_STAGE_VOLUME>( - NodeStageVolumeRequest request) +Future<RPCResult<NodeStageVolumeResponse>> +Client::nodeStageVolume(NodeStageVolumeRequest request) { return runtime.call( connection, @@ -184,10 +157,8 @@ Client::call<NODE_STAGE_VOLUME>( } -template <> -Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>> -Client::call<NODE_UNSTAGE_VOLUME>( - NodeUnstageVolumeRequest request) +Future<RPCResult<NodeUnstageVolumeResponse>> +Client::nodeUnstageVolume(NodeUnstageVolumeRequest request) { return runtime.call( connection, @@ -197,10 +168,8 @@ Client::call<NODE_UNSTAGE_VOLUME>( } -template <> -Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>> -Client::call<NODE_PUBLISH_VOLUME>( - NodePublishVolumeRequest request) +Future<RPCResult<NodePublishVolumeResponse>> +Client::nodePublishVolume(NodePublishVolumeRequest request) { return 
runtime.call( connection, @@ -210,10 +179,8 @@ Client::call<NODE_PUBLISH_VOLUME>( } -template <> -Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>> -Client::call<NODE_UNPUBLISH_VOLUME>( - NodeUnpublishVolumeRequest request) +Future<RPCResult<NodeUnpublishVolumeResponse>> +Client::nodeUnpublishVolume(NodeUnpublishVolumeRequest request) { return runtime.call( connection, @@ -223,10 +190,8 @@ Client::call<NODE_UNPUBLISH_VOLUME>( } -template <> -Future<Try<NodeGetIdResponse, process::grpc::StatusError>> -Client::call<NODE_GET_ID>( - NodeGetIdRequest request) +Future<RPCResult<NodeGetIdResponse>> +Client::nodeGetId(NodeGetIdRequest request) { return runtime.call( connection, @@ -236,10 +201,8 @@ Client::call<NODE_GET_ID>( } -template <> -Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>> -Client::call<NODE_GET_CAPABILITIES>( - NodeGetCapabilitiesRequest request) +Future<RPCResult<NodeGetCapabilitiesResponse>> +Client::nodeGetCapabilities(NodeGetCapabilitiesRequest request) { return runtime.call( connection, diff --git a/src/csi/v0_client.hpp b/src/csi/v0_client.hpp new file mode 100644 index 0000000..977a440 --- /dev/null +++ b/src/csi/v0_client.hpp @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef __CSI_V0_CLIENT_HPP__ +#define __CSI_V0_CLIENT_HPP__ + +#include <mesos/csi/v0.hpp> + +#include <process/future.hpp> +#include <process/grpc.hpp> + +#include <stout/try.hpp> + +namespace mesos { +namespace csi { +namespace v0 { + +template <typename Response> +using RPCResult = Try<Response, process::grpc::StatusError>; + + +class Client +{ +public: + Client(const process::grpc::client::Connection& _connection, + const process::grpc::client::Runtime& _runtime) + : connection(_connection), runtime(_runtime) {} + + process::Future<RPCResult<GetPluginInfoResponse>> + getPluginInfo(GetPluginInfoRequest request); + + process::Future<RPCResult<GetPluginCapabilitiesResponse>> + getPluginCapabilities(GetPluginCapabilitiesRequest request); + + process::Future<RPCResult<ProbeResponse>> probe(ProbeRequest request); + + process::Future<RPCResult<CreateVolumeResponse>> + createVolume(CreateVolumeRequest request); + + process::Future<RPCResult<DeleteVolumeResponse>> + deleteVolume(DeleteVolumeRequest request); + + process::Future<RPCResult<ControllerPublishVolumeResponse>> + controllerPublishVolume(ControllerPublishVolumeRequest request); + + process::Future<RPCResult<ControllerUnpublishVolumeResponse>> + controllerUnpublishVolume(ControllerUnpublishVolumeRequest request); + + process::Future<RPCResult<ValidateVolumeCapabilitiesResponse>> + validateVolumeCapabilities(ValidateVolumeCapabilitiesRequest request); + + process::Future<RPCResult<ListVolumesResponse>> + listVolumes(ListVolumesRequest request); + + process::Future<RPCResult<GetCapacityResponse>> + getCapacity(GetCapacityRequest request); + + process::Future<RPCResult<ControllerGetCapabilitiesResponse>> + controllerGetCapabilities(ControllerGetCapabilitiesRequest request); + + process::Future<RPCResult<NodeStageVolumeResponse>> + nodeStageVolume(NodeStageVolumeRequest request); + + 
process::Future<RPCResult<NodeUnstageVolumeResponse>> + nodeUnstageVolume(NodeUnstageVolumeRequest request); + + process::Future<RPCResult<NodePublishVolumeResponse>> + nodePublishVolume(NodePublishVolumeRequest request); + + process::Future<RPCResult<NodeUnpublishVolumeResponse>> + nodeUnpublishVolume(NodeUnpublishVolumeRequest request); + + process::Future<RPCResult<NodeGetIdResponse>> + nodeGetId(NodeGetIdRequest request); + + process::Future<RPCResult<NodeGetCapabilitiesResponse>> + nodeGetCapabilities(NodeGetCapabilitiesRequest request); + +private: + process::grpc::client::Connection connection; + process::grpc::client::Runtime runtime; +}; + +} // namespace v0 { +} // namespace csi { +} // namespace mesos { + +#endif // __CSI_V0_CLIENT_HPP__ diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp index bf9f00e..7de9000 100644 --- a/src/csi/v0_volume_manager.cpp +++ b/src/csi/v0_volume_manager.cpp @@ -40,8 +40,8 @@ #include <stout/try.hpp> #include <stout/unreachable.hpp> -#include "csi/client.hpp" #include "csi/paths.hpp" +#include "csi/v0_client.hpp" #include "csi/v0_utils.hpp" #include "csi/v0_volume_manager_process.hpp" @@ -253,7 +253,7 @@ Future<vector<VolumeInfo>> VolumeManagerProcess::listVolumes() // TODO(chhsiao): Set the max entries and use a loop to do multiple // `ListVolumes` calls. 
- return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest()) + return call(CONTROLLER_SERVICE, &Client::listVolumes, ListVolumesRequest()) .then(process::defer(self(), [](const ListVolumesResponse& response) { vector<VolumeInfo> result; foreach (const auto& entry, response.entries()) { @@ -279,7 +279,7 @@ Future<Bytes> VolumeManagerProcess::getCapacity( *request.add_volume_capabilities() = evolve(capability); *request.mutable_parameters() = parameters; - return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request)) + return call(CONTROLLER_SERVICE, &Client::getCapacity, std::move(request)) .then([](const GetCapacityResponse& response) { return Bytes(response.available_capacity()); }); @@ -308,7 +308,8 @@ Future<VolumeInfo> VolumeManagerProcess::createVolume( *request.mutable_parameters() = parameters; // We retry the `CreateVolume` call for MESOS-9517. - return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true) + return call( + CONTROLLER_SERVICE, &Client::createVolume, std::move(request), true) .then(process::defer(self(), [=]( const CreateVolumeResponse& response) -> Future<VolumeInfo> { const string& volumeId = response.volume().id(); @@ -371,8 +372,10 @@ Future<Option<Error>> VolumeManagerProcess::validateVolume( *request.add_volume_capabilities() = evolve(capability); *request.mutable_volume_attributes() = volumeInfo.context; - return call<VALIDATE_VOLUME_CAPABILITIES>( - CONTROLLER_SERVICE, std::move(request)) + return call( + CONTROLLER_SERVICE, + &Client::validateVolumeCapabilities, + std::move(request)) .then(process::defer(self(), [=]( const ValidateVolumeCapabilitiesResponse& response) -> Future<Option<Error>> { @@ -494,10 +497,11 @@ Future<Nothing> VolumeManagerProcess::unpublishVolume(const string& volumeId) } -template <RPC Rpc> -Future<Response<Rpc>> VolumeManagerProcess::call( +template <typename Request, typename Response> +Future<Response> VolumeManagerProcess::call( const Service& service, - const Request<Rpc>& 
request, + Future<RPCResult<Response>> (Client::*rpc)(Request), + const Request& request, const bool retry) // Made immutable in the following mutable lambda. { Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR; @@ -508,10 +512,14 @@ Future<Response<Rpc>> VolumeManagerProcess::call( // Make the call to the latest service endpoint. return serviceManager->getServiceEndpoint(service) .then(process::defer( - self(), &VolumeManagerProcess::_call<Rpc>, lambda::_1, request)); + self(), + &VolumeManagerProcess::_call<Request, Response>, + lambda::_1, + rpc, + request)); }, - [=](const Try<Response<Rpc>, StatusError>& result) mutable - -> Future<ControlFlow<Response<Rpc>>> { + [=](const RPCResult<Response>& result) mutable + -> Future<ControlFlow<Response>> { Option<Duration> backoff = retry ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX) : Option<Duration>::none(); @@ -520,36 +528,36 @@ Future<Response<Rpc>> VolumeManagerProcess::call( // We dispatch `__call` for testing purpose. 
return process::dispatch( - self(), &VolumeManagerProcess::__call<Rpc>, result, backoff); + self(), &VolumeManagerProcess::__call<Response>, result, backoff); }); } -template <RPC Rpc> -Future<Try<Response<Rpc>, StatusError>> VolumeManagerProcess::_call( - const string& endpoint, const Request<Rpc>& request) +template <typename Request, typename Response> +Future<RPCResult<Response>> VolumeManagerProcess::_call( + const string& endpoint, + Future<RPCResult<Response>> (Client::*rpc)(Request), + const Request& request) { ++metrics->csi_plugin_rpcs_pending; - return Client(endpoint, runtime).call<Rpc>(request) - .onAny(defer(self(), [=]( - const Future<Try<Response<Rpc>, StatusError>>& future) { - --metrics->csi_plugin_rpcs_pending; - if (future.isReady() && future->isSome()) { - ++metrics->csi_plugin_rpcs_finished; - } else if (future.isDiscarded()) { - ++metrics->csi_plugin_rpcs_cancelled; - } else { - ++metrics->csi_plugin_rpcs_failed; - } - })); + return (Client(endpoint, runtime).*rpc)(request).onAny( + process::defer(self(), [=](const Future<RPCResult<Response>>& future) { + --metrics->csi_plugin_rpcs_pending; + if (future.isReady() && future->isSome()) { + ++metrics->csi_plugin_rpcs_finished; + } else if (future.isDiscarded()) { + ++metrics->csi_plugin_rpcs_cancelled; + } else { + ++metrics->csi_plugin_rpcs_failed; + } + })); } -template <RPC Rpc> -Future<ControlFlow<Response<Rpc>>> VolumeManagerProcess::__call( - const Try<Response<Rpc>, StatusError>& result, - const Option<Duration>& backoff) +template <typename Response> +Future<ControlFlow<Response>> VolumeManagerProcess::__call( + const RPCResult<Response>& result, const Option<Duration>& backoff) { if (result.isSome()) { return Break(result.get()); @@ -564,13 +572,12 @@ Future<ControlFlow<Response<Rpc>>> VolumeManagerProcess::__call( switch (result.error().status.error_code()) { case grpc::DEADLINE_EXCEEDED: case grpc::UNAVAILABLE: { - LOG(ERROR) << "Received '" << result.error() << "' while calling " << 
Rpc - << ". Retrying in " << backoff.get(); + LOG(ERROR) << "Received '" << result.error() << "' while expecting " + << Response::descriptor()->name() << ". Retrying in " + << backoff.get(); return process::after(backoff.get()) - .then([]() -> Future<ControlFlow<Response<Rpc>>> { - return Continue(); - }); + .then([]() -> Future<ControlFlow<Response>> { return Continue(); }); } case grpc::CANCELLED: case grpc::UNKNOWN: @@ -603,8 +610,10 @@ Future<Nothing> VolumeManagerProcess::prepareServices() CHECK(!services.empty()); // Get the plugin capabilities. - return call<GET_PLUGIN_CAPABILITIES>( - *services.begin(), GetPluginCapabilitiesRequest()) + return call( + *services.begin(), + &Client::getPluginCapabilities, + GetPluginCapabilitiesRequest()) .then(process::defer(self(), [=]( const GetPluginCapabilitiesResponse& response) -> Future<Nothing> { pluginCapabilities = response.capabilities(); @@ -622,11 +631,11 @@ Future<Nothing> VolumeManagerProcess::prepareServices() .then(process::defer(self(), [this] { vector<Future<GetPluginInfoResponse>> futures; foreach (const Service& service, services) { - futures.push_back( - call<GET_PLUGIN_INFO>(CONTROLLER_SERVICE, GetPluginInfoRequest()) - .onReady([service](const GetPluginInfoResponse& response) { - LOG(INFO) << service << " loaded: " << stringify(response); - })); + futures.push_back(call( + CONTROLLER_SERVICE, &Client::getPluginInfo, GetPluginInfoRequest()) + .onReady([service](const GetPluginInfoResponse& response) { + LOG(INFO) << service << " loaded: " << stringify(response); + })); } return process::collect(futures) @@ -650,8 +659,10 @@ Future<Nothing> VolumeManagerProcess::prepareServices() return Nothing(); } - return call<CONTROLLER_GET_CAPABILITIES>( - CONTROLLER_SERVICE, ControllerGetCapabilitiesRequest()) + return call( + CONTROLLER_SERVICE, + &Client::controllerGetCapabilities, + ControllerGetCapabilitiesRequest()) .then(process::defer(self(), [this]( const ControllerGetCapabilitiesResponse& response) { 
controllerCapabilities = response.capabilities(); @@ -665,14 +676,16 @@ Future<Nothing> VolumeManagerProcess::prepareServices() return Nothing(); } - return call<NODE_GET_CAPABILITIES>( - NODE_SERVICE, NodeGetCapabilitiesRequest()) + return call( + NODE_SERVICE, + &Client::nodeGetCapabilities, + NodeGetCapabilitiesRequest()) .then(process::defer(self(), [this]( const NodeGetCapabilitiesResponse& response) -> Future<Nothing> { nodeCapabilities = response.capabilities(); if (controllerCapabilities->publishUnpublishVolume) { - return call<NODE_GET_ID>(NODE_SERVICE, NodeGetIdRequest()) + return call(NODE_SERVICE, &Client::nodeGetId, NodeGetIdRequest()) .then(process::defer(self(), [this]( const NodeGetIdResponse& response) { nodeId = response.node_id(); @@ -754,7 +767,8 @@ Future<bool> VolumeManagerProcess::__deleteVolume( request.set_volume_id(volumeId); // We retry the `DeleteVolume` call for MESOS-9517. - return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true) + return call( + CONTROLLER_SERVICE, &Client::deleteVolume, std::move(request), true) .then([] { return true; }); } @@ -808,7 +822,8 @@ Future<Nothing> VolumeManagerProcess::_attachVolume(const string& volumeId) request.set_readonly(false); *request.mutable_volume_attributes() = volumeState.volume_attributes(); - return call<CONTROLLER_PUBLISH_VOLUME>(CONTROLLER_SERVICE, std::move(request)) + return call( + CONTROLLER_SERVICE, &Client::controllerPublishVolume, std::move(request)) .then(process::defer(self(), [this, volumeId]( const ControllerPublishVolumeResponse& response) { CHECK(volumes.contains(volumeId)); @@ -863,8 +878,10 @@ Future<Nothing> VolumeManagerProcess::_detachVolume(const string& volumeId) request.set_volume_id(volumeId); request.set_node_id(CHECK_NOTNONE(nodeId)); - return call<CONTROLLER_UNPUBLISH_VOLUME>( - CONTROLLER_SERVICE, std::move(request)) + return call( + CONTROLLER_SERVICE, + &Client::controllerUnpublishVolume, + std::move(request)) .then(process::defer(self(), 
[this, volumeId] { CHECK(volumes.contains(volumeId)); VolumeState& volumeState = volumes.at(volumeId).state; @@ -941,7 +958,7 @@ Future<Nothing> VolumeManagerProcess::_publishVolume(const string& volumeId) request.set_staging_target_path(stagingPath); } - return call<NODE_PUBLISH_VOLUME>(NODE_SERVICE, std::move(request)) + return call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request)) .then(defer(self(), [this, volumeId, targetPath] { CHECK(volumes.contains(volumeId)); VolumeState& volumeState = volumes.at(volumeId).state; @@ -1022,7 +1039,7 @@ Future<Nothing> VolumeManagerProcess::__publishVolume(const string& volumeId) evolve(volumeState.volume_capability()); *request.mutable_volume_attributes() = volumeState.volume_attributes(); - return call<NODE_STAGE_VOLUME>(NODE_SERVICE, std::move(request)) + return call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request)) .then(process::defer(self(), [this, volumeId] { CHECK(volumes.contains(volumeId)); VolumeState& volumeState = volumes.at(volumeId).state; @@ -1082,7 +1099,7 @@ Future<Nothing> VolumeManagerProcess::_unpublishVolume(const string& volumeId) request.set_volume_id(volumeId); request.set_staging_target_path(stagingPath); - return call<NODE_UNSTAGE_VOLUME>(NODE_SERVICE, std::move(request)) + return call(NODE_SERVICE, &Client::nodeUnstageVolume, std::move(request)) .then(process::defer(self(), [this, volumeId] { CHECK(volumes.contains(volumeId)); VolumeState& volumeState = volumes.at(volumeId).state; @@ -1134,7 +1151,7 @@ Future<Nothing> VolumeManagerProcess::__unpublishVolume(const string& volumeId) request.set_volume_id(volumeId); request.set_target_path(targetPath); - return call<NODE_UNPUBLISH_VOLUME>(NODE_SERVICE, std::move(request)) + return call(NODE_SERVICE, &Client::nodeUnpublishVolume, std::move(request)) .then(process::defer(self(), [this, volumeId] { CHECK(volumes.contains(volumeId)); VolumeState& volumeState = volumes.at(volumeId).state; diff --git 
a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp index 170c3a6..88073e4 100644 --- a/src/csi/v0_volume_manager_process.hpp +++ b/src/csi/v0_volume_manager_process.hpp @@ -44,9 +44,9 @@ #include <stout/try.hpp> #include "csi/metrics.hpp" -#include "csi/rpc.hpp" #include "csi/service_manager.hpp" #include "csi/state.hpp" +#include "csi/v0_client.hpp" #include "csi/v0_utils.hpp" #include "csi/v0_volume_manager.hpp" #include "csi/volume_manager.hpp" @@ -118,18 +118,22 @@ public: // NOTE: We currently ensure this by 1) resource locking to forbid concurrent // calls on the same volume, and 2) no profile update while there are ongoing // `CREATE_DISK` or `DESTROY_DISK` operations. - template <RPC Rpc> - process::Future<Response<Rpc>> call( - const Service& service, const Request<Rpc>& request, bool retry = false); - - template <RPC Rpc> - process::Future<Try<Response<Rpc>, process::grpc::StatusError>> - _call(const std::string& endpoint, const Request<Rpc>& request); - - template <RPC Rpc> - process::Future<process::ControlFlow<Response<Rpc>>> __call( - const Try<Response<Rpc>, process::grpc::StatusError>& result, - const Option<Duration>& backoff); + template <typename Request, typename Response> + process::Future<Response> call( + const Service& service, + process::Future<RPCResult<Response>> (Client::*rpc)(Request), + const Request& request, + bool retry = false); + + template <typename Request, typename Response> + process::Future<RPCResult<Response>> _call( + const std::string& endpoint, + process::Future<RPCResult<Response>> (Client::*rpc)(Request), + const Request& request); + + template <typename Response> + process::Future<process::ControlFlow<Response>> __call( + const RPCResult<Response>& result, const Option<Duration>& backoff); private: process::Future<Nothing> prepareServices(); diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp index c8f3f04..a2a181b 100644 --- a/src/tests/csi_client_tests.cpp +++ 
b/src/tests/csi_client_tests.cpp @@ -15,26 +15,24 @@ // limitations under the License. #include <functional> -#include <ostream> +#include <string> #include <process/gtest.hpp> #include <stout/nothing.hpp> -#include <stout/strings.hpp> -#include <stout/unreachable.hpp> #include <stout/tests/utils.hpp> -#include "csi/client.hpp" -#include "csi/rpc.hpp" +#include "csi/v0_client.hpp" #include "tests/mock_csi_plugin.hpp" -using std::ostream; using std::string; using process::Future; +using process::grpc::StatusError; + using process::grpc::client::Connection; using process::grpc::client::Runtime; @@ -52,25 +50,24 @@ struct RPCParam { string operator()(const TestParamInfo<RPCParam>& info) const { - return strings::replace(stringify(info.param.value), ".", "_"); + return info.param.name; } }; - template <csi::v0::RPC rpc> - static RPCParam create() + template <typename Client, typename Request, typename Response> + static RPCParam create( + Future<Try<Response, StatusError>> (Client::*rpc)(Request)) { return RPCParam{ - rpc, - [](csi::v0::Client client) { - return client - .call<rpc>(csi::v0::Request<rpc>()) + Request::descriptor()->name(), + [rpc](const Connection& connection, const Runtime& runtime) { + return (Client(connection, runtime).*rpc)(Request()) .then([] { return Nothing(); }); - } - }; + }}; } - const csi::v0::RPC value; - const std::function<Future<Nothing>(csi::v0::Client)> call; + const string name; + const std::function<Future<Nothing>(const Connection&, const Runtime&)> call; }; @@ -100,54 +97,39 @@ protected: } MockCSIPlugin plugin; - Option<process::grpc::client::Connection> connection; - process::grpc::client::Runtime runtime; + Option<Connection> connection; + Runtime runtime; }; INSTANTIATE_TEST_CASE_P( - Identity, - CSIClientTest, - Values( - RPCParam::create<csi::v0::GET_PLUGIN_INFO>(), - RPCParam::create<csi::v0::GET_PLUGIN_CAPABILITIES>(), - RPCParam::create<csi::v0::PROBE>()), - RPCParam::Printer()); - - -INSTANTIATE_TEST_CASE_P( - Controller, 
- CSIClientTest, - Values( - RPCParam::create<csi::v0::CREATE_VOLUME>(), - RPCParam::create<csi::v0::DELETE_VOLUME>(), - RPCParam::create<csi::v0::CONTROLLER_PUBLISH_VOLUME>(), - RPCParam::create<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(), - RPCParam::create<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(), - RPCParam::create<csi::v0::LIST_VOLUMES>(), - RPCParam::create<csi::v0::GET_CAPACITY>(), - RPCParam::create<csi::v0::CONTROLLER_GET_CAPABILITIES>()), - RPCParam::Printer()); - - -INSTANTIATE_TEST_CASE_P( - Node, + V0, CSIClientTest, Values( - RPCParam::create<csi::v0::NODE_STAGE_VOLUME>(), - RPCParam::create<csi::v0::NODE_UNSTAGE_VOLUME>(), - RPCParam::create<csi::v0::NODE_PUBLISH_VOLUME>(), - RPCParam::create<csi::v0::NODE_UNPUBLISH_VOLUME>(), - RPCParam::create<csi::v0::NODE_GET_ID>(), - RPCParam::create<csi::v0::NODE_GET_CAPABILITIES>()), + RPCParam::create(&csi::v0::Client::getPluginInfo), + RPCParam::create(&csi::v0::Client::getPluginCapabilities), + RPCParam::create(&csi::v0::Client::probe), + RPCParam::create(&csi::v0::Client::createVolume), + RPCParam::create(&csi::v0::Client::deleteVolume), + RPCParam::create(&csi::v0::Client::controllerPublishVolume), + RPCParam::create(&csi::v0::Client::controllerUnpublishVolume), + RPCParam::create(&csi::v0::Client::validateVolumeCapabilities), + RPCParam::create(&csi::v0::Client::listVolumes), + RPCParam::create(&csi::v0::Client::getCapacity), + RPCParam::create(&csi::v0::Client::controllerGetCapabilities), + RPCParam::create(&csi::v0::Client::nodeStageVolume), + RPCParam::create(&csi::v0::Client::nodeUnstageVolume), + RPCParam::create(&csi::v0::Client::nodePublishVolume), + RPCParam::create(&csi::v0::Client::nodeUnpublishVolume), + RPCParam::create(&csi::v0::Client::nodeGetId), + RPCParam::create(&csi::v0::Client::nodeGetCapabilities)), RPCParam::Printer()); // This test verifies that the all methods of CSI clients work. 
TEST_P(CSIClientTest, Call) { - AWAIT_EXPECT_READY( - GetParam().call(csi::v0::Client(connection.get(), runtime))); + AWAIT_EXPECT_READY(GetParam().call(connection.get(), runtime)); } } // namespace tests { diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 8243ff3..da8db41 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -41,7 +41,6 @@ #include <stout/os/realpath.hpp> #include "csi/paths.hpp" -#include "csi/rpc.hpp" #include "csi/state.hpp" #include "csi/v0_volume_manager_process.hpp" @@ -4951,14 +4950,14 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)}); AWAIT_READY(createVolumeRequests.get()) - << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #1"; + << "Failed to wait for CreateVolumeRequest #1"; // Settle the clock to verify that there is no more outstanding request. Clock::settle(); ASSERT_EQ(0u, createVolumeRequests.size()); Future<Nothing> createVolumeCall = FUTURE_DISPATCH( - _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CREATE_VOLUME>); + _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CreateVolumeResponse>); // Return `DEADLINE_EXCEEDED` for the first `CreateVolume` call. createVolumeResults.put( @@ -4976,15 +4975,15 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) // Return `UNAVAILABLE` for subsequent `CreateVolume` calls. for (size_t i = 1; i < numRetryableErrors; i++) { AWAIT_READY(createVolumeRequests.get()) - << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #" - << (i + 1); + << "Failed to wait for CreateVolumeRequest #" << (i + 1); // Settle the clock to verify that there is no more outstanding request. 
Clock::settle(); ASSERT_EQ(0u, createVolumeRequests.size()); createVolumeCall = FUTURE_DISPATCH( - _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CREATE_VOLUME>); + _, + &csi::v0::VolumeManagerProcess::__call<csi::v0::CreateVolumeResponse>); createVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, ""))); @@ -5000,8 +4999,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) } AWAIT_READY(createVolumeRequests.get()) - << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #" - << (numRetryableErrors + 1); + << "Failed to wait for CreateVolumeRequest #" << (numRetryableErrors + 1); // Settle the clock to verify that there is no more outstanding request. Clock::settle(); @@ -5038,14 +5036,14 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) driver.acceptOffers({offers->at(0).id()}, {DESTROY_DISK(created)}); AWAIT_READY(deleteVolumeRequests.get()) - << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #1"; + << "Failed to wait for DeleteVolumeRequest #1"; // Settle the clock to verify that there is no more outstanding request. Clock::settle(); ASSERT_EQ(0u, deleteVolumeRequests.size()); Future<Nothing> deleteVolumeCall = FUTURE_DISPATCH( - _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DELETE_VOLUME>); + _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DeleteVolumeResponse>); // Return `DEADLINE_EXCEEDED` for the first `DeleteVolume` call. deleteVolumeResults.put( @@ -5063,15 +5061,15 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) // Return `UNAVAILABLE` for subsequent `DeleteVolume` calls. for (size_t i = 1; i < numRetryableErrors; i++) { AWAIT_READY(deleteVolumeRequests.get()) - << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #" - << (i + 1); + << "Failed to wait for DeleteVolumeRequest #" << (i + 1); // Settle the clock to verify that there is no more outstanding request. 
Clock::settle(); ASSERT_EQ(0u, deleteVolumeRequests.size()); deleteVolumeCall = FUTURE_DISPATCH( - _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DELETE_VOLUME>); + _, + &csi::v0::VolumeManagerProcess::__call<csi::v0::DeleteVolumeResponse>); deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, ""))); @@ -5087,8 +5085,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) } AWAIT_READY(deleteVolumeRequests.get()) - << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #" - << (numRetryableErrors + 1); + << "Failed to wait for DeleteVolumeRequest #" << (numRetryableErrors + 1); // Settle the clock to verify that there is no more outstanding request. Clock::settle();
