This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f728ce3b5014993db54d5bf6472fcecb3c098934
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Mon Mar 25 21:34:02 2019 -0700

    Cleaned up `mesos::csi::v0::Client` interface.
    
    Since per-CSI-call metrics are no longer implemented, there is very less
    value to keep the `mesos::csi::v0::RPC` enum, which is tightly coupled
    with `mesos::csi::v0::Client`. Therefore, the enum and its header are
    removed.
    
    The header and implementation file for `mesos::csi::v0::Client` is also
    renamed for future CSI v1 support.
    
    Review: https://reviews.apache.org/r/70321
---
 src/CMakeLists.txt                                 |   3 +-
 src/Makefile.am                                    |   6 +-
 src/csi/client.hpp                                 | 142 --------------
 src/csi/rpc.cpp                                    | 105 ----------
 src/csi/rpc.hpp                                    | 212 ---------------------
 src/csi/service_manager.cpp                        |   8 +-
 src/csi/{client.cpp => v0_client.cpp}              | 107 ++++-------
 src/csi/v0_client.hpp                              | 101 ++++++++++
 src/csi/v0_volume_manager.cpp                      | 131 +++++++------
 src/csi/v0_volume_manager_process.hpp              |  30 +--
 src/tests/csi_client_tests.cpp                     |  88 ++++-----
 .../storage_local_resource_provider_tests.cpp      |  27 ++-
 12 files changed, 280 insertions(+), 680 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 6ef0c8c..af3715a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -232,13 +232,12 @@ set(COMMON_SRC
   common/values.cpp)
 
 set(CSI_SRC
-  csi/client.cpp
   csi/types.cpp
   csi/metrics.cpp
   csi/paths.cpp
-  csi/rpc.cpp
   csi/service_manager.cpp
   csi/v0.cpp
+  csi/v0_client.cpp
   csi/v0_utils.cpp
   csi/v0_volume_manager.cpp
   csi/volume_manager.cpp)
diff --git a/src/Makefile.am b/src/Makefile.am
index 61ded56..a800444 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1560,20 +1560,18 @@ libmesos_no_3rdparty_la_LIBADD += libbuild.la
 # Convenience library for build the CSI client.
 noinst_LTLIBRARIES += libcsi.la
 libcsi_la_SOURCES =                                                    \
-  csi/client.cpp                                                       \
-  csi/client.hpp                                                       \
   csi/types.cpp                                                                
\
   csi/metrics.cpp                                                      \
   csi/metrics.hpp                                                      \
   csi/paths.cpp                                                                
\
   csi/paths.hpp                                                                
\
-  csi/rpc.cpp                                                          \
-  csi/rpc.hpp                                                          \
   csi/service_manager.cpp                                              \
   csi/service_manager.hpp                                              \
   csi/state.hpp                                                                
\
   csi/state.proto                                                      \
   csi/v0.cpp                                                           \
+  csi/v0_client.cpp                                                    \
+  csi/v0_client.hpp                                                    \
   csi/v0_utils.cpp                                                     \
   csi/v0_utils.hpp                                                     \
   csi/v0_volume_manager.cpp                                            \
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
deleted file mode 100644
index 1429526..0000000
--- a/src/csi/client.hpp
+++ /dev/null
@@ -1,142 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef __CSI_CLIENT_HPP__
-#define __CSI_CLIENT_HPP__
-
-#include <mesos/csi/v0.hpp>
-
-#include <process/grpc.hpp>
-
-#include "csi/rpc.hpp"
-
-namespace mesos {
-namespace csi {
-namespace v0 {
-
-class Client
-{
-public:
-  Client(const process::grpc::client::Connection& _connection,
-         const process::grpc::client::Runtime& _runtime)
-    : connection(_connection), runtime(_runtime) {}
-
-  template <RPC rpc>
-  process::Future<Try<Response<rpc>, process::grpc::StatusError>> call(
-      Request<rpc> request);
-
-private:
-  process::grpc::client::Connection connection;
-  process::grpc::client::Runtime runtime;
-};
-
-
-template <>
-process::Future<Try<GetPluginInfoResponse, process::grpc::StatusError>>
-Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request);
-
-
-template <>
-process::Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>>
-Client::call<GET_PLUGIN_CAPABILITIES>(GetPluginCapabilitiesRequest request);
-
-
-template <>
-process::Future<Try<ProbeResponse, process::grpc::StatusError>>
-Client::call<PROBE>(ProbeRequest request);
-
-
-template <>
-process::Future<Try<CreateVolumeResponse, process::grpc::StatusError>>
-Client::call<CREATE_VOLUME>(CreateVolumeRequest request);
-
-
-template <>
-process::Future<Try<DeleteVolumeResponse, process::grpc::StatusError>>
-Client::call<DELETE_VOLUME>(DeleteVolumeRequest request);
-
-
-template <>
-process::Future<
-    Try<ControllerPublishVolumeResponse, process::grpc::StatusError>>
-Client::call<CONTROLLER_PUBLISH_VOLUME>(ControllerPublishVolumeRequest 
request);
-
-
-template <>
-process::Future<
-    Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>>
-Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
-    ControllerUnpublishVolumeRequest request);
-
-
-template <>
-process::Future<
-    Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>>
-Client::call<VALIDATE_VOLUME_CAPABILITIES>(
-    ValidateVolumeCapabilitiesRequest request);
-
-
-template <>
-process::Future<Try<ListVolumesResponse, process::grpc::StatusError>>
-Client::call<LIST_VOLUMES>(ListVolumesRequest request);
-
-
-template <>
-process::Future<Try<GetCapacityResponse, process::grpc::StatusError>>
-Client::call<GET_CAPACITY>(GetCapacityRequest request);
-
-
-template <>
-process::Future<
-    Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>>
-Client::call<CONTROLLER_GET_CAPABILITIES>(
-    ControllerGetCapabilitiesRequest request);
-
-
-template <>
-process::Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>>
-Client::call<NODE_STAGE_VOLUME>(NodeStageVolumeRequest request);
-
-
-template <>
-process::Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>>
-Client::call<NODE_UNSTAGE_VOLUME>(NodeUnstageVolumeRequest request);
-
-
-template <>
-process::Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>>
-Client::call<NODE_PUBLISH_VOLUME>(NodePublishVolumeRequest request);
-
-
-template <>
-process::Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>>
-Client::call<NODE_UNPUBLISH_VOLUME>(NodeUnpublishVolumeRequest request);
-
-
-template <>
-process::Future<Try<NodeGetIdResponse, process::grpc::StatusError>>
-Client::call<NODE_GET_ID>(NodeGetIdRequest request);
-
-
-template <>
-process::Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>>
-Client::call<NODE_GET_CAPABILITIES>(NodeGetCapabilitiesRequest request);
-
-} // namespace v0 {
-} // namespace csi {
-} // namespace mesos {
-
-#endif // __CSI_CLIENT_HPP__
diff --git a/src/csi/rpc.cpp b/src/csi/rpc.cpp
deleted file mode 100644
index e8a2770..0000000
--- a/src/csi/rpc.cpp
+++ /dev/null
@@ -1,105 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "csi/rpc.hpp"
-
-#include <stout/unreachable.hpp>
-
-using std::ostream;
-
-namespace mesos {
-namespace csi {
-namespace v0 {
-
-ostream& operator<<(ostream& stream, const RPC& rpc)
-{
-  switch (rpc) {
-    case GET_PLUGIN_INFO:
-      return stream
-        << Identity::service_full_name()
-        << ".GetPluginInfo";
-    case GET_PLUGIN_CAPABILITIES:
-      return stream
-        << Identity::service_full_name()
-        << ".GetPluginCapabilities";
-    case PROBE:
-      return stream
-        << Identity::service_full_name()
-        << ".Probe";
-    case CREATE_VOLUME:
-      return stream
-        << Controller::service_full_name()
-        << ".CreateVolume";
-    case DELETE_VOLUME:
-      return stream
-        << Controller::service_full_name()
-        << ".DeleteVolume";
-    case CONTROLLER_PUBLISH_VOLUME:
-      return stream
-        << Controller::service_full_name()
-        << ".ControllerPublishVolume";
-    case CONTROLLER_UNPUBLISH_VOLUME:
-      return stream
-        << Controller::service_full_name()
-        << ".ControllerUnpublishVolume";
-    case VALIDATE_VOLUME_CAPABILITIES:
-      return stream
-        << Controller::service_full_name()
-        << ".ValidateVolumeCapabilities";
-    case LIST_VOLUMES:
-      return stream
-        << Controller::service_full_name()
-        << ".ListVolumes";
-    case GET_CAPACITY:
-      return stream
-        << Controller::service_full_name()
-        << ".GetCapacity";
-    case CONTROLLER_GET_CAPABILITIES:
-      return stream
-        << Controller::service_full_name()
-        << ".ControllerGetCapabilities";
-    case NODE_STAGE_VOLUME:
-      return stream
-        << Node::service_full_name()
-        << ".NodeStageVolume";
-    case NODE_UNSTAGE_VOLUME:
-      return stream
-        << Node::service_full_name()
-        << ".NodeUnstageVolume";
-    case NODE_PUBLISH_VOLUME:
-      return stream
-        << Node::service_full_name()
-        << ".NodePublishVolume";
-    case NODE_UNPUBLISH_VOLUME:
-      return stream
-        << Node::service_full_name()
-        << ".NodeUnpublishVolume";
-    case NODE_GET_ID:
-      return stream
-        << Node::service_full_name()
-        << ".NodeGetId";
-    case NODE_GET_CAPABILITIES:
-      return stream
-        << Node::service_full_name()
-        << ".NodeGetCapabilities";
-  }
-
-  UNREACHABLE();
-}
-
-} // namespace v0 {
-} // namespace csi {
-} // namespace mesos {
diff --git a/src/csi/rpc.hpp b/src/csi/rpc.hpp
deleted file mode 100644
index 4d0ce49..0000000
--- a/src/csi/rpc.hpp
+++ /dev/null
@@ -1,212 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef __CSI_RPC_HPP__
-#define __CSI_RPC_HPP__
-
-#include <ostream>
-
-#include <mesos/csi/v0.hpp>
-
-namespace mesos {
-namespace csi {
-namespace v0 {
-
-enum RPC
-{
-  // RPCs for the Identity service.
-  GET_PLUGIN_INFO,
-  GET_PLUGIN_CAPABILITIES,
-  PROBE,
-
-  // RPCs for the Controller service.
-  CREATE_VOLUME,
-  DELETE_VOLUME,
-  CONTROLLER_PUBLISH_VOLUME,
-  CONTROLLER_UNPUBLISH_VOLUME,
-  VALIDATE_VOLUME_CAPABILITIES,
-  LIST_VOLUMES,
-  GET_CAPACITY,
-  CONTROLLER_GET_CAPABILITIES,
-
-  // RPCs for the Node service.
-  NODE_STAGE_VOLUME,
-  NODE_UNSTAGE_VOLUME,
-  NODE_PUBLISH_VOLUME,
-  NODE_UNPUBLISH_VOLUME,
-  NODE_GET_ID,
-  NODE_GET_CAPABILITIES
-};
-
-
-namespace internal {
-
-template <RPC>
-struct RPCTraits;
-
-
-template <>
-struct RPCTraits<GET_PLUGIN_INFO>
-{
-  typedef GetPluginInfoRequest request_type;
-  typedef GetPluginInfoResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<GET_PLUGIN_CAPABILITIES>
-{
-  typedef GetPluginCapabilitiesRequest request_type;
-  typedef GetPluginCapabilitiesResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<PROBE>
-{
-  typedef ProbeRequest request_type;
-  typedef ProbeResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<CREATE_VOLUME>
-{
-  typedef CreateVolumeRequest request_type;
-  typedef CreateVolumeResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<DELETE_VOLUME>
-{
-  typedef DeleteVolumeRequest request_type;
-  typedef DeleteVolumeResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<CONTROLLER_PUBLISH_VOLUME>
-{
-  typedef ControllerPublishVolumeRequest request_type;
-  typedef ControllerPublishVolumeResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<CONTROLLER_UNPUBLISH_VOLUME>
-{
-  typedef ControllerUnpublishVolumeRequest request_type;
-  typedef ControllerUnpublishVolumeResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<VALIDATE_VOLUME_CAPABILITIES>
-{
-  typedef ValidateVolumeCapabilitiesRequest request_type;
-  typedef ValidateVolumeCapabilitiesResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<LIST_VOLUMES>
-{
-  typedef ListVolumesRequest request_type;
-  typedef ListVolumesResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<GET_CAPACITY>
-{
-  typedef GetCapacityRequest request_type;
-  typedef GetCapacityResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<CONTROLLER_GET_CAPABILITIES>
-{
-  typedef ControllerGetCapabilitiesRequest request_type;
-  typedef ControllerGetCapabilitiesResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<NODE_STAGE_VOLUME>
-{
-  typedef NodeStageVolumeRequest request_type;
-  typedef NodeStageVolumeResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<NODE_UNSTAGE_VOLUME>
-{
-  typedef NodeUnstageVolumeRequest request_type;
-  typedef NodeUnstageVolumeResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<NODE_PUBLISH_VOLUME>
-{
-  typedef NodePublishVolumeRequest request_type;
-  typedef NodePublishVolumeResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<NODE_UNPUBLISH_VOLUME>
-{
-  typedef NodeUnpublishVolumeRequest request_type;
-  typedef NodeUnpublishVolumeResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<NODE_GET_ID>
-{
-  typedef NodeGetIdRequest request_type;
-  typedef NodeGetIdResponse response_type;
-};
-
-
-template <>
-struct RPCTraits<NODE_GET_CAPABILITIES>
-{
-  typedef NodeGetCapabilitiesRequest request_type;
-  typedef NodeGetCapabilitiesResponse response_type;
-};
-
-} // namespace internal {
-
-
-template <RPC rpc>
-using Request = typename internal::RPCTraits<rpc>::request_type;
-
-template <RPC rpc>
-using Response = typename internal::RPCTraits<rpc>::response_type;
-
-
-std::ostream& operator<<(std::ostream& stream, const RPC& rpc);
-
-} // namespace v0 {
-} // namespace csi {
-} // namespace mesos {
-
-#endif // __CSI_RPC_HPP__
diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp
index f8a42f6..0a3663c 100644
--- a/src/csi/service_manager.cpp
+++ b/src/csi/service_manager.cpp
@@ -49,8 +49,8 @@
 
 #include "common/http.hpp"
 
-#include "csi/client.hpp"
 #include "csi/paths.hpp"
+#include "csi/v0_client.hpp"
 #include "csi/v0_utils.hpp"
 
 #include "internal/devolve.hpp"
@@ -493,11 +493,9 @@ Future<Nothing> ServiceManagerProcess::waitEndpoint(const 
string& endpoint)
       // `Probe` calls to support CSI v1 in a backward compatible way.
       ++metrics->csi_plugin_rpcs_pending;
 
-      return v0::Client(endpoint, runtime)
-        .call<v0::PROBE>(v0::ProbeRequest())
+      return v0::Client(endpoint, runtime).probe(v0::ProbeRequest())
         .then(process::defer(self(), [=](
-            const Try<v0::ProbeResponse, StatusError>& result)
-            -> Future<Nothing> {
+            const v0::RPCResult<v0::ProbeResponse>& result) -> Future<Nothing> 
{
           if (result.isError()) {
             return Failure(
                 "Failed to probe endpoint '" + endpoint +
diff --git a/src/csi/client.cpp b/src/csi/v0_client.cpp
similarity index 57%
rename from src/csi/client.cpp
rename to src/csi/v0_client.cpp
index 9e17f5b..02ee0bc 100644
--- a/src/csi/client.cpp
+++ b/src/csi/v0_client.cpp
@@ -14,24 +14,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <utility>
+#include "csi/v0_client.hpp"
 
-#include "csi/client.hpp"
+#include <utility>
 
-using process::Failure;
 using process::Future;
 
-using process::grpc::StatusError;
-
 using process::grpc::client::CallOptions;
 
 namespace mesos {
 namespace csi {
 namespace v0 {
 
-template <>
-Future<Try<GetPluginInfoResponse, process::grpc::StatusError>>
-Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request)
+Future<RPCResult<GetPluginInfoResponse>>
+Client::getPluginInfo(GetPluginInfoRequest request)
 {
   return runtime.call(
       connection,
@@ -41,10 +37,8 @@ Client::call<GET_PLUGIN_INFO>(GetPluginInfoRequest request)
 }
 
 
-template <>
-Future<Try<GetPluginCapabilitiesResponse, process::grpc::StatusError>>
-Client::call<GET_PLUGIN_CAPABILITIES>(
-    GetPluginCapabilitiesRequest request)
+Future<RPCResult<GetPluginCapabilitiesResponse>>
+Client::getPluginCapabilities(GetPluginCapabilitiesRequest request)
 {
   return runtime.call(
       connection,
@@ -54,10 +48,7 @@ Client::call<GET_PLUGIN_CAPABILITIES>(
 }
 
 
-template <>
-Future<Try<ProbeResponse, process::grpc::StatusError>>
-Client::call<PROBE>(
-    ProbeRequest request)
+Future<RPCResult<ProbeResponse>> Client::probe(ProbeRequest request)
 {
   return runtime.call(
       connection,
@@ -67,10 +58,8 @@ Client::call<PROBE>(
 }
 
 
-template <>
-Future<Try<CreateVolumeResponse, process::grpc::StatusError>>
-Client::call<CREATE_VOLUME>(
-    CreateVolumeRequest request)
+Future<RPCResult<CreateVolumeResponse>>
+Client::createVolume(CreateVolumeRequest request)
 {
   return runtime.call(
       connection,
@@ -80,10 +69,8 @@ Client::call<CREATE_VOLUME>(
 }
 
 
-template <>
-Future<Try<DeleteVolumeResponse, process::grpc::StatusError>>
-Client::call<DELETE_VOLUME>(
-    DeleteVolumeRequest request)
+Future<RPCResult<DeleteVolumeResponse>>
+Client::deleteVolume(DeleteVolumeRequest request)
 {
   return runtime.call(
       connection,
@@ -93,10 +80,8 @@ Client::call<DELETE_VOLUME>(
 }
 
 
-template <>
-Future<Try<ControllerPublishVolumeResponse, process::grpc::StatusError>>
-Client::call<CONTROLLER_PUBLISH_VOLUME>(
-    ControllerPublishVolumeRequest request)
+Future<RPCResult<ControllerPublishVolumeResponse>>
+Client::controllerPublishVolume(ControllerPublishVolumeRequest request)
 {
   return runtime.call(
       connection,
@@ -106,10 +91,8 @@ Client::call<CONTROLLER_PUBLISH_VOLUME>(
 }
 
 
-template <>
-Future<Try<ControllerUnpublishVolumeResponse, process::grpc::StatusError>>
-Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
-    ControllerUnpublishVolumeRequest request)
+Future<RPCResult<ControllerUnpublishVolumeResponse>>
+Client::controllerUnpublishVolume(ControllerUnpublishVolumeRequest request)
 {
   return runtime.call(
       connection,
@@ -119,10 +102,8 @@ Client::call<CONTROLLER_UNPUBLISH_VOLUME>(
 }
 
 
-template <>
-Future<Try<ValidateVolumeCapabilitiesResponse, process::grpc::StatusError>>
-Client::call<VALIDATE_VOLUME_CAPABILITIES>(
-    ValidateVolumeCapabilitiesRequest request)
+Future<RPCResult<ValidateVolumeCapabilitiesResponse>>
+Client::validateVolumeCapabilities(ValidateVolumeCapabilitiesRequest request)
 {
   return runtime.call(
       connection,
@@ -132,10 +113,8 @@ Client::call<VALIDATE_VOLUME_CAPABILITIES>(
 }
 
 
-template <>
-Future<Try<ListVolumesResponse, process::grpc::StatusError>>
-Client::call<LIST_VOLUMES>(
-    ListVolumesRequest request)
+Future<RPCResult<ListVolumesResponse>>
+Client::listVolumes(ListVolumesRequest request)
 {
   return runtime.call(
       connection,
@@ -145,10 +124,8 @@ Client::call<LIST_VOLUMES>(
 }
 
 
-template <>
-Future<Try<GetCapacityResponse, process::grpc::StatusError>>
-Client::call<GET_CAPACITY>(
-    GetCapacityRequest request)
+Future<RPCResult<GetCapacityResponse>>
+Client::getCapacity(GetCapacityRequest request)
 {
   return runtime.call(
       connection,
@@ -158,10 +135,8 @@ Client::call<GET_CAPACITY>(
 }
 
 
-template <>
-Future<Try<ControllerGetCapabilitiesResponse, process::grpc::StatusError>>
-Client::call<CONTROLLER_GET_CAPABILITIES>(
-    ControllerGetCapabilitiesRequest request)
+Future<RPCResult<ControllerGetCapabilitiesResponse>>
+Client::controllerGetCapabilities(ControllerGetCapabilitiesRequest request)
 {
   return runtime.call(
       connection,
@@ -171,10 +146,8 @@ Client::call<CONTROLLER_GET_CAPABILITIES>(
 }
 
 
-template <>
-Future<Try<NodeStageVolumeResponse, process::grpc::StatusError>>
-Client::call<NODE_STAGE_VOLUME>(
-    NodeStageVolumeRequest request)
+Future<RPCResult<NodeStageVolumeResponse>>
+Client::nodeStageVolume(NodeStageVolumeRequest request)
 {
   return runtime.call(
       connection,
@@ -184,10 +157,8 @@ Client::call<NODE_STAGE_VOLUME>(
 }
 
 
-template <>
-Future<Try<NodeUnstageVolumeResponse, process::grpc::StatusError>>
-Client::call<NODE_UNSTAGE_VOLUME>(
-    NodeUnstageVolumeRequest request)
+Future<RPCResult<NodeUnstageVolumeResponse>>
+Client::nodeUnstageVolume(NodeUnstageVolumeRequest request)
 {
   return runtime.call(
       connection,
@@ -197,10 +168,8 @@ Client::call<NODE_UNSTAGE_VOLUME>(
 }
 
 
-template <>
-Future<Try<NodePublishVolumeResponse, process::grpc::StatusError>>
-Client::call<NODE_PUBLISH_VOLUME>(
-    NodePublishVolumeRequest request)
+Future<RPCResult<NodePublishVolumeResponse>>
+Client::nodePublishVolume(NodePublishVolumeRequest request)
 {
   return runtime.call(
       connection,
@@ -210,10 +179,8 @@ Client::call<NODE_PUBLISH_VOLUME>(
 }
 
 
-template <>
-Future<Try<NodeUnpublishVolumeResponse, process::grpc::StatusError>>
-Client::call<NODE_UNPUBLISH_VOLUME>(
-    NodeUnpublishVolumeRequest request)
+Future<RPCResult<NodeUnpublishVolumeResponse>>
+Client::nodeUnpublishVolume(NodeUnpublishVolumeRequest request)
 {
   return runtime.call(
       connection,
@@ -223,10 +190,8 @@ Client::call<NODE_UNPUBLISH_VOLUME>(
 }
 
 
-template <>
-Future<Try<NodeGetIdResponse, process::grpc::StatusError>>
-Client::call<NODE_GET_ID>(
-    NodeGetIdRequest request)
+Future<RPCResult<NodeGetIdResponse>>
+Client::nodeGetId(NodeGetIdRequest request)
 {
   return runtime.call(
       connection,
@@ -236,10 +201,8 @@ Client::call<NODE_GET_ID>(
 }
 
 
-template <>
-Future<Try<NodeGetCapabilitiesResponse, process::grpc::StatusError>>
-Client::call<NODE_GET_CAPABILITIES>(
-    NodeGetCapabilitiesRequest request)
+Future<RPCResult<NodeGetCapabilitiesResponse>>
+Client::nodeGetCapabilities(NodeGetCapabilitiesRequest request)
 {
   return runtime.call(
       connection,
diff --git a/src/csi/v0_client.hpp b/src/csi/v0_client.hpp
new file mode 100644
index 0000000..977a440
--- /dev/null
+++ b/src/csi/v0_client.hpp
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __CSI_V0_CLIENT_HPP__
+#define __CSI_V0_CLIENT_HPP__
+
+#include <mesos/csi/v0.hpp>
+
+#include <process/future.hpp>
+#include <process/grpc.hpp>
+
+#include <stout/try.hpp>
+
+namespace mesos {
+namespace csi {
+namespace v0 {
+
+template <typename Response>
+using RPCResult = Try<Response, process::grpc::StatusError>;
+
+
+class Client
+{
+public:
+  Client(const process::grpc::client::Connection& _connection,
+         const process::grpc::client::Runtime& _runtime)
+    : connection(_connection), runtime(_runtime) {}
+
+  process::Future<RPCResult<GetPluginInfoResponse>>
+  getPluginInfo(GetPluginInfoRequest request);
+
+  process::Future<RPCResult<GetPluginCapabilitiesResponse>>
+  getPluginCapabilities(GetPluginCapabilitiesRequest request);
+
+  process::Future<RPCResult<ProbeResponse>> probe(ProbeRequest request);
+
+  process::Future<RPCResult<CreateVolumeResponse>>
+  createVolume(CreateVolumeRequest request);
+
+  process::Future<RPCResult<DeleteVolumeResponse>>
+  deleteVolume(DeleteVolumeRequest request);
+
+  process::Future<RPCResult<ControllerPublishVolumeResponse>>
+  controllerPublishVolume(ControllerPublishVolumeRequest request);
+
+  process::Future<RPCResult<ControllerUnpublishVolumeResponse>>
+  controllerUnpublishVolume(ControllerUnpublishVolumeRequest request);
+
+  process::Future<RPCResult<ValidateVolumeCapabilitiesResponse>>
+  validateVolumeCapabilities(ValidateVolumeCapabilitiesRequest request);
+
+  process::Future<RPCResult<ListVolumesResponse>>
+  listVolumes(ListVolumesRequest request);
+
+  process::Future<RPCResult<GetCapacityResponse>>
+  getCapacity(GetCapacityRequest request);
+
+  process::Future<RPCResult<ControllerGetCapabilitiesResponse>>
+  controllerGetCapabilities(ControllerGetCapabilitiesRequest request);
+
+  process::Future<RPCResult<NodeStageVolumeResponse>>
+  nodeStageVolume(NodeStageVolumeRequest request);
+
+  process::Future<RPCResult<NodeUnstageVolumeResponse>>
+  nodeUnstageVolume(NodeUnstageVolumeRequest request);
+
+  process::Future<RPCResult<NodePublishVolumeResponse>>
+  nodePublishVolume(NodePublishVolumeRequest request);
+
+  process::Future<RPCResult<NodeUnpublishVolumeResponse>>
+  nodeUnpublishVolume(NodeUnpublishVolumeRequest request);
+
+  process::Future<RPCResult<NodeGetIdResponse>>
+  nodeGetId(NodeGetIdRequest request);
+
+  process::Future<RPCResult<NodeGetCapabilitiesResponse>>
+  nodeGetCapabilities(NodeGetCapabilitiesRequest request);
+
+private:
+  process::grpc::client::Connection connection;
+  process::grpc::client::Runtime runtime;
+};
+
+} // namespace v0 {
+} // namespace csi {
+} // namespace mesos {
+
+#endif // __CSI_V0_CLIENT_HPP__
diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp
index bf9f00e..7de9000 100644
--- a/src/csi/v0_volume_manager.cpp
+++ b/src/csi/v0_volume_manager.cpp
@@ -40,8 +40,8 @@
 #include <stout/try.hpp>
 #include <stout/unreachable.hpp>
 
-#include "csi/client.hpp"
 #include "csi/paths.hpp"
+#include "csi/v0_client.hpp"
 #include "csi/v0_utils.hpp"
 #include "csi/v0_volume_manager_process.hpp"
 
@@ -253,7 +253,7 @@ Future<vector<VolumeInfo>> 
VolumeManagerProcess::listVolumes()
 
   // TODO(chhsiao): Set the max entries and use a loop to do multiple
   // `ListVolumes` calls.
-  return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest())
+  return call(CONTROLLER_SERVICE, &Client::listVolumes, ListVolumesRequest())
     .then(process::defer(self(), [](const ListVolumesResponse& response) {
       vector<VolumeInfo> result;
       foreach (const auto& entry, response.entries()) {
@@ -279,7 +279,7 @@ Future<Bytes> VolumeManagerProcess::getCapacity(
   *request.add_volume_capabilities() = evolve(capability);
   *request.mutable_parameters() = parameters;
 
-  return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request))
+  return call(CONTROLLER_SERVICE, &Client::getCapacity, std::move(request))
     .then([](const GetCapacityResponse& response) {
       return Bytes(response.available_capacity());
     });
@@ -308,7 +308,8 @@ Future<VolumeInfo> VolumeManagerProcess::createVolume(
   *request.mutable_parameters() = parameters;
 
   // We retry the `CreateVolume` call for MESOS-9517.
-  return call<CREATE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+  return call(
+      CONTROLLER_SERVICE, &Client::createVolume, std::move(request), true)
     .then(process::defer(self(), [=](
         const CreateVolumeResponse& response) -> Future<VolumeInfo> {
       const string& volumeId = response.volume().id();
@@ -371,8 +372,10 @@ Future<Option<Error>> VolumeManagerProcess::validateVolume(
   *request.add_volume_capabilities() = evolve(capability);
   *request.mutable_volume_attributes() = volumeInfo.context;
 
-  return call<VALIDATE_VOLUME_CAPABILITIES>(
-      CONTROLLER_SERVICE, std::move(request))
+  return call(
+      CONTROLLER_SERVICE,
+      &Client::validateVolumeCapabilities,
+      std::move(request))
     .then(process::defer(self(), [=](
         const ValidateVolumeCapabilitiesResponse& response)
         -> Future<Option<Error>> {
@@ -494,10 +497,11 @@ Future<Nothing> 
VolumeManagerProcess::unpublishVolume(const string& volumeId)
 }
 
 
-template <RPC Rpc>
-Future<Response<Rpc>> VolumeManagerProcess::call(
+template <typename Request, typename Response>
+Future<Response> VolumeManagerProcess::call(
     const Service& service,
-    const Request<Rpc>& request,
+    Future<RPCResult<Response>> (Client::*rpc)(Request),
+    const Request& request,
     const bool retry) // Made immutable in the following mutable lambda.
 {
   Duration maxBackoff = DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
@@ -508,10 +512,14 @@ Future<Response<Rpc>> VolumeManagerProcess::call(
         // Make the call to the latest service endpoint.
         return serviceManager->getServiceEndpoint(service)
           .then(process::defer(
-              self(), &VolumeManagerProcess::_call<Rpc>, lambda::_1, request));
+              self(),
+              &VolumeManagerProcess::_call<Request, Response>,
+              lambda::_1,
+              rpc,
+              request));
       },
-      [=](const Try<Response<Rpc>, StatusError>& result) mutable
-          -> Future<ControlFlow<Response<Rpc>>> {
+      [=](const RPCResult<Response>& result) mutable
+          -> Future<ControlFlow<Response>> {
         Option<Duration> backoff = retry
           ? maxBackoff * (static_cast<double>(os::random()) / RAND_MAX)
           : Option<Duration>::none();
@@ -520,36 +528,36 @@ Future<Response<Rpc>> VolumeManagerProcess::call(
 
         // We dispatch `__call` for testing purpose.
         return process::dispatch(
-            self(), &VolumeManagerProcess::__call<Rpc>, result, backoff);
+            self(), &VolumeManagerProcess::__call<Response>, result, backoff);
       });
 }
 
 
-template <RPC Rpc>
-Future<Try<Response<Rpc>, StatusError>> VolumeManagerProcess::_call(
-    const string& endpoint, const Request<Rpc>& request)
+template <typename Request, typename Response>
+Future<RPCResult<Response>> VolumeManagerProcess::_call(
+    const string& endpoint,
+    Future<RPCResult<Response>> (Client::*rpc)(Request),
+    const Request& request)
 {
   ++metrics->csi_plugin_rpcs_pending;
 
-  return Client(endpoint, runtime).call<Rpc>(request)
-    .onAny(defer(self(), [=](
-        const Future<Try<Response<Rpc>, StatusError>>& future) {
-      --metrics->csi_plugin_rpcs_pending;
-      if (future.isReady() && future->isSome()) {
-        ++metrics->csi_plugin_rpcs_finished;
-      } else if (future.isDiscarded()) {
-        ++metrics->csi_plugin_rpcs_cancelled;
-      } else {
-        ++metrics->csi_plugin_rpcs_failed;
-      }
-    }));
+  return (Client(endpoint, runtime).*rpc)(request).onAny(
+      process::defer(self(), [=](const Future<RPCResult<Response>>& future) {
+        --metrics->csi_plugin_rpcs_pending;
+        if (future.isReady() && future->isSome()) {
+          ++metrics->csi_plugin_rpcs_finished;
+        } else if (future.isDiscarded()) {
+          ++metrics->csi_plugin_rpcs_cancelled;
+        } else {
+          ++metrics->csi_plugin_rpcs_failed;
+        }
+      }));
 }
 
 
-template <RPC Rpc>
-Future<ControlFlow<Response<Rpc>>> VolumeManagerProcess::__call(
-    const Try<Response<Rpc>, StatusError>& result,
-    const Option<Duration>& backoff)
+template <typename Response>
+Future<ControlFlow<Response>> VolumeManagerProcess::__call(
+    const RPCResult<Response>& result, const Option<Duration>& backoff)
 {
   if (result.isSome()) {
     return Break(result.get());
@@ -564,13 +572,12 @@ Future<ControlFlow<Response<Rpc>>> 
VolumeManagerProcess::__call(
   switch (result.error().status.error_code()) {
     case grpc::DEADLINE_EXCEEDED:
     case grpc::UNAVAILABLE: {
-      LOG(ERROR) << "Received '" << result.error() << "' while calling " << Rpc
-                 << ". Retrying in " << backoff.get();
+      LOG(ERROR) << "Received '" << result.error() << "' while expecting "
+                 << Response::descriptor()->name() << ". Retrying in "
+                 << backoff.get();
 
       return process::after(backoff.get())
-        .then([]() -> Future<ControlFlow<Response<Rpc>>> {
-          return Continue();
-        });
+        .then([]() -> Future<ControlFlow<Response>> { return Continue(); });
     }
     case grpc::CANCELLED:
     case grpc::UNKNOWN:
@@ -603,8 +610,10 @@ Future<Nothing> VolumeManagerProcess::prepareServices()
   CHECK(!services.empty());
 
   // Get the plugin capabilities.
-  return call<GET_PLUGIN_CAPABILITIES>(
-      *services.begin(), GetPluginCapabilitiesRequest())
+  return call(
+      *services.begin(),
+      &Client::getPluginCapabilities,
+      GetPluginCapabilitiesRequest())
     .then(process::defer(self(), [=](
         const GetPluginCapabilitiesResponse& response) -> Future<Nothing> {
       pluginCapabilities = response.capabilities();
@@ -622,11 +631,11 @@ Future<Nothing> VolumeManagerProcess::prepareServices()
     .then(process::defer(self(), [this] {
       vector<Future<GetPluginInfoResponse>> futures;
       foreach (const Service& service, services) {
-        futures.push_back(
-            call<GET_PLUGIN_INFO>(CONTROLLER_SERVICE, GetPluginInfoRequest())
-              .onReady([service](const GetPluginInfoResponse& response) {
-                LOG(INFO) << service << " loaded: " << stringify(response);
-              }));
+        futures.push_back(call(
+            CONTROLLER_SERVICE, &Client::getPluginInfo, GetPluginInfoRequest())
+          .onReady([service](const GetPluginInfoResponse& response) {
+            LOG(INFO) << service << " loaded: " << stringify(response);
+          }));
       }
 
       return process::collect(futures)
@@ -650,8 +659,10 @@ Future<Nothing> VolumeManagerProcess::prepareServices()
         return Nothing();
       }
 
-      return call<CONTROLLER_GET_CAPABILITIES>(
-          CONTROLLER_SERVICE, ControllerGetCapabilitiesRequest())
+      return call(
+          CONTROLLER_SERVICE,
+          &Client::controllerGetCapabilities,
+          ControllerGetCapabilitiesRequest())
         .then(process::defer(self(), [this](
             const ControllerGetCapabilitiesResponse& response) {
           controllerCapabilities = response.capabilities();
@@ -665,14 +676,16 @@ Future<Nothing> VolumeManagerProcess::prepareServices()
         return Nothing();
       }
 
-      return call<NODE_GET_CAPABILITIES>(
-          NODE_SERVICE, NodeGetCapabilitiesRequest())
+      return call(
+          NODE_SERVICE,
+          &Client::nodeGetCapabilities,
+          NodeGetCapabilitiesRequest())
         .then(process::defer(self(), [this](
             const NodeGetCapabilitiesResponse& response) -> Future<Nothing> {
           nodeCapabilities = response.capabilities();
 
           if (controllerCapabilities->publishUnpublishVolume) {
-            return call<NODE_GET_ID>(NODE_SERVICE, NodeGetIdRequest())
+            return call(NODE_SERVICE, &Client::nodeGetId, NodeGetIdRequest())
               .then(process::defer(self(), [this](
                   const NodeGetIdResponse& response) {
                 nodeId = response.node_id();
@@ -754,7 +767,8 @@ Future<bool> VolumeManagerProcess::__deleteVolume(
   request.set_volume_id(volumeId);
 
   // We retry the `DeleteVolume` call for MESOS-9517.
-  return call<DELETE_VOLUME>(CONTROLLER_SERVICE, std::move(request), true)
+  return call(
+      CONTROLLER_SERVICE, &Client::deleteVolume, std::move(request), true)
     .then([] { return true; });
 }
 
@@ -808,7 +822,8 @@ Future<Nothing> VolumeManagerProcess::_attachVolume(const 
string& volumeId)
   request.set_readonly(false);
   *request.mutable_volume_attributes() = volumeState.volume_attributes();
 
-  return call<CONTROLLER_PUBLISH_VOLUME>(CONTROLLER_SERVICE, 
std::move(request))
+  return call(
+      CONTROLLER_SERVICE, &Client::controllerPublishVolume, std::move(request))
     .then(process::defer(self(), [this, volumeId](
         const ControllerPublishVolumeResponse& response) {
       CHECK(volumes.contains(volumeId));
@@ -863,8 +878,10 @@ Future<Nothing> VolumeManagerProcess::_detachVolume(const 
string& volumeId)
   request.set_volume_id(volumeId);
   request.set_node_id(CHECK_NOTNONE(nodeId));
 
-  return call<CONTROLLER_UNPUBLISH_VOLUME>(
-      CONTROLLER_SERVICE, std::move(request))
+  return call(
+      CONTROLLER_SERVICE,
+      &Client::controllerUnpublishVolume,
+      std::move(request))
     .then(process::defer(self(), [this, volumeId] {
       CHECK(volumes.contains(volumeId));
       VolumeState& volumeState = volumes.at(volumeId).state;
@@ -941,7 +958,7 @@ Future<Nothing> VolumeManagerProcess::_publishVolume(const 
string& volumeId)
     request.set_staging_target_path(stagingPath);
   }
 
-  return call<NODE_PUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
+  return call(NODE_SERVICE, &Client::nodePublishVolume, std::move(request))
     .then(defer(self(), [this, volumeId, targetPath] {
       CHECK(volumes.contains(volumeId));
       VolumeState& volumeState = volumes.at(volumeId).state;
@@ -1022,7 +1039,7 @@ Future<Nothing> 
VolumeManagerProcess::__publishVolume(const string& volumeId)
     evolve(volumeState.volume_capability());
   *request.mutable_volume_attributes() = volumeState.volume_attributes();
 
-  return call<NODE_STAGE_VOLUME>(NODE_SERVICE, std::move(request))
+  return call(NODE_SERVICE, &Client::nodeStageVolume, std::move(request))
     .then(process::defer(self(), [this, volumeId] {
       CHECK(volumes.contains(volumeId));
       VolumeState& volumeState = volumes.at(volumeId).state;
@@ -1082,7 +1099,7 @@ Future<Nothing> 
VolumeManagerProcess::_unpublishVolume(const string& volumeId)
   request.set_volume_id(volumeId);
   request.set_staging_target_path(stagingPath);
 
-  return call<NODE_UNSTAGE_VOLUME>(NODE_SERVICE, std::move(request))
+  return call(NODE_SERVICE, &Client::nodeUnstageVolume, std::move(request))
     .then(process::defer(self(), [this, volumeId] {
       CHECK(volumes.contains(volumeId));
       VolumeState& volumeState = volumes.at(volumeId).state;
@@ -1134,7 +1151,7 @@ Future<Nothing> 
VolumeManagerProcess::__unpublishVolume(const string& volumeId)
   request.set_volume_id(volumeId);
   request.set_target_path(targetPath);
 
-  return call<NODE_UNPUBLISH_VOLUME>(NODE_SERVICE, std::move(request))
+  return call(NODE_SERVICE, &Client::nodeUnpublishVolume, std::move(request))
     .then(process::defer(self(), [this, volumeId] {
       CHECK(volumes.contains(volumeId));
       VolumeState& volumeState = volumes.at(volumeId).state;
diff --git a/src/csi/v0_volume_manager_process.hpp 
b/src/csi/v0_volume_manager_process.hpp
index 170c3a6..88073e4 100644
--- a/src/csi/v0_volume_manager_process.hpp
+++ b/src/csi/v0_volume_manager_process.hpp
@@ -44,9 +44,9 @@
 #include <stout/try.hpp>
 
 #include "csi/metrics.hpp"
-#include "csi/rpc.hpp"
 #include "csi/service_manager.hpp"
 #include "csi/state.hpp"
+#include "csi/v0_client.hpp"
 #include "csi/v0_utils.hpp"
 #include "csi/v0_volume_manager.hpp"
 #include "csi/volume_manager.hpp"
@@ -118,18 +118,22 @@ public:
   // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
   // calls on the same volume, and 2) no profile update while there are ongoing
   // `CREATE_DISK` or `DESTROY_DISK` operations.
-  template <RPC Rpc>
-  process::Future<Response<Rpc>> call(
-      const Service& service, const Request<Rpc>& request, bool retry = false);
-
-  template <RPC Rpc>
-  process::Future<Try<Response<Rpc>, process::grpc::StatusError>>
-  _call(const std::string& endpoint, const Request<Rpc>& request);
-
-  template <RPC Rpc>
-  process::Future<process::ControlFlow<Response<Rpc>>> __call(
-      const Try<Response<Rpc>, process::grpc::StatusError>& result,
-      const Option<Duration>& backoff);
+  template <typename Request, typename Response>
+  process::Future<Response> call(
+      const Service& service,
+      process::Future<RPCResult<Response>> (Client::*rpc)(Request),
+      const Request& request,
+      bool retry = false);
+
+  template <typename Request, typename Response>
+  process::Future<RPCResult<Response>> _call(
+      const std::string& endpoint,
+      process::Future<RPCResult<Response>> (Client::*rpc)(Request),
+      const Request& request);
+
+  template <typename Response>
+  process::Future<process::ControlFlow<Response>> __call(
+      const RPCResult<Response>& result, const Option<Duration>& backoff);
 
 private:
   process::Future<Nothing> prepareServices();
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index c8f3f04..a2a181b 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -15,26 +15,24 @@
 // limitations under the License.
 
 #include <functional>
-#include <ostream>
+#include <string>
 
 #include <process/gtest.hpp>
 
 #include <stout/nothing.hpp>
-#include <stout/strings.hpp>
-#include <stout/unreachable.hpp>
 
 #include <stout/tests/utils.hpp>
 
-#include "csi/client.hpp"
-#include "csi/rpc.hpp"
+#include "csi/v0_client.hpp"
 
 #include "tests/mock_csi_plugin.hpp"
 
-using std::ostream;
 using std::string;
 
 using process::Future;
 
+using process::grpc::StatusError;
+
 using process::grpc::client::Connection;
 using process::grpc::client::Runtime;
 
@@ -52,25 +50,24 @@ struct RPCParam
   {
     string operator()(const TestParamInfo<RPCParam>& info) const
     {
-      return strings::replace(stringify(info.param.value), ".", "_");
+      return info.param.name;
     }
   };
 
-  template <csi::v0::RPC rpc>
-  static RPCParam create()
+  template <typename Client, typename Request, typename Response>
+  static RPCParam create(
+      Future<Try<Response, StatusError>> (Client::*rpc)(Request))
   {
     return RPCParam{
-      rpc,
-      [](csi::v0::Client client) {
-        return client
-          .call<rpc>(csi::v0::Request<rpc>())
+      Request::descriptor()->name(),
+      [rpc](const Connection& connection, const Runtime& runtime) {
+        return (Client(connection, runtime).*rpc)(Request())
           .then([] { return Nothing(); });
-      }
-    };
+      }};
   }
 
-  const csi::v0::RPC value;
-  const std::function<Future<Nothing>(csi::v0::Client)> call;
+  const string name;
+  const std::function<Future<Nothing>(const Connection&, const Runtime&)> call;
 };
 
 
@@ -100,54 +97,39 @@ protected:
   }
 
   MockCSIPlugin plugin;
-  Option<process::grpc::client::Connection> connection;
-  process::grpc::client::Runtime runtime;
+  Option<Connection> connection;
+  Runtime runtime;
 };
 
 
 INSTANTIATE_TEST_CASE_P(
-    Identity,
-    CSIClientTest,
-    Values(
-        RPCParam::create<csi::v0::GET_PLUGIN_INFO>(),
-        RPCParam::create<csi::v0::GET_PLUGIN_CAPABILITIES>(),
-        RPCParam::create<csi::v0::PROBE>()),
-    RPCParam::Printer());
-
-
-INSTANTIATE_TEST_CASE_P(
-    Controller,
-    CSIClientTest,
-    Values(
-        RPCParam::create<csi::v0::CREATE_VOLUME>(),
-        RPCParam::create<csi::v0::DELETE_VOLUME>(),
-        RPCParam::create<csi::v0::CONTROLLER_PUBLISH_VOLUME>(),
-        RPCParam::create<csi::v0::CONTROLLER_UNPUBLISH_VOLUME>(),
-        RPCParam::create<csi::v0::VALIDATE_VOLUME_CAPABILITIES>(),
-        RPCParam::create<csi::v0::LIST_VOLUMES>(),
-        RPCParam::create<csi::v0::GET_CAPACITY>(),
-        RPCParam::create<csi::v0::CONTROLLER_GET_CAPABILITIES>()),
-    RPCParam::Printer());
-
-
-INSTANTIATE_TEST_CASE_P(
-    Node,
+    V0,
     CSIClientTest,
     Values(
-        RPCParam::create<csi::v0::NODE_STAGE_VOLUME>(),
-        RPCParam::create<csi::v0::NODE_UNSTAGE_VOLUME>(),
-        RPCParam::create<csi::v0::NODE_PUBLISH_VOLUME>(),
-        RPCParam::create<csi::v0::NODE_UNPUBLISH_VOLUME>(),
-        RPCParam::create<csi::v0::NODE_GET_ID>(),
-        RPCParam::create<csi::v0::NODE_GET_CAPABILITIES>()),
+        RPCParam::create(&csi::v0::Client::getPluginInfo),
+        RPCParam::create(&csi::v0::Client::getPluginCapabilities),
+        RPCParam::create(&csi::v0::Client::probe),
+        RPCParam::create(&csi::v0::Client::createVolume),
+        RPCParam::create(&csi::v0::Client::deleteVolume),
+        RPCParam::create(&csi::v0::Client::controllerPublishVolume),
+        RPCParam::create(&csi::v0::Client::controllerUnpublishVolume),
+        RPCParam::create(&csi::v0::Client::validateVolumeCapabilities),
+        RPCParam::create(&csi::v0::Client::listVolumes),
+        RPCParam::create(&csi::v0::Client::getCapacity),
+        RPCParam::create(&csi::v0::Client::controllerGetCapabilities),
+        RPCParam::create(&csi::v0::Client::nodeStageVolume),
+        RPCParam::create(&csi::v0::Client::nodeUnstageVolume),
+        RPCParam::create(&csi::v0::Client::nodePublishVolume),
+        RPCParam::create(&csi::v0::Client::nodeUnpublishVolume),
+        RPCParam::create(&csi::v0::Client::nodeGetId),
+        RPCParam::create(&csi::v0::Client::nodeGetCapabilities)),
     RPCParam::Printer());
 
 
 // This test verifies that the all methods of CSI clients work.
 TEST_P(CSIClientTest, Call)
 {
-  AWAIT_EXPECT_READY(
-      GetParam().call(csi::v0::Client(connection.get(), runtime)));
+  AWAIT_EXPECT_READY(GetParam().call(connection.get(), runtime));
 }
 
 } // namespace tests {
diff --git a/src/tests/storage_local_resource_provider_tests.cpp 
b/src/tests/storage_local_resource_provider_tests.cpp
index 8243ff3..da8db41 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -41,7 +41,6 @@
 #include <stout/os/realpath.hpp>
 
 #include "csi/paths.hpp"
-#include "csi/rpc.hpp"
 #include "csi/state.hpp"
 #include "csi/v0_volume_manager_process.hpp"
 
@@ -4951,14 +4950,14 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
       {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)});
 
   AWAIT_READY(createVolumeRequests.get())
-    << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #1";
+    << "Failed to wait for CreateVolumeRequest #1";
 
   // Settle the clock to verify that there is no more outstanding request.
   Clock::settle();
   ASSERT_EQ(0u, createVolumeRequests.size());
 
   Future<Nothing> createVolumeCall = FUTURE_DISPATCH(
-      _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CREATE_VOLUME>);
+      _, 
&csi::v0::VolumeManagerProcess::__call<csi::v0::CreateVolumeResponse>);
 
   // Return `DEADLINE_EXCEEDED` for the first `CreateVolume` call.
   createVolumeResults.put(
@@ -4976,15 +4975,15 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
   // Return `UNAVAILABLE` for subsequent `CreateVolume` calls.
   for (size_t i = 1; i < numRetryableErrors; i++) {
     AWAIT_READY(createVolumeRequests.get())
-      << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #"
-      << (i + 1);
+      << "Failed to wait for CreateVolumeRequest #" << (i + 1);
 
     // Settle the clock to verify that there is no more outstanding request.
     Clock::settle();
     ASSERT_EQ(0u, createVolumeRequests.size());
 
     createVolumeCall = FUTURE_DISPATCH(
-        _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CREATE_VOLUME>);
+        _,
+        &csi::v0::VolumeManagerProcess::__call<csi::v0::CreateVolumeResponse>);
 
     createVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
 
@@ -5000,8 +4999,7 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
   }
 
   AWAIT_READY(createVolumeRequests.get())
-    << "Failed to wait for " << csi::v0::CREATE_VOLUME << " call #"
-    << (numRetryableErrors + 1);
+    << "Failed to wait for CreateVolumeRequest #" << (numRetryableErrors + 1);
 
   // Settle the clock to verify that there is no more outstanding request.
   Clock::settle();
@@ -5038,14 +5036,14 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
   driver.acceptOffers({offers->at(0).id()}, {DESTROY_DISK(created)});
 
   AWAIT_READY(deleteVolumeRequests.get())
-    << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #1";
+    << "Failed to wait for DeleteVolumeRequest #1";
 
   // Settle the clock to verify that there is no more outstanding request.
   Clock::settle();
   ASSERT_EQ(0u, deleteVolumeRequests.size());
 
   Future<Nothing> deleteVolumeCall = FUTURE_DISPATCH(
-      _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DELETE_VOLUME>);
+      _, 
&csi::v0::VolumeManagerProcess::__call<csi::v0::DeleteVolumeResponse>);
 
   // Return `DEADLINE_EXCEEDED` for the first `DeleteVolume` call.
   deleteVolumeResults.put(
@@ -5063,15 +5061,15 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
   // Return `UNAVAILABLE` for subsequent `DeleteVolume` calls.
   for (size_t i = 1; i < numRetryableErrors; i++) {
     AWAIT_READY(deleteVolumeRequests.get())
-      << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #"
-      << (i + 1);
+      << "Failed to wait for DeleteVolumeRequest #" << (i + 1);
 
     // Settle the clock to verify that there is no more outstanding request.
     Clock::settle();
     ASSERT_EQ(0u, deleteVolumeRequests.size());
 
     deleteVolumeCall = FUTURE_DISPATCH(
-        _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DELETE_VOLUME>);
+        _,
+        &csi::v0::VolumeManagerProcess::__call<csi::v0::DeleteVolumeResponse>);
 
     deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
 
@@ -5087,8 +5085,7 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
   }
 
   AWAIT_READY(deleteVolumeRequests.get())
-    << "Failed to wait for " << csi::v0::DELETE_VOLUME << " call #"
-    << (numRetryableErrors + 1);
+    << "Failed to wait for DeleteVolumeRequest #" << (numRetryableErrors + 1);
 
   // Settle the clock to verify that there is no more outstanding request.
   Clock::settle();

Reply via email to