This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 5eff8b208e23b0ce9d064a7acea92a984f9b7c64
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Mon Apr 8 21:22:36 2019 -0700

    Made the `RetryRpcWithExponentialBackoff` SLRP test work with CSI v1.
    
    This patch enables the unit test to test against CSI v1 through the
    following changes:
    
      * The forwarding mode of the test CSI plugin now respects the
        `--api_version` option. When specified, only requests of the proper
        CSI version will be forwarded.
    
      * The expectations of `CreateVolume` and `DeleteVolume` calls in the
        unit tests are parameterized against the CSI version string.
    
      * The mock CSI plugin now provides a default implementation for the
        `GetCapacity` call so the unit test can be simplified.
    
    Review: https://reviews.apache.org/r/70431
---
 src/examples/test_csi_plugin.cpp                   |  53 ++-
 src/tests/mock_csi_plugin.cpp                      |  24 +-
 .../storage_local_resource_provider_tests.cpp      | 396 +++++++++++----------
 3 files changed, 259 insertions(+), 214 deletions(-)

diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index b54d666..03f782e 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -1716,8 +1716,12 @@ Try<Nothing, StatusError> 
TestCSIPlugin::nodeUnpublishVolume(
 class CSIProxy
 {
 public:
-  CSIProxy(const string& _endpoint, const string& forward)
-    : endpoint(_endpoint),
+  CSIProxy(
+      const Option<string>& _apiVersion,
+      const string& _endpoint,
+      const string& forward)
+    : apiVersion(_apiVersion),
+      endpoint(_endpoint),
       stub(grpc::CreateChannel(forward, grpc::InsecureChannelCredentials())),
       service(new AsyncGenericService()) {}
 
@@ -1747,6 +1751,7 @@ private:
 
   void serve(ServerCompletionQueue* completionQueue);
 
+  const Option<string> apiVersion;
   const string endpoint;
 
   GenericStub stub;
@@ -1779,13 +1784,13 @@ void CSIProxy::run()
 // The lifecycle of a forwarded CSI call is shown as follows. The transitions
 // happen after the completions of the API calls.
 //
-//                                                     Server-side
-//        +-------------+             +-------------+ WriteAndFinish +---+
-//        | INITIALIZED |             |  FINISHING  +----------------> X |
-//        +------+------+             +------^------+                +---+
-//   Server-side |                           | Client-side
-//   RequestCall |        Server-side        | Finish (unary call)
-//        +------v------+    Read     +------+------+
+//                        Unsupported                  Server-side
+//        +-------------+ API version +-------------+ WriteAndFinish +---+
+//        | INITIALIZED |   +--------->  FINISHING  +----------------> X |
+//        +------+------+   |         +------^------+                +---+
+//   Server-side |   +------+                | Client-side
+//   RequestCall |   |    Server-side        | Finish (unary call)
+//        +------v---+--+    Read     +------+------+
 //        |  REQUESTED  +-------------> FORWARDING  |
 //        +-------------+             +-------------+
 //
@@ -1816,7 +1821,7 @@ void CSIProxy::serve(ServerCompletionQueue* 
completionQueue)
         if (!ok) {
           // Server-side `RequestCall`: the server has been shutdown so 
continue
           // to drain the queue.
-          continue;
+          break;
         }
 
         call->state = Call::State::REQUESTED;
@@ -1841,13 +1846,29 @@ void CSIProxy::serve(ServerCompletionQueue* 
completionQueue)
       case Call::State::REQUESTED: {
         if (!ok) {
           // Server-side `Read`: the client has done a `WritesDone` already, so
-          // clean up the call and move on to the next one.
+          // clean up the call and move to the next iteration immediately.
           delete call;
           continue;
         }
 
-        LOG(INFO) << "Forwarding " << call->serverContext.method() << " call";
+        // The expected method names are of the following form:
+        //   /csi.<api_version>.<service_name>/<rpc_name>
+        // See: 
https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests // 
NOLINT
+        if (apiVersion.isSome() &&
+            !strings::startsWith(
+                call->serverContext.method(),
+                "/csi." + apiVersion.get() + ".")) {
+          // The proxy does not support the API version of the call so respond
+          // with `UNIMPLEMENTED`.
+          call->state = Call::State::FINISHING;
+          call->status = Status(grpc::UNIMPLEMENTED, "");
+          call->serverReaderWriter.WriteAndFinish(
+              call->response, WriteOptions(), call->status, call);
+
+          break;
+        }
 
+        LOG(INFO) << "Forwarding " << call->serverContext.method() << " call";
         call->state = Call::State::FORWARDING;
 
         call->clientContext.set_wait_for_ready(true);
@@ -1895,10 +1916,10 @@ void CSIProxy::serve(ServerCompletionQueue* 
completionQueue)
                      << call->serverContext.method() << " call";
         }
 
-        // The call is completed so clean it up.
+        // The call is completed so clean it up and move to the next iteration
+        // immediately.
         delete call;
-
-        break;
+        continue;
       }
     }
   }
@@ -2000,7 +2021,7 @@ int main(int argc, char** argv)
   }
 
   if (flags.forward.isSome()) {
-    CSIProxy proxy(flags.endpoint, flags.forward.get());
+    CSIProxy proxy(flags.api_version, flags.endpoint, flags.forward.get());
 
     proxy.run();
   } else {
diff --git a/src/tests/mock_csi_plugin.cpp b/src/tests/mock_csi_plugin.cpp
index cbcb59f..dacdc15 100644
--- a/src/tests/mock_csi_plugin.cpp
+++ b/src/tests/mock_csi_plugin.cpp
@@ -16,6 +16,8 @@
 
 #include "tests/mock_csi_plugin.hpp"
 
+#include <stout/bytes.hpp>
+
 using std::string;
 using std::unique_ptr;
 
@@ -80,8 +82,17 @@ MockCSIPlugin::MockCSIPlugin()
   EXPECT_CALL(*this, ListVolumes(_, _, A<csi::v0::ListVolumesResponse*>()))
     .WillRepeatedly(Return(Status::OK));
 
+  // Return an arbitrary available capacity by default for testing with the 
test
+  // CSI plugin in forwarding mode.
   EXPECT_CALL(*this, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>()))
-    .WillRepeatedly(Return(Status::OK));
+    .WillRepeatedly(Invoke([](
+        ServerContext* context,
+        const csi::v0::GetCapacityRequest* request,
+        csi::v0::GetCapacityResponse* response) {
+      response->set_available_capacity(Gigabytes(4).bytes());
+
+      return Status::OK;
+    }));
 
   // Enable all controller RPC capabilities by default for testing with the 
test
   // CSI plugin in forwarding mode.
@@ -179,8 +190,17 @@ MockCSIPlugin::MockCSIPlugin()
   EXPECT_CALL(*this, ListVolumes(_, _, A<csi::v1::ListVolumesResponse*>()))
     .WillRepeatedly(Return(Status::OK));
 
+  // Return an arbitrary available capacity by default for testing with the 
test
+  // CSI plugin in forwarding mode.
   EXPECT_CALL(*this, GetCapacity(_, _, A<csi::v1::GetCapacityResponse*>()))
-    .WillRepeatedly(Return(Status::OK));
+    .WillRepeatedly(Invoke([](
+        ServerContext* context,
+        const csi::v1::GetCapacityRequest* request,
+        csi::v1::GetCapacityResponse* response) {
+      response->set_available_capacity(Gigabytes(4).bytes());
+
+      return Status::OK;
+    }));
 
   // Enable all controller RPC capabilities by default for testing with the 
test
   // CSI plugin in forwarding mode.
diff --git a/src/tests/storage_local_resource_provider_tests.cpp 
b/src/tests/storage_local_resource_provider_tests.cpp
index 8bf4d23..09e7ca0 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -38,7 +38,9 @@
 
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
+#include <stout/stringify.hpp>
 #include <stout/strings.hpp>
+#include <stout/unreachable.hpp>
 #include <stout/uri.hpp>
 
 #include <stout/os/realpath.hpp>
@@ -46,6 +48,8 @@
 #include "csi/paths.hpp"
 #include "csi/state.hpp"
 #include "csi/v0_volume_manager_process.hpp"
+#include "csi/v1_volume_manager_process.hpp"
+#include "csi/volume_manager.hpp"
 
 #include "linux/fs.hpp"
 
@@ -114,7 +118,8 @@ constexpr char TEST_CSI_VENDOR[] = 
"org.apache.mesos.csi.test.local";
 
 
 class StorageLocalResourceProviderTest
-  : public ContainerizerTest<slave::MesosContainerizer>
+  : public ContainerizerTest<slave::MesosContainerizer>,
+    public testing::WithParamInterface<string>
 {
 public:
   void SetUp() override
@@ -261,6 +266,7 @@ public:
                     "value": "%s",
                     "arguments": [
                       "%s",
+                      "--api_version=%s",
                       "--work_dir=%s",
                       "--available_capacity=%s",
                       "%s",
@@ -296,6 +302,7 @@ public:
         TEST_CSI_PLUGIN_NAME,
         testCsiPluginPath,
         testCsiPluginPath,
+        GetParam(),
         testCsiPluginWorkDir.get(),
         stringify(capacity),
         createParameters.isSome()
@@ -310,6 +317,105 @@ public:
         resourceProviderConfig.get()));
   }
 
+  // Set up an expected `CreateVolume` CSI call for a given mock CSI plugin.
+  // When the call is made to the mock plugin, `result` will be responded. When
+  // the response is received by the volume manager, the returned future will 
be
+  // satisfied.
+  Future<Nothing> futureCreateVolumeCall(
+      MockCSIPlugin* plugin, const Try<csi::VolumeInfo, StatusError>& result)
+  {
+    if (GetParam() == csi::v0::API_VERSION) {
+      EXPECT_CALL(*plugin, CreateVolume(
+          _, _, A<csi::v0::CreateVolumeResponse*>()))
+        .WillOnce(Invoke([result](
+            grpc::ServerContext* context,
+            const csi::v0::CreateVolumeRequest* request,
+            csi::v0::CreateVolumeResponse* response) {
+          if (result.isError()) {
+            return result.error().status;
+          }
+
+          response->mutable_volume()->set_id(result->id);
+          response->mutable_volume()
+            ->set_capacity_bytes(result->capacity.bytes());
+          *response->mutable_volume()->mutable_attributes() = result->context;
+
+          return grpc::Status::OK;
+        }));
+
+      return FUTURE_DISPATCH(_, &csi::v0::VolumeManagerProcess::__call<
+          csi::v0::CreateVolumeResponse>);
+    } else if (GetParam() == csi::v1::API_VERSION) {
+      EXPECT_CALL(*plugin, CreateVolume(
+          _, _, A<csi::v1::CreateVolumeResponse*>()))
+        .WillOnce(Invoke([result](
+            grpc::ServerContext* context,
+            const csi::v1::CreateVolumeRequest* request,
+            csi::v1::CreateVolumeResponse* response) {
+          if (result.isError()) {
+            return result.error().status;
+          }
+
+          response->mutable_volume()->set_volume_id(result->id);
+          response->mutable_volume()
+            ->set_capacity_bytes(result->capacity.bytes());
+          *response->mutable_volume()
+            ->mutable_volume_context() = result->context;
+
+          return grpc::Status::OK;
+        }));
+
+      return FUTURE_DISPATCH(_, &csi::v1::VolumeManagerProcess::__call<
+          csi::v1::CreateVolumeResponse>);
+    }
+
+    // This extra closure is necessary in order to use `FAIL` as it requires a
+    // void return type.
+    [&] { FAIL() << "Unsupported CSI API version " << GetParam(); }();
+
+    UNREACHABLE();
+  }
+
+  // Set up an expected `DeleteVolume` CSI call for a given mock CSI plugin.
+  // When the call is made to the mock plugin, `result` will be responded. When
+  // the response is received by the volume manager, the returned future will 
be
+  // satisfied.
+  Future<Nothing> futureDeleteVolumeCall(
+      MockCSIPlugin* plugin, const Try<Nothing, StatusError>& result)
+  {
+    if (GetParam() == csi::v0::API_VERSION) {
+      EXPECT_CALL(*plugin, DeleteVolume(
+          _, _, A<csi::v0::DeleteVolumeResponse*>()))
+        .WillOnce(Invoke([result](
+            grpc::ServerContext* context,
+            const csi::v0::DeleteVolumeRequest* request,
+            csi::v0::DeleteVolumeResponse* response) {
+          return result.isError() ? result.error().status : grpc::Status::OK;
+        }));
+
+      return FUTURE_DISPATCH(_, &csi::v0::VolumeManagerProcess::__call<
+          csi::v0::DeleteVolumeResponse>);
+    } else if (GetParam() == csi::v1::API_VERSION) {
+      EXPECT_CALL(*plugin, DeleteVolume(
+          _, _, A<csi::v1::DeleteVolumeResponse*>()))
+        .WillOnce(Invoke([result](
+            grpc::ServerContext* context,
+            const csi::v1::DeleteVolumeRequest* request,
+            csi::v1::DeleteVolumeResponse* response) {
+          return result.isError() ? result.error().status : grpc::Status::OK;
+        }));
+
+      return FUTURE_DISPATCH(_, &csi::v1::VolumeManagerProcess::__call<
+          csi::v1::DeleteVolumeResponse>);
+    }
+
+    // This extra closure is necessary in order to use `FAIL` as it requires a
+    // void return type.
+    [&] { FAIL() << "Unsupported CSI API version " << GetParam(); }();
+
+    UNREACHABLE();
+  }
+
   // Create a JSON string representing a disk profile mapping containing the
   // given profile-parameter pairs.
   static string createDiskProfileMapping(
@@ -410,9 +516,16 @@ static bool isPreprovisionedVolume(const Resource& r)
 }
 
 
+INSTANTIATE_TEST_CASE_P(
+    CSIVersion,
+    StorageLocalResourceProviderTest,
+    testing::Values(csi::v0::API_VERSION, csi::v1::API_VERSION),
+    [](const testing::TestParamInfo<string>& info) { return info.param; });
+
+
 // This test verifies that a storage local resource provider can report
 // no resource and recover from this state.
-TEST_F(StorageLocalResourceProviderTest, NoResource)
+TEST_P(StorageLocalResourceProviderTest, NoResource)
 {
   Clock::pause();
 
@@ -493,7 +606,7 @@ TEST_F(StorageLocalResourceProviderTest, NoResource)
 
 // This test verifies that any zero-sized volume reported by a CSI
 // plugin will be ignored by the storage local resource provider.
-TEST_F(StorageLocalResourceProviderTest, DISABLED_ZeroSizedDisk)
+TEST_P(StorageLocalResourceProviderTest, DISABLED_ZeroSizedDisk)
 {
   Clock::pause();
 
@@ -545,7 +658,7 @@ TEST_F(StorageLocalResourceProviderTest, 
DISABLED_ZeroSizedDisk)
 
 // This test verifies that the storage local resource provider can
 // handle disks less than 1MB correctly.
-TEST_F(StorageLocalResourceProviderTest, DISABLED_SmallDisk)
+TEST_P(StorageLocalResourceProviderTest, DISABLED_SmallDisk)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
 
@@ -629,7 +742,7 @@ TEST_F(StorageLocalResourceProviderTest, DISABLED_SmallDisk)
 
 // This test verifies that a framework can receive offers having new storage
 // pools from the storage local resource provider after a profile appears.
-TEST_F(StorageLocalResourceProviderTest, ProfileAppeared)
+TEST_P(StorageLocalResourceProviderTest, ProfileAppeared)
 {
   Clock::pause();
 
@@ -749,7 +862,7 @@ TEST_F(StorageLocalResourceProviderTest, ProfileAppeared)
 
 // This test verifies that the storage local resource provider can create then
 // destroy a MOUNT disk from a storage pool with other pipelined operations.
-TEST_F(StorageLocalResourceProviderTest, CreateDestroyDisk)
+TEST_P(StorageLocalResourceProviderTest, CreateDestroyDisk)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
 
@@ -888,7 +1001,7 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDisk)
 // This test verifies that the storage local resource provider can destroy a
 // MOUNT disk created from a storage pool with other pipelined operations after
 // recovery.
-TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskWithRecovery)
+TEST_P(StorageLocalResourceProviderTest, CreateDestroyDiskWithRecovery)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
 
@@ -1091,7 +1204,7 @@ TEST_F(StorageLocalResourceProviderTest, 
CreateDestroyDiskWithRecovery)
 //      RAW disk would show up as of profile 'test2'.
 //   4. Destroy the MOUNT disk of profile 'test1'. All 4GB RAW disk would show
 //      up as of profile 'test2'.
-TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared)
+TEST_P(StorageLocalResourceProviderTest, ProfileDisappeared)
 {
   Clock::pause();
 
@@ -1365,7 +1478,7 @@ TEST_F(StorageLocalResourceProviderTest, 
ProfileDisappeared)
 
 // This test verifies that the storage local resource provider can
 // recover if the plugin is killed during an agent failover..
-TEST_F(StorageLocalResourceProviderTest, AgentFailoverPluginKilled)
+TEST_P(StorageLocalResourceProviderTest, AgentFailoverPluginKilled)
 {
   setupResourceProviderConfig(Bytes(0), "volume0:4GB");
 
@@ -1473,7 +1586,7 @@ TEST_F(StorageLocalResourceProviderTest, 
AgentFailoverPluginKilled)
 // provider ID would change as well, and any created volume becomes a
 // preprovisioned volume. A framework should be able to either create a MOUNT
 // disk from a preprovisioned volume and use it, or destroy it directly.
-TEST_F(StorageLocalResourceProviderTest, ROOT_AgentRegisteredWithNewId)
+TEST_P(StorageLocalResourceProviderTest, ROOT_AgentRegisteredWithNewId)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
 
@@ -1824,7 +1937,7 @@ TEST_F(StorageLocalResourceProviderTest, 
ROOT_AgentRegisteredWithNewId)
 //   3. Destroy the persistent volume but keep the MOUNT disk. The file should
 //      be deleted.
 //   4. Destroy the MOUNT disk.
-TEST_F(
+TEST_P(
     StorageLocalResourceProviderTest, ROOT_CreateDestroyPersistentMountVolume)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
@@ -2018,7 +2131,7 @@ TEST_F(
 //   6. Destroy the persistent volume but keep the MOUNT disk. The file should
 //      be deleted.
 //   7. Destroy the MOUNT disk.
-TEST_F(
+TEST_P(
     StorageLocalResourceProviderTest,
     ROOT_CreateDestroyPersistentMountVolumeWithRecovery)
 {
@@ -2296,7 +2409,7 @@ TEST_F(
 //   6. Destroy the persistent volume but keep the MOUNT disk. The file should
 //      be deleted.
 //   7. Destroy the MOUNT disk.
-TEST_F(
+TEST_P(
     StorageLocalResourceProviderTest,
     ROOT_CreateDestroyPersistentMountVolumeWithReboot)
 {
@@ -2608,7 +2721,7 @@ TEST_F(
 // This test verifies that the storage local resource provider can
 // restart its CSI plugin after it is killed and continue to work
 // properly.
-TEST_F(
+TEST_P(
     StorageLocalResourceProviderTest,
     ROOT_PublishUnpublishResourcesPluginKilled)
 {
@@ -2866,7 +2979,7 @@ TEST_F(
 //      would fail with EBUSY.
 //   4. Destroys the persistent volume and the MOUNT disk. `DESTROY` would fail
 //      and `DESTROY_DISK` would be dropped.
-TEST_F(
+TEST_P(
     StorageLocalResourceProviderTest, ROOT_DestroyPersistentMountVolumeFailed)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
@@ -3031,7 +3144,7 @@ TEST_F(
 // is known to the disk profile adaptor, and can return the space back to the
 // storage pool through either destroying the MOUNT disk, or destroying a RAW
 // disk directly.
-TEST_F(StorageLocalResourceProviderTest, CreateDestroyPreprovisionedVolume)
+TEST_P(StorageLocalResourceProviderTest, CreateDestroyPreprovisionedVolume)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
 
@@ -3235,7 +3348,7 @@ TEST_F(StorageLocalResourceProviderTest, 
CreateDestroyPreprovisionedVolume)
 //      master, so that it isn't acknowledged by the master.
 //   3. Advances the clock and verifies that the agent resends the operation
 //      status update.
-TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdate)
+TEST_P(StorageLocalResourceProviderTest, RetryOperationStatusUpdate)
 {
   Clock::pause();
 
@@ -3384,7 +3497,7 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryOperationStatusUpdate)
 //      master, so that it isn't acknowledged by the master.
 //   3. Restarts the agent.
 //   4. Verifies that the agent resends the operation status update.
-TEST_F(
+TEST_P(
     StorageLocalResourceProviderTest,
     RetryOperationStatusUpdateAfterRecovery)
 {
@@ -3555,7 +3668,7 @@ TEST_F(
 
 // This test verifies that storage local resource provider properly
 // reports the metric related to CSI plugin container terminations.
-TEST_F(StorageLocalResourceProviderTest, ContainerTerminationMetric)
+TEST_P(StorageLocalResourceProviderTest, ContainerTerminationMetric)
 {
   // Since we want to observe a fixed number of `UpdateSlaveMessage`s,
   // register the agent with paused clock to ensure it does not
@@ -3663,7 +3776,7 @@ TEST_F(StorageLocalResourceProviderTest, 
ContainerTerminationMetric)
 
 // This test verifies that operation status updates contain the
 // agent ID and resource provider ID of originating providers.
-TEST_F(StorageLocalResourceProviderTest, OperationUpdate)
+TEST_P(StorageLocalResourceProviderTest, OperationUpdate)
 {
   Clock::pause();
 
@@ -3826,7 +3939,7 @@ TEST_F(StorageLocalResourceProviderTest, OperationUpdate)
 // operations since we have no control over the completion of an operation. 
Once
 // we support out-of-band CSI plugins through domain sockets, we could test 
this
 // metric against a mock CSI plugin.
-TEST_F(StorageLocalResourceProviderTest, OperationStateMetrics)
+TEST_P(StorageLocalResourceProviderTest, OperationStateMetrics)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
 
@@ -4052,7 +4165,7 @@ TEST_F(StorageLocalResourceProviderTest, 
OperationStateMetrics)
 // `cancelled` metrics for RPCs since we have no control over the completion of
 // an operation. Once we support out-of-band CSI plugins through domain 
sockets,
 // we could test these metrics against a mock CSI plugin.
-TEST_F(StorageLocalResourceProviderTest, CsiPluginRpcMetrics)
+TEST_P(StorageLocalResourceProviderTest, CsiPluginRpcMetrics)
 {
   const string profilesPath = path::join(sandbox.get(), "profiles.json");
 
@@ -4218,7 +4331,7 @@ TEST_F(StorageLocalResourceProviderTest, 
CsiPluginRpcMetrics)
 //
 // TODO(greggomann): Test operations on agent default resources: for such
 // operations, the agent generates the dropped status.
-TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
+TEST_P(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
 {
   Clock::pause();
 
@@ -4404,7 +4517,7 @@ TEST_F(StorageLocalResourceProviderTest, 
ReconcileDroppedOperation)
 
 // This test verifies that if an operation ID is specified, operation status
 // updates are resent to the scheduler until acknowledged.
-TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdateToScheduler)
+TEST_P(StorageLocalResourceProviderTest, RetryOperationStatusUpdateToScheduler)
 {
   Clock::pause();
 
@@ -4591,7 +4704,7 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryOperationStatusUpdateToScheduler)
 // This test ensures that the master responds with the latest state
 // for operations that are terminal at the master, but have not been
 // acknowledged by the framework.
-TEST_F(
+TEST_P(
     StorageLocalResourceProviderTest,
     ReconcileUnacknowledgedTerminalOperation)
 {
@@ -4778,7 +4891,7 @@ TEST_F(
 //      clock is advanced exponentially to trigger retries.
 //   6. Returns `UNIMPLEMENTED` for the next `DeleteVolume` call to verify that
 //      there is no retry on a non-retryable error.
-TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
+TEST_P(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
 {
   Clock::pause();
 
@@ -4798,71 +4911,6 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
   MockCSIPlugin plugin;
   ASSERT_SOME(plugin.startup(mockCsiEndpoint));
 
-  // TODO(chhsiao): Since this test expects CSI v0 protobufs, we disable CSI v1
-  // for now. Remove this once the expectations are parameterized.
-  EXPECT_CALL(plugin, Probe(_, _, A<csi::v1::ProbeResponse*>()))
-    .WillRepeatedly(Return(grpc::Status(grpc::UNIMPLEMENTED, "")));
-
-  EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>()))
-    .WillRepeatedly(Invoke([](
-        grpc::ServerContext* context,
-        const csi::v0::GetCapacityRequest* request,
-        csi::v0::GetCapacityResponse* response) {
-      response->set_available_capacity(Gigabytes(4).bytes());
-
-      return grpc::Status::OK;
-    }));
-
-  Queue<csi::v0::CreateVolumeRequest> createVolumeRequests;
-  Queue<Try<csi::v0::CreateVolumeResponse, StatusError>> createVolumeResults;
-  EXPECT_CALL(plugin, CreateVolume(_, _, A<csi::v0::CreateVolumeResponse*>()))
-    .WillRepeatedly(Invoke([&](
-        grpc::ServerContext* context,
-        const csi::v0::CreateVolumeRequest* request,
-        csi::v0::CreateVolumeResponse* response) -> grpc::Status {
-      Future<Try<csi::v0::CreateVolumeResponse, StatusError>> result =
-        createVolumeResults.get();
-
-      EXPECT_TRUE(result.isPending());
-      createVolumeRequests.put(*request);
-
-      // This extra closure is necessary in order to use `AWAIT_ASSERT_*`, as
-      // these macros require a void return type.
-      [&] { AWAIT_ASSERT_READY(result); }();
-
-      if (result->isError()) {
-        return result->error().status;
-      }
-
-      *response = result->get();
-      return grpc::Status::OK;
-    }));
-
-  Queue<csi::v0::DeleteVolumeRequest> deleteVolumeRequests;
-  Queue<Try<csi::v0::DeleteVolumeResponse, StatusError>> deleteVolumeResults;
-  EXPECT_CALL(plugin, DeleteVolume(_, _, A<csi::v0::DeleteVolumeResponse*>()))
-    .WillRepeatedly(Invoke([&](
-        grpc::ServerContext* context,
-        const csi::v0::DeleteVolumeRequest* request,
-        csi::v0::DeleteVolumeResponse* response) -> grpc::Status {
-      Future<Try<csi::v0::DeleteVolumeResponse, StatusError>> result =
-        deleteVolumeResults.get();
-
-      EXPECT_TRUE(result.isPending());
-      deleteVolumeRequests.put(*request);
-
-      // This extra closure is necessary in order to use `AWAIT_ASSERT_*`, as
-      // these macros require a void return type.
-      [&] { AWAIT_ASSERT_READY(result); }();
-
-      if (result->isError()) {
-        return result->error().status;
-      }
-
-      *response = result->get();
-      return grpc::Status::OK;
-    }));
-
   setupResourceProviderConfig(Bytes(0), None(), None(), mockCsiEndpoint);
 
   master::Flags masterFlags = CreateMasterFlags();
@@ -4935,9 +4983,7 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
   AWAIT_READY(offers);
   ASSERT_EQ(1u, offers->size());
 
-  Resource raw = *Resources(offers->at(0).resources())
-    .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test"))
-    .begin();
+  Offer offer = offers->at(0);
 
   // We expect that the following RPC calls are made during startup: `Probe`,
   // `GetPluginInfo` (2), `GetPluginCapabilities, `ControllerGetCapabilities`,
@@ -4952,80 +4998,58 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
   EXPECT_TRUE(metricEquals(metricName("csi_plugin/rpcs_failed"), 0));
 
   // Create a MOUNT disk.
-  Future<UpdateOperationStatusMessage> updateOperationStatus =
-    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
-
-  driver.acceptOffers(
-      {offers->at(0).id()},
-      {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)});
-
-  AWAIT_READY(createVolumeRequests.get())
-    << "Failed to wait for CreateVolumeRequest #1";
+  Resource raw = *Resources(offer.resources())
+    .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test"))
+    .begin();
 
-  // Settle the clock to verify that there is no more outstanding request.
-  Clock::settle();
-  ASSERT_EQ(0u, createVolumeRequests.size());
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isMountDisk<Resource>, lambda::_1, "test"))))
+    .WillOnce(FutureArg<1>(&offers));
 
-  Future<Nothing> createVolumeCall = FUTURE_DISPATCH(
-      _, 
&csi::v0::VolumeManagerProcess::__call<csi::v0::CreateVolumeResponse>);
+  Future<UpdateOperationStatusMessage> updateOperationStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
 
   // Return `DEADLINE_EXCEEDED` for the first `CreateVolume` call.
-  createVolumeResults.put(
-      StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, "")));
+  Future<Nothing> createVolumeCall = futureCreateVolumeCall(
+      &plugin, StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, "")));
 
-  AWAIT_READY(createVolumeCall);
+  driver.acceptOffers(
+      {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)});
 
-  Duration createVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+  AWAIT_READY(createVolumeCall) << "Failed to wait for CreateVolume call #1";
 
-  // Settle the clock to ensure that the retry timer has been set, then advance
-  // the clock by the maximum backoff to trigger a retry.
-  Clock::settle();
-  Clock::advance(createVolumeBackoff);
+  Duration createVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
 
-  // Return `UNAVAILABLE` for subsequent `CreateVolume` calls.
   for (size_t i = 1; i < numRetryableErrors; i++) {
-    AWAIT_READY(createVolumeRequests.get())
-      << "Failed to wait for CreateVolumeRequest #" << (i + 1);
+    // Return `UNAVAILABLE` for subsequent `CreateVolume` calls.
+    createVolumeCall = futureCreateVolumeCall(
+        &plugin, StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
 
-    // Settle the clock to verify that there is no more outstanding request.
+    // Settle the clock to ensure that the retry timer has been set, then
+    // advance the clock by the maximum backoff to trigger a retry.
     Clock::settle();
-    ASSERT_EQ(0u, createVolumeRequests.size());
-
-    createVolumeCall = FUTURE_DISPATCH(
-        _,
-        &csi::v0::VolumeManagerProcess::__call<csi::v0::CreateVolumeResponse>);
-
-    createVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
+    Clock::advance(createVolumeBackoff);
 
-    AWAIT_READY(createVolumeCall);
+    AWAIT_READY(createVolumeCall)
+      << "Failed to wait for CreateVolume call #" << (i + 1);
 
     createVolumeBackoff = std::min(
         createVolumeBackoff * 2, csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX);
-
-    // Settle the clock to ensure that the retry timer has been set, then
-    // advance the clock by the maximum backoff to trigger a retry.
-    Clock::settle();
-    Clock::advance(createVolumeBackoff);
   }
 
-  AWAIT_READY(createVolumeRequests.get())
-    << "Failed to wait for CreateVolumeRequest #" << (numRetryableErrors + 1);
+  // Return a successful response for the last `CreateVolume` call.
+  createVolumeCall = futureCreateVolumeCall(
+      &plugin,
+      csi::VolumeInfo{Megabytes(raw.scalar().value()).bytes(),
+                      id::UUID::random().toString()});
 
-  // Settle the clock to verify that there is no more outstanding request.
+  // Settle the clock to ensure that the retry timer has been set, then
+  // advance the clock by the maximum backoff to trigger a retry.
   Clock::settle();
-  ASSERT_EQ(0u, createVolumeRequests.size());
-
-  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      std::bind(isMountDisk<Resource>, lambda::_1, "test"))))
-    .WillOnce(FutureArg<1>(&offers));
-
-  // Return a successful response for the last `CreateVolume` call.
-  csi::v0::CreateVolumeResponse createVolumeResponse;
-  createVolumeResponse.mutable_volume()->set_id(id::UUID::random().toString());
-  createVolumeResponse.mutable_volume()->set_capacity_bytes(
-      Megabytes(raw.scalar().value()).bytes());
+  Clock::advance(createVolumeBackoff);
 
-  createVolumeResults.put(std::move(createVolumeResponse));
+  AWAIT_READY(createVolumeCall)
+    << "Failed to wait for CreateVolume call #" << (numRetryableErrors + 1);
 
   AWAIT_READY(updateOperationStatus);
   EXPECT_EQ(OPERATION_FINISHED, updateOperationStatus->status().state());
@@ -5036,73 +5060,53 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
   AWAIT_READY(offers);
   ASSERT_EQ(1u, offers->size());
 
-  Resource created = *Resources(offers->at(0).resources())
+  offer = offers->at(0);
+
+  // Destroy the MOUNT disk.
+  Resource created = *Resources(offer.resources())
     .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test"))
     .begin();
 
-  // Destroy the MOUNT disk.
   updateOperationStatus = FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, 
_);
 
-  driver.acceptOffers({offers->at(0).id()}, {DESTROY_DISK(created)});
-
-  AWAIT_READY(deleteVolumeRequests.get())
-    << "Failed to wait for DeleteVolumeRequest #1";
-
-  // Settle the clock to verify that there is no more outstanding request.
-  Clock::settle();
-  ASSERT_EQ(0u, deleteVolumeRequests.size());
-
-  Future<Nothing> deleteVolumeCall = FUTURE_DISPATCH(
-      _, 
&csi::v0::VolumeManagerProcess::__call<csi::v0::DeleteVolumeResponse>);
-
   // Return `DEADLINE_EXCEEDED` for the first `DeleteVolume` call.
-  deleteVolumeResults.put(
-      StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, "")));
+  Future<Nothing> deleteVolumeCall = futureDeleteVolumeCall(
+      &plugin, StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, "")));
 
-  AWAIT_READY(deleteVolumeCall);
+  driver.acceptOffers({offers->at(0).id()}, {DESTROY_DISK(created)});
 
-  Duration deleteVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
+  AWAIT_READY(deleteVolumeCall) << "Failed to wait for DeleteVolume call #1";
 
-  // Settle the clock to ensure that the retry timer has been set, then advance
-  // the clock by the maximum backoff to trigger a retry.
-  Clock::settle();
-  Clock::advance(deleteVolumeBackoff);
+  Duration deleteVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR;
 
-  // Return `UNAVAILABLE` for subsequent `DeleteVolume` calls.
   for (size_t i = 1; i < numRetryableErrors; i++) {
-    AWAIT_READY(deleteVolumeRequests.get())
-      << "Failed to wait for DeleteVolumeRequest #" << (i + 1);
+    // Return `UNAVAILABLE` for subsequent `DeleteVolume` calls.
+    deleteVolumeCall = futureDeleteVolumeCall(
+        &plugin, StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
 
-    // Settle the clock to verify that there is no more outstanding request.
+    // Settle the clock to ensure that the retry timer has been set, then
+    // advance the clock by the maximum backoff to trigger a retry.
     Clock::settle();
-    ASSERT_EQ(0u, deleteVolumeRequests.size());
-
-    deleteVolumeCall = FUTURE_DISPATCH(
-        _,
-        &csi::v0::VolumeManagerProcess::__call<csi::v0::DeleteVolumeResponse>);
-
-    deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, "")));
+    Clock::advance(deleteVolumeBackoff);
 
-    AWAIT_READY(deleteVolumeCall);
+    AWAIT_READY(deleteVolumeCall)
+      << "Failed to wait for DeleteVolume call #" << (i + 1);
 
     deleteVolumeBackoff = std::min(
         deleteVolumeBackoff * 2, csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX);
-
-    // Settle the clock to ensure that the retry timer has been set, then
-    // advance the clock by the maximum backoff to trigger a retry.
-    Clock::settle();
-    Clock::advance(deleteVolumeBackoff);
   }
 
-  AWAIT_READY(deleteVolumeRequests.get())
-    << "Failed to wait for DeleteVolumeRequest #" << (numRetryableErrors + 1);
+  // Return a non-retryable error for the last `DeleteVolume` call.
+  deleteVolumeCall = futureDeleteVolumeCall(
+      &plugin, StatusError(grpc::Status(grpc::UNIMPLEMENTED, "")));
 
-  // Settle the clock to verify that there is no more outstanding request.
+  // Settle the clock to ensure that the retry timer has been set, then
+  // advance the clock by the maximum backoff to trigger a retry.
   Clock::settle();
-  ASSERT_EQ(0u, deleteVolumeRequests.size());
+  Clock::advance(deleteVolumeBackoff);
 
-  // Return a non-retryable error for the last `DeleteVolume` call.
-  deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNIMPLEMENTED, "")));
+  AWAIT_READY(deleteVolumeCall)
+    << "Failed to wait for DeleteVolume call #" << (numRetryableErrors + 1);
 
   AWAIT_READY(updateOperationStatus);
   EXPECT_EQ(OPERATION_FAILED, updateOperationStatus->status().state());
@@ -5126,7 +5130,7 @@ TEST_F(StorageLocalResourceProviderTest, 
RetryRpcWithExponentialBackoff)
 // (i.e. the master cannot assume the success of the operation) operation
 // status updates that originate from frameworks which have been torn down.
 // Non-speculative operations include CREATE_DISK.
-TEST_F(
+TEST_P(
     StorageLocalResourceProviderTest,
     FrameworkTeardownBeforeTerminalOperationStatusUpdate)
 {
@@ -5297,7 +5301,7 @@ TEST_F(
 // the master fails over, but the operation's originating framework
 // never reregisters. Alternatively, and uncommonly, pending operations
 // on an agent migrated from one master to another will produce orphans.
-TEST_F(
+TEST_P(
     StorageLocalResourceProviderTest,
     TerminalOrphanOperationAfterMasterFailover)
 {
@@ -5485,7 +5489,7 @@ TEST_F(
 
 // This test verifies that operators can reserve/unreserve and
 // create/destroy persistent volumes with resource provider resources.
-TEST_F(
+TEST_P(
     StorageLocalResourceProviderTest,
     OperatorOperationsWithResourceProviderResources)
 {

Reply via email to