This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 5eff8b208e23b0ce9d064a7acea92a984f9b7c64 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Mon Apr 8 21:22:36 2019 -0700 Made the `RetryRpcWithExponentialBackoff` SLRP test work with CSI v1. This patch enables the unit test to test against CSI v1 through the following changes: * The forwarding mode of the test CSI plugin now respects the `--api_version` option. When specified, only requests of the proper CSI version will be forwarded. * The expectations of `CreateVolume` and `DeleteVolume` calls in the unit tests are parameterized against the CSI version string. * The mock CSI plugin now provides a default implementation for the `GetCapacity` call so the unit test can be simplified. Review: https://reviews.apache.org/r/70431 --- src/examples/test_csi_plugin.cpp | 53 ++- src/tests/mock_csi_plugin.cpp | 24 +- .../storage_local_resource_provider_tests.cpp | 396 +++++++++++---------- 3 files changed, 259 insertions(+), 214 deletions(-) diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp index b54d666..03f782e 100644 --- a/src/examples/test_csi_plugin.cpp +++ b/src/examples/test_csi_plugin.cpp @@ -1716,8 +1716,12 @@ Try<Nothing, StatusError> TestCSIPlugin::nodeUnpublishVolume( class CSIProxy { public: - CSIProxy(const string& _endpoint, const string& forward) - : endpoint(_endpoint), + CSIProxy( + const Option<string>& _apiVersion, + const string& _endpoint, + const string& forward) + : apiVersion(_apiVersion), + endpoint(_endpoint), stub(grpc::CreateChannel(forward, grpc::InsecureChannelCredentials())), service(new AsyncGenericService()) {} @@ -1747,6 +1751,7 @@ private: void serve(ServerCompletionQueue* completionQueue); + const Option<string> apiVersion; const string endpoint; GenericStub stub; @@ -1779,13 +1784,13 @@ void CSIProxy::run() // The lifecycle of a forwarded CSI call is shown as follows. The transitions // happen after the completions of the API calls. 
// -// Server-side -// +-------------+ +-------------+ WriteAndFinish +---+ -// | INITIALIZED | | FINISHING +----------------> X | -// +------+------+ +------^------+ +---+ -// Server-side | | Client-side -// RequestCall | Server-side | Finish (unary call) -// +------v------+ Read +------+------+ +// Unsupported Server-side +// +-------------+ API version +-------------+ WriteAndFinish +---+ +// | INITIALIZED | +---------> FINISHING +----------------> X | +// +------+------+ | +------^------+ +---+ +// Server-side | +------+ | Client-side +// RequestCall | | Server-side | Finish (unary call) +// +------v---+--+ Read +------+------+ // | REQUESTED +-------------> FORWARDING | // +-------------+ +-------------+ // @@ -1816,7 +1821,7 @@ void CSIProxy::serve(ServerCompletionQueue* completionQueue) if (!ok) { // Server-side `RequestCall`: the server has been shutdown so continue // to drain the queue. - continue; + break; } call->state = Call::State::REQUESTED; @@ -1841,13 +1846,29 @@ void CSIProxy::serve(ServerCompletionQueue* completionQueue) case Call::State::REQUESTED: { if (!ok) { // Server-side `Read`: the client has done a `WritesDone` already, so - // clean up the call and move on to the next one. + // clean up the call and move to the next iteration immediately. delete call; continue; } - LOG(INFO) << "Forwarding " << call->serverContext.method() << " call"; + // The expected method names are of the following form: + // /csi.<api_version>.<service_name>/<rpc_name> + // See: https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests // NOLINT + if (apiVersion.isSome() && + !strings::startsWith( + call->serverContext.method(), + "/csi." + apiVersion.get() + ".")) { + // The proxy does not support the API version of the call so respond + // with `UNIMPLEMENTED`. 
+ call->state = Call::State::FINISHING; + call->status = Status(grpc::UNIMPLEMENTED, ""); + call->serverReaderWriter.WriteAndFinish( + call->response, WriteOptions(), call->status, call); + + break; + } + LOG(INFO) << "Forwarding " << call->serverContext.method() << " call"; call->state = Call::State::FORWARDING; call->clientContext.set_wait_for_ready(true); @@ -1895,10 +1916,10 @@ void CSIProxy::serve(ServerCompletionQueue* completionQueue) << call->serverContext.method() << " call"; } - // The call is completed so clean it up. + // The call is completed so clean it up and move to the next iteration + // immediately. delete call; - - break; + continue; } } } @@ -2000,7 +2021,7 @@ int main(int argc, char** argv) } if (flags.forward.isSome()) { - CSIProxy proxy(flags.endpoint, flags.forward.get()); + CSIProxy proxy(flags.api_version, flags.endpoint, flags.forward.get()); proxy.run(); } else { diff --git a/src/tests/mock_csi_plugin.cpp b/src/tests/mock_csi_plugin.cpp index cbcb59f..dacdc15 100644 --- a/src/tests/mock_csi_plugin.cpp +++ b/src/tests/mock_csi_plugin.cpp @@ -16,6 +16,8 @@ #include "tests/mock_csi_plugin.hpp" +#include <stout/bytes.hpp> + using std::string; using std::unique_ptr; @@ -80,8 +82,17 @@ MockCSIPlugin::MockCSIPlugin() EXPECT_CALL(*this, ListVolumes(_, _, A<csi::v0::ListVolumesResponse*>())) .WillRepeatedly(Return(Status::OK)); + // Return an arbitrary available capacity by default for testing with the test + // CSI plugin in forwarding mode. EXPECT_CALL(*this, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>())) - .WillRepeatedly(Return(Status::OK)); + .WillRepeatedly(Invoke([]( + ServerContext* context, + const csi::v0::GetCapacityRequest* request, + csi::v0::GetCapacityResponse* response) { + response->set_available_capacity(Gigabytes(4).bytes()); + + return Status::OK; + })); // Enable all controller RPC capabilities by default for testing with the test // CSI plugin in forwarding mode. 
@@ -179,8 +190,17 @@ MockCSIPlugin::MockCSIPlugin() EXPECT_CALL(*this, ListVolumes(_, _, A<csi::v1::ListVolumesResponse*>())) .WillRepeatedly(Return(Status::OK)); + // Return an arbitrary available capacity by default for testing with the test + // CSI plugin in forwarding mode. EXPECT_CALL(*this, GetCapacity(_, _, A<csi::v1::GetCapacityResponse*>())) - .WillRepeatedly(Return(Status::OK)); + .WillRepeatedly(Invoke([]( + ServerContext* context, + const csi::v1::GetCapacityRequest* request, + csi::v1::GetCapacityResponse* response) { + response->set_available_capacity(Gigabytes(4).bytes()); + + return Status::OK; + })); // Enable all controller RPC capabilities by default for testing with the test // CSI plugin in forwarding mode. diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index 8bf4d23..09e7ca0 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -38,7 +38,9 @@ #include <stout/foreach.hpp> #include <stout/hashmap.hpp> +#include <stout/stringify.hpp> #include <stout/strings.hpp> +#include <stout/unreachable.hpp> #include <stout/uri.hpp> #include <stout/os/realpath.hpp> @@ -46,6 +48,8 @@ #include "csi/paths.hpp" #include "csi/state.hpp" #include "csi/v0_volume_manager_process.hpp" +#include "csi/v1_volume_manager_process.hpp" +#include "csi/volume_manager.hpp" #include "linux/fs.hpp" @@ -114,7 +118,8 @@ constexpr char TEST_CSI_VENDOR[] = "org.apache.mesos.csi.test.local"; class StorageLocalResourceProviderTest - : public ContainerizerTest<slave::MesosContainerizer> + : public ContainerizerTest<slave::MesosContainerizer>, + public testing::WithParamInterface<string> { public: void SetUp() override @@ -261,6 +266,7 @@ public: "value": "%s", "arguments": [ "%s", + "--api_version=%s", "--work_dir=%s", "--available_capacity=%s", "%s", @@ -296,6 +302,7 @@ public: TEST_CSI_PLUGIN_NAME, testCsiPluginPath, testCsiPluginPath, + 
GetParam(), testCsiPluginWorkDir.get(), stringify(capacity), createParameters.isSome() @@ -310,6 +317,105 @@ public: resourceProviderConfig.get())); } + // Set up an expected `CreateVolume` CSI call for a given mock CSI plugin. + // When the call is made to the mock plugin, `result` will be responded. When + // the response is received by the volume manager, the returned future will be + // satisfied. + Future<Nothing> futureCreateVolumeCall( + MockCSIPlugin* plugin, const Try<csi::VolumeInfo, StatusError>& result) + { + if (GetParam() == csi::v0::API_VERSION) { + EXPECT_CALL(*plugin, CreateVolume( + _, _, A<csi::v0::CreateVolumeResponse*>())) + .WillOnce(Invoke([result]( + grpc::ServerContext* context, + const csi::v0::CreateVolumeRequest* request, + csi::v0::CreateVolumeResponse* response) { + if (result.isError()) { + return result.error().status; + } + + response->mutable_volume()->set_id(result->id); + response->mutable_volume() + ->set_capacity_bytes(result->capacity.bytes()); + *response->mutable_volume()->mutable_attributes() = result->context; + + return grpc::Status::OK; + })); + + return FUTURE_DISPATCH(_, &csi::v0::VolumeManagerProcess::__call< + csi::v0::CreateVolumeResponse>); + } else if (GetParam() == csi::v1::API_VERSION) { + EXPECT_CALL(*plugin, CreateVolume( + _, _, A<csi::v1::CreateVolumeResponse*>())) + .WillOnce(Invoke([result]( + grpc::ServerContext* context, + const csi::v1::CreateVolumeRequest* request, + csi::v1::CreateVolumeResponse* response) { + if (result.isError()) { + return result.error().status; + } + + response->mutable_volume()->set_volume_id(result->id); + response->mutable_volume() + ->set_capacity_bytes(result->capacity.bytes()); + *response->mutable_volume() + ->mutable_volume_context() = result->context; + + return grpc::Status::OK; + })); + + return FUTURE_DISPATCH(_, &csi::v1::VolumeManagerProcess::__call< + csi::v1::CreateVolumeResponse>); + } + + // This extra closure is necessary in order to use `FAIL` as it requires a 
+ // void return type. + [&] { FAIL() << "Unsupported CSI API version " << GetParam(); }(); + + UNREACHABLE(); + } + + // Set up an expected `DeleteVolume` CSI call for a given mock CSI plugin. + // When the call is made to the mock plugin, `result` will be responded. When + // the response is received by the volume manager, the returned future will be + // satisfied. + Future<Nothing> futureDeleteVolumeCall( + MockCSIPlugin* plugin, const Try<Nothing, StatusError>& result) + { + if (GetParam() == csi::v0::API_VERSION) { + EXPECT_CALL(*plugin, DeleteVolume( + _, _, A<csi::v0::DeleteVolumeResponse*>())) + .WillOnce(Invoke([result]( + grpc::ServerContext* context, + const csi::v0::DeleteVolumeRequest* request, + csi::v0::DeleteVolumeResponse* response) { + return result.isError() ? result.error().status : grpc::Status::OK; + })); + + return FUTURE_DISPATCH(_, &csi::v0::VolumeManagerProcess::__call< + csi::v0::DeleteVolumeResponse>); + } else if (GetParam() == csi::v1::API_VERSION) { + EXPECT_CALL(*plugin, DeleteVolume( + _, _, A<csi::v1::DeleteVolumeResponse*>())) + .WillOnce(Invoke([result]( + grpc::ServerContext* context, + const csi::v1::DeleteVolumeRequest* request, + csi::v1::DeleteVolumeResponse* response) { + return result.isError() ? result.error().status : grpc::Status::OK; + })); + + return FUTURE_DISPATCH(_, &csi::v1::VolumeManagerProcess::__call< + csi::v1::DeleteVolumeResponse>); + } + + // This extra closure is necessary in order to use `FAIL` as it requires a + // void return type. + [&] { FAIL() << "Unsupported CSI API version " << GetParam(); }(); + + UNREACHABLE(); + } + // Create a JSON string representing a disk profile mapping containing the // given profile-parameter pairs. 
static string createDiskProfileMapping( @@ -410,9 +516,16 @@ static bool isPreprovisionedVolume(const Resource& r) } +INSTANTIATE_TEST_CASE_P( + CSIVersion, + StorageLocalResourceProviderTest, + testing::Values(csi::v0::API_VERSION, csi::v1::API_VERSION), + [](const testing::TestParamInfo<string>& info) { return info.param; }); + + // This test verifies that a storage local resource provider can report // no resource and recover from this state. -TEST_F(StorageLocalResourceProviderTest, NoResource) +TEST_P(StorageLocalResourceProviderTest, NoResource) { Clock::pause(); @@ -493,7 +606,7 @@ TEST_F(StorageLocalResourceProviderTest, NoResource) // This test verifies that any zero-sized volume reported by a CSI // plugin will be ignored by the storage local resource provider. -TEST_F(StorageLocalResourceProviderTest, DISABLED_ZeroSizedDisk) +TEST_P(StorageLocalResourceProviderTest, DISABLED_ZeroSizedDisk) { Clock::pause(); @@ -545,7 +658,7 @@ TEST_F(StorageLocalResourceProviderTest, DISABLED_ZeroSizedDisk) // This test verifies that the storage local resource provider can // handle disks less than 1MB correctly. -TEST_F(StorageLocalResourceProviderTest, DISABLED_SmallDisk) +TEST_P(StorageLocalResourceProviderTest, DISABLED_SmallDisk) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -629,7 +742,7 @@ TEST_F(StorageLocalResourceProviderTest, DISABLED_SmallDisk) // This test verifies that a framework can receive offers having new storage // pools from the storage local resource provider after a profile appears. -TEST_F(StorageLocalResourceProviderTest, ProfileAppeared) +TEST_P(StorageLocalResourceProviderTest, ProfileAppeared) { Clock::pause(); @@ -749,7 +862,7 @@ TEST_F(StorageLocalResourceProviderTest, ProfileAppeared) // This test verifies that the storage local resource provider can create then // destroy a MOUNT disk from a storage pool with other pipelined operations. 
-TEST_F(StorageLocalResourceProviderTest, CreateDestroyDisk) +TEST_P(StorageLocalResourceProviderTest, CreateDestroyDisk) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -888,7 +1001,7 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDisk) // This test verifies that the storage local resource provider can destroy a // MOUNT disk created from a storage pool with other pipelined operations after // recovery. -TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskWithRecovery) +TEST_P(StorageLocalResourceProviderTest, CreateDestroyDiskWithRecovery) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -1091,7 +1204,7 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskWithRecovery) // RAW disk would show up as of profile 'test2'. // 4. Destroy the MOUNT disk of profile 'test1'. All 4GB RAW disk would show // up as of profile 'test2'. -TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared) +TEST_P(StorageLocalResourceProviderTest, ProfileDisappeared) { Clock::pause(); @@ -1365,7 +1478,7 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared) // This test verifies that the storage local resource provider can // recover if the plugin is killed during an agent failover.. -TEST_F(StorageLocalResourceProviderTest, AgentFailoverPluginKilled) +TEST_P(StorageLocalResourceProviderTest, AgentFailoverPluginKilled) { setupResourceProviderConfig(Bytes(0), "volume0:4GB"); @@ -1473,7 +1586,7 @@ TEST_F(StorageLocalResourceProviderTest, AgentFailoverPluginKilled) // provider ID would change as well, and any created volume becomes a // preprovisioned volume. A framework should be able to either create a MOUNT // disk from a preprovisioned volume and use it, or destroy it directly. 
-TEST_F(StorageLocalResourceProviderTest, ROOT_AgentRegisteredWithNewId) +TEST_P(StorageLocalResourceProviderTest, ROOT_AgentRegisteredWithNewId) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -1824,7 +1937,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_AgentRegisteredWithNewId) // 3. Destroy the persistent volume but keep the MOUNT disk. The file should // be deleted. // 4. Destroy the MOUNT disk. -TEST_F( +TEST_P( StorageLocalResourceProviderTest, ROOT_CreateDestroyPersistentMountVolume) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -2018,7 +2131,7 @@ TEST_F( // 6. Destroy the persistent volume but keep the MOUNT disk. The file should // be deleted. // 7. Destroy the MOUNT disk. -TEST_F( +TEST_P( StorageLocalResourceProviderTest, ROOT_CreateDestroyPersistentMountVolumeWithRecovery) { @@ -2296,7 +2409,7 @@ TEST_F( // 6. Destroy the persistent volume but keep the MOUNT disk. The file should // be deleted. // 7. Destroy the MOUNT disk. -TEST_F( +TEST_P( StorageLocalResourceProviderTest, ROOT_CreateDestroyPersistentMountVolumeWithReboot) { @@ -2608,7 +2721,7 @@ TEST_F( // This test verifies that the storage local resource provider can // restart its CSI plugin after it is killed and continue to work // properly. -TEST_F( +TEST_P( StorageLocalResourceProviderTest, ROOT_PublishUnpublishResourcesPluginKilled) { @@ -2866,7 +2979,7 @@ TEST_F( // would fail with EBUSY. // 4. Destroys the persistent volume and the MOUNT disk. `DESTROY` would fail // and `DESTROY_DISK` would be dropped. -TEST_F( +TEST_P( StorageLocalResourceProviderTest, ROOT_DestroyPersistentMountVolumeFailed) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -3031,7 +3144,7 @@ TEST_F( // is known to the disk profile adaptor, and can return the space back to the // storage pool through either destroying the MOUNT disk, or destroying a RAW // disk directly. 
-TEST_F(StorageLocalResourceProviderTest, CreateDestroyPreprovisionedVolume) +TEST_P(StorageLocalResourceProviderTest, CreateDestroyPreprovisionedVolume) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -3235,7 +3348,7 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyPreprovisionedVolume) // master, so that it isn't acknowledged by the master. // 3. Advances the clock and verifies that the agent resends the operation // status update. -TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdate) +TEST_P(StorageLocalResourceProviderTest, RetryOperationStatusUpdate) { Clock::pause(); @@ -3384,7 +3497,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdate) // master, so that it isn't acknowledged by the master. // 3. Restarts the agent. // 4. Verifies that the agent resends the operation status update. -TEST_F( +TEST_P( StorageLocalResourceProviderTest, RetryOperationStatusUpdateAfterRecovery) { @@ -3555,7 +3668,7 @@ TEST_F( // This test verifies that storage local resource provider properly // reports the metric related to CSI plugin container terminations. -TEST_F(StorageLocalResourceProviderTest, ContainerTerminationMetric) +TEST_P(StorageLocalResourceProviderTest, ContainerTerminationMetric) { // Since we want to observe a fixed number of `UpdateSlaveMessage`s, // register the agent with paused clock to ensure it does not @@ -3663,7 +3776,7 @@ TEST_F(StorageLocalResourceProviderTest, ContainerTerminationMetric) // This test verifies that operation status updates contain the // agent ID and resource provider ID of originating providers. -TEST_F(StorageLocalResourceProviderTest, OperationUpdate) +TEST_P(StorageLocalResourceProviderTest, OperationUpdate) { Clock::pause(); @@ -3826,7 +3939,7 @@ TEST_F(StorageLocalResourceProviderTest, OperationUpdate) // operations since we have no control over the completion of an operation. 
Once // we support out-of-band CSI plugins through domain sockets, we could test this // metric against a mock CSI plugin. -TEST_F(StorageLocalResourceProviderTest, OperationStateMetrics) +TEST_P(StorageLocalResourceProviderTest, OperationStateMetrics) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -4052,7 +4165,7 @@ TEST_F(StorageLocalResourceProviderTest, OperationStateMetrics) // `cancelled` metrics for RPCs since we have no control over the completion of // an operation. Once we support out-of-band CSI plugins through domain sockets, // we could test these metrics against a mock CSI plugin. -TEST_F(StorageLocalResourceProviderTest, CsiPluginRpcMetrics) +TEST_P(StorageLocalResourceProviderTest, CsiPluginRpcMetrics) { const string profilesPath = path::join(sandbox.get(), "profiles.json"); @@ -4218,7 +4331,7 @@ TEST_F(StorageLocalResourceProviderTest, CsiPluginRpcMetrics) // // TODO(greggomann): Test operations on agent default resources: for such // operations, the agent generates the dropped status. -TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation) +TEST_P(StorageLocalResourceProviderTest, ReconcileDroppedOperation) { Clock::pause(); @@ -4404,7 +4517,7 @@ TEST_F(StorageLocalResourceProviderTest, ReconcileDroppedOperation) // This test verifies that if an operation ID is specified, operation status // updates are resent to the scheduler until acknowledged. -TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdateToScheduler) +TEST_P(StorageLocalResourceProviderTest, RetryOperationStatusUpdateToScheduler) { Clock::pause(); @@ -4591,7 +4704,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryOperationStatusUpdateToScheduler) // This test ensures that the master responds with the latest state // for operations that are terminal at the master, but have not been // acknowledged by the framework. 
-TEST_F( +TEST_P( StorageLocalResourceProviderTest, ReconcileUnacknowledgedTerminalOperation) { @@ -4778,7 +4891,7 @@ TEST_F( // clock is advanced exponentially to trigger retries. // 6. Returns `UNIMPLEMENTED` for the next `DeleteVolume` call to verify that // there is no retry on a non-retryable error. -TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) +TEST_P(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) { Clock::pause(); @@ -4798,71 +4911,6 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) MockCSIPlugin plugin; ASSERT_SOME(plugin.startup(mockCsiEndpoint)); - // TODO(chhsiao): Since this test expects CSI v0 protobufs, we disable CSI v1 - // for now. Remove this once the expectations are parameterized. - EXPECT_CALL(plugin, Probe(_, _, A<csi::v1::ProbeResponse*>())) - .WillRepeatedly(Return(grpc::Status(grpc::UNIMPLEMENTED, ""))); - - EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>())) - .WillRepeatedly(Invoke([]( - grpc::ServerContext* context, - const csi::v0::GetCapacityRequest* request, - csi::v0::GetCapacityResponse* response) { - response->set_available_capacity(Gigabytes(4).bytes()); - - return grpc::Status::OK; - })); - - Queue<csi::v0::CreateVolumeRequest> createVolumeRequests; - Queue<Try<csi::v0::CreateVolumeResponse, StatusError>> createVolumeResults; - EXPECT_CALL(plugin, CreateVolume(_, _, A<csi::v0::CreateVolumeResponse*>())) - .WillRepeatedly(Invoke([&]( - grpc::ServerContext* context, - const csi::v0::CreateVolumeRequest* request, - csi::v0::CreateVolumeResponse* response) -> grpc::Status { - Future<Try<csi::v0::CreateVolumeResponse, StatusError>> result = - createVolumeResults.get(); - - EXPECT_TRUE(result.isPending()); - createVolumeRequests.put(*request); - - // This extra closure is necessary in order to use `AWAIT_ASSERT_*`, as - // these macros require a void return type. 
- [&] { AWAIT_ASSERT_READY(result); }(); - - if (result->isError()) { - return result->error().status; - } - - *response = result->get(); - return grpc::Status::OK; - })); - - Queue<csi::v0::DeleteVolumeRequest> deleteVolumeRequests; - Queue<Try<csi::v0::DeleteVolumeResponse, StatusError>> deleteVolumeResults; - EXPECT_CALL(plugin, DeleteVolume(_, _, A<csi::v0::DeleteVolumeResponse*>())) - .WillRepeatedly(Invoke([&]( - grpc::ServerContext* context, - const csi::v0::DeleteVolumeRequest* request, - csi::v0::DeleteVolumeResponse* response) -> grpc::Status { - Future<Try<csi::v0::DeleteVolumeResponse, StatusError>> result = - deleteVolumeResults.get(); - - EXPECT_TRUE(result.isPending()); - deleteVolumeRequests.put(*request); - - // This extra closure is necessary in order to use `AWAIT_ASSERT_*`, as - // these macros require a void return type. - [&] { AWAIT_ASSERT_READY(result); }(); - - if (result->isError()) { - return result->error().status; - } - - *response = result->get(); - return grpc::Status::OK; - })); - setupResourceProviderConfig(Bytes(0), None(), None(), mockCsiEndpoint); master::Flags masterFlags = CreateMasterFlags(); @@ -4935,9 +4983,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) AWAIT_READY(offers); ASSERT_EQ(1u, offers->size()); - Resource raw = *Resources(offers->at(0).resources()) - .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test")) - .begin(); + Offer offer = offers->at(0); // We expect that the following RPC calls are made during startup: `Probe`, // `GetPluginInfo` (2), `GetPluginCapabilities, `ControllerGetCapabilities`, @@ -4952,80 +4998,58 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) EXPECT_TRUE(metricEquals(metricName("csi_plugin/rpcs_failed"), 0)); // Create a MOUNT disk. 
- Future<UpdateOperationStatusMessage> updateOperationStatus = - FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); - - driver.acceptOffers( - {offers->at(0).id()}, - {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)}); - - AWAIT_READY(createVolumeRequests.get()) - << "Failed to wait for CreateVolumeRequest #1"; + Resource raw = *Resources(offer.resources()) + .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test")) + .begin(); - // Settle the clock to verify that there is no more outstanding request. - Clock::settle(); - ASSERT_EQ(0u, createVolumeRequests.size()); + EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( + std::bind(isMountDisk<Resource>, lambda::_1, "test")))) + .WillOnce(FutureArg<1>(&offers)); - Future<Nothing> createVolumeCall = FUTURE_DISPATCH( - _, &csi::v0::VolumeManagerProcess::__call<csi::v0::CreateVolumeResponse>); + Future<UpdateOperationStatusMessage> updateOperationStatus = + FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); // Return `DEADLINE_EXCEEDED` for the first `CreateVolume` call. - createVolumeResults.put( - StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, ""))); + Future<Nothing> createVolumeCall = futureCreateVolumeCall( + &plugin, StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, ""))); - AWAIT_READY(createVolumeCall); + driver.acceptOffers( + {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)}); - Duration createVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR; + AWAIT_READY(createVolumeCall) << "Failed to wait for CreateVolume call #1"; - // Settle the clock to ensure that the retry timer has been set, then advance - // the clock by the maximum backoff to trigger a retry. - Clock::settle(); - Clock::advance(createVolumeBackoff); + Duration createVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR; - // Return `UNAVAILABLE` for subsequent `CreateVolume` calls. 
for (size_t i = 1; i < numRetryableErrors; i++) { - AWAIT_READY(createVolumeRequests.get()) - << "Failed to wait for CreateVolumeRequest #" << (i + 1); + // Return `UNAVAILABLE` for subsequent `CreateVolume` calls. + createVolumeCall = futureCreateVolumeCall( + &plugin, StatusError(grpc::Status(grpc::UNAVAILABLE, ""))); - // Settle the clock to verify that there is no more outstanding request. + // Settle the clock to ensure that the retry timer has been set, then + // advance the clock by the maximum backoff to trigger a retry. Clock::settle(); - ASSERT_EQ(0u, createVolumeRequests.size()); - - createVolumeCall = FUTURE_DISPATCH( - _, - &csi::v0::VolumeManagerProcess::__call<csi::v0::CreateVolumeResponse>); - - createVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, ""))); + Clock::advance(createVolumeBackoff); - AWAIT_READY(createVolumeCall); + AWAIT_READY(createVolumeCall) + << "Failed to wait for CreateVolume call #" << (i + 1); createVolumeBackoff = std::min( createVolumeBackoff * 2, csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX); - - // Settle the clock to ensure that the retry timer has been set, then - // advance the clock by the maximum backoff to trigger a retry. - Clock::settle(); - Clock::advance(createVolumeBackoff); } - AWAIT_READY(createVolumeRequests.get()) - << "Failed to wait for CreateVolumeRequest #" << (numRetryableErrors + 1); + // Return a successful response for the last `CreateVolume` call. + createVolumeCall = futureCreateVolumeCall( + &plugin, + csi::VolumeInfo{Megabytes(raw.scalar().value()).bytes(), + id::UUID::random().toString()}); - // Settle the clock to verify that there is no more outstanding request. + // Settle the clock to ensure that the retry timer has been set, then + // advance the clock by the maximum backoff to trigger a retry. 
Clock::settle(); - ASSERT_EQ(0u, createVolumeRequests.size()); - - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - std::bind(isMountDisk<Resource>, lambda::_1, "test")))) - .WillOnce(FutureArg<1>(&offers)); - - // Return a successful response for the last `CreateVolume` call. - csi::v0::CreateVolumeResponse createVolumeResponse; - createVolumeResponse.mutable_volume()->set_id(id::UUID::random().toString()); - createVolumeResponse.mutable_volume()->set_capacity_bytes( - Megabytes(raw.scalar().value()).bytes()); + Clock::advance(createVolumeBackoff); - createVolumeResults.put(std::move(createVolumeResponse)); + AWAIT_READY(createVolumeCall) + << "Failed to wait for CreateVolume call #" << (numRetryableErrors + 1); AWAIT_READY(updateOperationStatus); EXPECT_EQ(OPERATION_FINISHED, updateOperationStatus->status().state()); @@ -5036,73 +5060,53 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) AWAIT_READY(offers); ASSERT_EQ(1u, offers->size()); - Resource created = *Resources(offers->at(0).resources()) + offer = offers->at(0); + + // Destroy the MOUNT disk. + Resource created = *Resources(offer.resources()) .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test")) .begin(); - // Destroy the MOUNT disk. updateOperationStatus = FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); - driver.acceptOffers({offers->at(0).id()}, {DESTROY_DISK(created)}); - - AWAIT_READY(deleteVolumeRequests.get()) - << "Failed to wait for DeleteVolumeRequest #1"; - - // Settle the clock to verify that there is no more outstanding request. - Clock::settle(); - ASSERT_EQ(0u, deleteVolumeRequests.size()); - - Future<Nothing> deleteVolumeCall = FUTURE_DISPATCH( - _, &csi::v0::VolumeManagerProcess::__call<csi::v0::DeleteVolumeResponse>); - // Return `DEADLINE_EXCEEDED` for the first `DeleteVolume` call. 
- deleteVolumeResults.put( - StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, ""))); + Future<Nothing> deleteVolumeCall = futureDeleteVolumeCall( + &plugin, StatusError(grpc::Status(grpc::DEADLINE_EXCEEDED, ""))); - AWAIT_READY(deleteVolumeCall); + driver.acceptOffers({offer.id()}, {DESTROY_DISK(created)}); - Duration deleteVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR; + AWAIT_READY(deleteVolumeCall) << "Failed to wait for DeleteVolume call #1"; - // Settle the clock to ensure that the retry timer has been set, then advance - // the clock by the maximum backoff to trigger a retry. - Clock::settle(); - Clock::advance(deleteVolumeBackoff); + Duration deleteVolumeBackoff = csi::v0::DEFAULT_CSI_RETRY_BACKOFF_FACTOR; - // Return `UNAVAILABLE` for subsequent `DeleteVolume` calls. for (size_t i = 1; i < numRetryableErrors; i++) { - AWAIT_READY(deleteVolumeRequests.get()) - << "Failed to wait for DeleteVolumeRequest #" << (i + 1); + // Return `UNAVAILABLE` for subsequent `DeleteVolume` calls. + deleteVolumeCall = futureDeleteVolumeCall( + &plugin, StatusError(grpc::Status(grpc::UNAVAILABLE, ""))); - // Settle the clock to verify that there is no more outstanding request. + // Settle the clock to ensure that the retry timer has been set, then + // advance the clock by the maximum backoff to trigger a retry.
Clock::settle(); - ASSERT_EQ(0u, deleteVolumeRequests.size()); - - deleteVolumeCall = FUTURE_DISPATCH( - _, - &csi::v0::VolumeManagerProcess::__call<csi::v0::DeleteVolumeResponse>); - - deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNAVAILABLE, ""))); + Clock::advance(deleteVolumeBackoff); - AWAIT_READY(deleteVolumeCall); + AWAIT_READY(deleteVolumeCall) + << "Failed to wait for DeleteVolume call #" << (i + 1); deleteVolumeBackoff = std::min( deleteVolumeBackoff * 2, csi::v0::DEFAULT_CSI_RETRY_INTERVAL_MAX); - - // Settle the clock to ensure that the retry timer has been set, then - // advance the clock by the maximum backoff to trigger a retry. - Clock::settle(); - Clock::advance(deleteVolumeBackoff); } - AWAIT_READY(deleteVolumeRequests.get()) - << "Failed to wait for DeleteVolumeRequest #" << (numRetryableErrors + 1); + // Return a non-retryable error for the last `DeleteVolume` call. + deleteVolumeCall = futureDeleteVolumeCall( + &plugin, StatusError(grpc::Status(grpc::UNIMPLEMENTED, ""))); - // Settle the clock to verify that there is no more outstanding request. + // Settle the clock to ensure that the retry timer has been set, then + // advance the clock by the maximum backoff to trigger a retry. Clock::settle(); - ASSERT_EQ(0u, deleteVolumeRequests.size()); + Clock::advance(deleteVolumeBackoff); - // Return a non-retryable error for the last `DeleteVolume` call. - deleteVolumeResults.put(StatusError(grpc::Status(grpc::UNIMPLEMENTED, ""))); + AWAIT_READY(deleteVolumeCall) + << "Failed to wait for DeleteVolume call #" << (numRetryableErrors + 1); AWAIT_READY(updateOperationStatus); EXPECT_EQ(OPERATION_FAILED, updateOperationStatus->status().state()); @@ -5126,7 +5130,7 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff) // (i.e. the master cannot assume the success of the operation) operation // status updates that originate from frameworks which have been torn down. // Non-speculative operations include CREATE_DISK. 
-TEST_F( +TEST_P( StorageLocalResourceProviderTest, FrameworkTeardownBeforeTerminalOperationStatusUpdate) { @@ -5297,7 +5301,7 @@ TEST_F( // the master fails over, but the operation's originating framework // never reregisters. Alternatively, and uncommonly, pending operations // on an agent migrated from one master to another will produce orphans. -TEST_F( +TEST_P( StorageLocalResourceProviderTest, TerminalOrphanOperationAfterMasterFailover) { @@ -5485,7 +5489,7 @@ TEST_F( // This test verifies that operators can reserve/unreserve and // create/destroy persistent volumes with resource provider resources. -TEST_F( +TEST_P( StorageLocalResourceProviderTest, OperatorOperationsWithResourceProviderResources) {
