Repository: mesos Updated Branches: refs/heads/master c3c070094 -> cac8fdf01
Checkpointing `OfferOperation` in resource provider states. Instead of checkpointing `ApplyOfferOperation`, we now checkpoint `OfferOperations` in resource provider states such that we can keep track of completed operations as well. This patch also does some code cleanup, and modifies a unit test for storage local resource provider to issue operations in batches. Review: https://reviews.apache.org/r/64559/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eeb09cbc Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eeb09cbc Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eeb09cbc Branch: refs/heads/master Commit: eeb09cbc18827068fe56dc826e26d2e98fbcb494 Parents: c3c0700 Author: Chun-Hung Hsiao <[email protected]> Authored: Wed Dec 13 18:57:16 2017 -0800 Committer: Jie Yu <[email protected]> Committed: Wed Dec 13 18:57:16 2017 -0800 ---------------------------------------------------------------------- src/resource_provider/state.proto | 20 +- src/resource_provider/storage/provider.cpp | 361 ++++++++----------- .../storage_local_resource_provider_tests.cpp | 52 +-- 3 files changed, 172 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/eeb09cbc/src/resource_provider/state.proto ---------------------------------------------------------------------- diff --git a/src/resource_provider/state.proto b/src/resource_provider/state.proto index 321201e..ea759d8 100644 --- a/src/resource_provider/state.proto +++ b/src/resource_provider/state.proto @@ -20,9 +20,9 @@ import "mesos/mesos.proto"; import "mesos/resource_provider/resource_provider.proto"; -package mesos.resource_provider.state; +package mesos.resource_provider; -option java_package = "org.apache.mesos.resource_provider.state"; +option java_package = "org.apache.mesos.resource_provider"; option java_outer_classname = "Protos"; @@ -30,22 
+30,8 @@ message ResourceProviderState { // This includes only pending operations. Operations that have // unacknowledged statuses should be recovered through the status // update manager. - repeated Event.ApplyOfferOperation operations = 1; + repeated OfferOperation operations = 1; // The total resources provided by this resource provider. repeated Resource resources = 2; - - // Used to establish the relationship between the operation and - // the resources that the operation is operating on. Each resource - // provider will keep a resource version UUID, and change it when - // it believes that the resources from this resource provider are - // out of sync from the master's view. The master will keep track - // of the last known resource version UUID for each resource - // provider, and attach the resource version UUID in each - // operation it sends out. The resource provider should reject - // operations that have a different resource version UUID than - // that it maintains, because this means the operation is - // operating on resources that might have already been - // invalidated. - required bytes resource_version_uuid = 3; } http://git-wip-us.apache.org/repos/asf/mesos/blob/eeb09cbc/src/resource_provider/storage/provider.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 2fd4a3b..80bcea0 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -105,8 +105,7 @@ using mesos::internal::slave::ContainerDaemon; using mesos::resource_provider::Call; using mesos::resource_provider::Event; - -using mesos::resource_provider::state::ResourceProviderState; +using mesos::resource_provider::ResourceProviderState; using mesos::v1::resource_provider::Driver; @@ -148,7 +147,8 @@ static bool isValidType(const string& s) // Timeout for a CSI plugin component to create its endpoint socket. 
-static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Seconds(5); +// TODO(chhsiao): Make the timeout configurable. +static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1); // Returns a prefix for naming standalone containers to run CSI plugins @@ -292,7 +292,8 @@ public: contentType(ContentType::PROTOBUF), info(_info), slaveId(_slaveId), - authToken(_authToken) {} + authToken(_authToken), + resourceVersion(UUID::random()) {} StorageLocalResourceProviderProcess( const StorageLocalResourceProviderProcess& other) = delete; @@ -331,7 +332,7 @@ private: Future<Nothing> recoverVolumes(); Future<Nothing> recoverStatusUpdates(); void doReliableRegistration(); - Future<Nothing> reconcile(); + Future<Nothing> reconcileResourceProviderState(); // Functions for received events. void subscribed(const Event::Subscribed& subscribed); @@ -362,7 +363,8 @@ private: Future<Nothing> deleteVolume(const string& volumeId); // Applies the offer operation. Conventional operations will be - // synchronously applied. + // synchronously applied. Do nothing if the operation is already in a + // terminal state. Future<Nothing> _applyOfferOperation(const UUID& operationUuid); Future<vector<ResourceConversion>> applyCreateVolumeOrBlock( @@ -373,7 +375,7 @@ private: const Resource& resource); // Synchronously update `totalResources` and the offer operation status. - Try<Nothing> applyResourceConversions( + Try<Nothing> updateOfferOperationStatus( const UUID& operationUuid, const Try<vector<ResourceConversion>>& conversions); @@ -421,12 +423,15 @@ private: Option<csi::ControllerCapabilities> controllerCapabilities; Option<string> nodeId; - // NOTE: We store the list of pending operations in a `LinkedHashMap` - // to preserve the order we receive the operations. This is useful - // when we replay depending operations during recovery. 
- LinkedHashMap<UUID, Event::ApplyOfferOperation> pendingOperations; + // We maintain the following invariant: if one operation depends on + // another, they cannot be in PENDING state at the same time, i.e., + // the result of the preceding operation must have been reflected in + // the total resources. + // NOTE: We store the list of offer operations in a `LinkedHashMap` to + // preserve the order we receive the operations in case we need it. + LinkedHashMap<UUID, OfferOperation> offerOperations; Resources totalResources; - Option<UUID> resourceVersion; + UUID resourceVersion; hashmap<string, VolumeData> volumes; }; @@ -569,10 +574,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() .then(defer(self(), &Self::recoverVolumes)) .then(defer(self(), [=]() -> Future<Nothing> { // Recover the resource provider ID and state from the latest - // symlink. If the symlink cannot be resolved, this is a new - // resource provider, and `totalResources` and `resourceVersion` - // will be empty, which is fine since they will be set up during - // reconciliation. + // symlink. If the symlink does not exist, this is a new resource + // provider, and the total resources will be empty, which is fine + // since new resources will be added during reconciliation. 
Result<string> realpath = os::realpath( slave::paths::getLatestResourceProviderPath( metaDir, slaveId, info.type(), info.name())); @@ -600,39 +604,37 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover() } if (resourceProviderState.isSome()) { - foreach (const Event::ApplyOfferOperation& operation, + foreach (const OfferOperation& operation, resourceProviderState->operations()) { Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid()); CHECK_SOME(uuid); - pendingOperations[uuid.get()] = operation; + offerOperations[uuid.get()] = operation; } totalResources = resourceProviderState->resources(); - - Try<UUID> uuid = - UUID::fromBytes(resourceProviderState->resource_version_uuid()); - CHECK_SOME(uuid); - - resourceVersion = uuid.get(); } } // We replay all pending operations here, so that if a volume is - // actually created before the last failover, it will be reflected - // in the updated total resources before we do the reconciliation. - // NOTE: `_applyOfferOperation` will remove the applied operation - // from the list of pending operations, so we make a copy of keys - // here. - foreach (const UUID& uuid, pendingOperations.keys()) { + // actually created or deleted before the last failover, it will + // be reflected in the total resources before reconciliation. + foreachpair (const UUID& uuid, + const OfferOperation& operation, + offerOperations) { + if (protobuf::isTerminalState(operation.latest_status().state())) { + continue; + } + + auto err = [](const UUID& uuid, const string& message) { + LOG(ERROR) + << "Failed to apply offer operation with UUID " << uuid << ": " + << message; + }; + + _applyOfferOperation(uuid) - .onAny(defer(self(), [=](const Future<Nothing>& future) { - if (!future.isReady()) { - LOG(ERROR) - << "Failed to apply offer operation with UUID " << uuid << ": " - << (future.isFailed() ?
future.failure() : "future discarded"); - } - })); + .onFailed(std::bind(err, uuid, lambda::_1)) + .onDiscarded(std::bind(err, uuid, "future discarded")); } state = DISCONNECTED; @@ -901,7 +903,8 @@ void StorageLocalResourceProviderProcess::doReliableRegistration() } -Future<Nothing> StorageLocalResourceProviderProcess::reconcile() +Future<Nothing> +StorageLocalResourceProviderProcess::reconcileResourceProviderState() { return recoverStatusUpdates() .then(defer(self(), &Self::importResources)) @@ -970,9 +973,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcile() LOG(INFO) << "Adding new resource '" << resource << "'"; } - if (resourceVersion.isNone() || result != totalResources) { + if (result != totalResources) { totalResources = result; - resourceVersion = UUID::random(); checkpointResourceProviderState(); } @@ -1007,11 +1009,11 @@ void StorageLocalResourceProviderProcess::subscribed( } const string message = - "Failed to update state for resource provider " + stringify(info.id()); + "Failed to reconcile resource provider " + stringify(info.id()); // Reconcile resources after obtaining the resource provider ID. // TODO(chhsiao): Do the reconciliation early. - reconcile() + reconcileResourceProviderState() .onFailed(defer(self(), &Self::fatal, message, lambda::_1)) .onDiscarded(defer(self(), &Self::fatal, message, "future discarded")); } @@ -1020,32 +1022,55 @@ void StorageLocalResourceProviderProcess::subscribed( void StorageLocalResourceProviderProcess::applyOfferOperation( const Event::ApplyOfferOperation& operation) { - Future<Resources> converted; - - if (state == SUBSCRIBED) { - // TODO(chhsiao): Reject this operation. - return; - } - - CHECK_EQ(READY, state); - - LOG(INFO) << "Received " << operation.info().type() << " operation"; + // NOTE: If we receive an offer operation in SUBSCRIBED state, there + // must be a resource version mismatch since the current resource + // version is not reported yet. 
+ CHECK(state == SUBSCRIBED || state == READY); Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid()); CHECK_SOME(uuid); - CHECK(!pendingOperations.contains(uuid.get())); - pendingOperations[uuid.get()] = operation; + LOG(INFO) + << "Received " << operation.info().type() << " operation with UUID " + << uuid.get(); + + CHECK(!offerOperations.contains(uuid.get())); + offerOperations[uuid.get()] = protobuf::createOfferOperation( + operation.info(), + protobuf::createOfferOperationStatus( + OFFER_OPERATION_PENDING, + operation.info().has_id() + ? operation.info().id() : Option<OfferOperationID>::none()), + operation.has_framework_id() + ? operation.framework_id() : Option<FrameworkID>::none(), + slaveId, + uuid.get()); + checkpointResourceProviderState(); - _applyOfferOperation(uuid.get()) - .onAny(defer(self(), [=](const Future<Nothing>& future) { - if (!future.isReady()) { - LOG(ERROR) - << "Failed to apply " << operation.info().type() << " operation: " - << (future.isFailed() ? future.failure() : "future discarded"); - } - })); + Future<Nothing> result; + + Try<UUID> operationVersion = + UUID::fromBytes(operation.resource_version_uuid()); + CHECK_SOME(operationVersion); + + if (operationVersion.get() != resourceVersion) { + result = updateOfferOperationStatus(uuid.get(), Error( + "Mismatched resource version " + stringify(operationVersion.get()) + + " (expected: " + stringify(resourceVersion) + ")")); + } else { + result = _applyOfferOperation(uuid.get()); + } + + auto err = [](const UUID& uuid, const string& message) { + LOG(ERROR) + << "Failed to apply offer operation with UUID " << uuid << ": " + << message; + }; + + result + .onFailed(std::bind(err, uuid.get(), lambda::_1)) + .onDiscarded(std::bind(err, uuid.get(), "future discarded")); } @@ -2074,81 +2099,59 @@ Future<Nothing> StorageLocalResourceProviderProcess::deleteVolume( Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation( const UUID& operationUuid) { - 
Future<vector<ResourceConversion>> conversions; - Option<Error> error; + CHECK(offerOperations.contains(operationUuid)); + const OfferOperation& operation = offerOperations.at(operationUuid); - CHECK(pendingOperations.contains(operationUuid)); - const Event::ApplyOfferOperation& operation = - pendingOperations.at(operationUuid); - - Try<UUID> operationVersion = - UUID::fromBytes(operation.resource_version_uuid()); - CHECK_SOME(operationVersion); + CHECK(!protobuf::isTerminalState(operation.latest_status().state())); - if (resourceVersion.get() != operationVersion.get()) { - error = Error( - "Mismatched resource version " + stringify(operationVersion.get()) + - " (expected: " + stringify(resourceVersion.get()) + ")"); - } + Future<vector<ResourceConversion>> conversions; switch (operation.info().type()) { case Offer::Operation::RESERVE: case Offer::Operation::UNRESERVE: case Offer::Operation::CREATE: case Offer::Operation::DESTROY: { - // Synchronously apply the conventional operations. - return applyResourceConversions( + // Synchronously apply the conventional operations to ensure that + // its result is reflected in the total resources before any of + // its succeeding operations is applied. + return updateOfferOperationStatus( operationUuid, - error.isNone() - ? 
getResourceConversions(operation.info()) - : Try<vector<ResourceConversion>>::error(error.get())); + getResourceConversions(operation.info())); } case Offer::Operation::CREATE_VOLUME: { CHECK(operation.info().has_create_volume()); - if (error.isNone()) { - conversions = applyCreateVolumeOrBlock( - operation.info().create_volume().source(), - operationUuid, - operation.info().create_volume().target_type()); - } else { - conversions = Failure(error.get()); - } + conversions = applyCreateVolumeOrBlock( + operation.info().create_volume().source(), + operationUuid, + operation.info().create_volume().target_type()); + break; } case Offer::Operation::DESTROY_VOLUME: { CHECK(operation.info().has_destroy_volume()); - if (error.isNone()) { - conversions = applyDestroyVolumeOrBlock( - operation.info().destroy_volume().volume()); - } else { - conversions = Failure(error.get()); - } + conversions = applyDestroyVolumeOrBlock( + operation.info().destroy_volume().volume()); + break; } case Offer::Operation::CREATE_BLOCK: { CHECK(operation.info().has_create_block()); - if (error.isNone()) { - conversions = applyCreateVolumeOrBlock( - operation.info().create_block().source(), - operationUuid, - Resource::DiskInfo::Source::BLOCK); - } else { - conversions = Failure(error.get()); - } + conversions = applyCreateVolumeOrBlock( + operation.info().create_block().source(), + operationUuid, + Resource::DiskInfo::Source::BLOCK); + break; } case Offer::Operation::DESTROY_BLOCK: { CHECK(operation.info().has_destroy_block()); - if (error.isNone()) { - conversions = applyDestroyVolumeOrBlock( - operation.info().destroy_block().block()); - } else { - conversions = Failure(error.get()); - } + conversions = applyDestroyVolumeOrBlock( + operation.info().destroy_block().block()); + break; } case Offer::Operation::UNKNOWN: @@ -2163,27 +2166,23 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation( conversions .onAny(defer(self(), [=](const Future<vector<ResourceConversion>>& 
future) { - Option<Error> error; - if (!future.isReady()) { - error = - Error(future.isFailed() ? future.failure() : "future discarded"); - } + Try<vector<ResourceConversion>> conversions = future.isReady() + ? Try<vector<ResourceConversion>>::some(future.get()) + : Error(future.isFailed() ? future.failure() : "future discarded"); - if (future.isReady()) { + if (conversions.isSome()) { LOG(INFO) - << "Applying conversion from '" << future->at(0).consumed << "' to '" - << future->at(0).converted << "'"; + << "Applying conversion from '" << conversions->at(0).consumed + << "' to '" << conversions->at(0).converted + << "' for offer operation with UUID " << operationUuid; } else { LOG(ERROR) - << "Failed to apply " << operation.info().type() << " operation: " - << error->message; + << "Failed to apply offer operation with UUID " << operationUuid + << ": " << conversions.error(); } - promise->associate(applyResourceConversions( - operationUuid, - error.isNone() - ? future.get() - : Try<vector<ResourceConversion>>::error(error.get()))); + promise->associate( + updateOfferOperationStatus(operationUuid, conversions)); })); return promise->future(); @@ -2341,21 +2340,22 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock( } -Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions( +Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus( const UUID& operationUuid, const Try<vector<ResourceConversion>>& conversions) { Option<Error> error; + Resources convertedResources; - CHECK(pendingOperations.contains(operationUuid)); - const Event::ApplyOfferOperation& operation = - pendingOperations.at(operationUuid); + CHECK(offerOperations.contains(operationUuid)); + OfferOperation& operation = offerOperations.at(operationUuid); if (conversions.isSome()) { // Strip away the allocation info when applying the convertion to // the total resources. 
vector<ResourceConversion> _conversions; foreach (ResourceConversion conversion, conversions.get()) { + convertedResources += conversion.converted; conversion.consumed.unallocate(); conversion.converted.unallocate(); _conversions.push_back(std::move(conversion)); @@ -2371,27 +2371,16 @@ Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions( error = conversions.error(); } - // We first ask the status update manager to checkpoint the operation - // status, then checkpoint the resource provider state. - // TODO(chhsiao): Use the status update manager. - Call call; - call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS); - call.mutable_resource_provider_id()->CopyFrom(info.id()); - - Call::UpdateOfferOperationStatus* update = - call.mutable_update_offer_operation_status(); - update->set_operation_uuid(operation.operation_uuid()); - - if (operation.has_framework_id()) { - update->mutable_framework_id()->CopyFrom(operation.framework_id()); - } + operation.mutable_latest_status()->CopyFrom( + protobuf::createOfferOperationStatus( + error.isNone() ? OFFER_OPERATION_FINISHED : OFFER_OPERATION_FAILED, + operation.info().has_id() + ? operation.info().id() : Option<OfferOperationID>::none(), + error.isNone() ? Option<string>::none() : error->message, + error.isNone() ? convertedResources : Option<Resources>::none(), + UUID::random())); - OfferOperationStatus* status = update->mutable_status(); - status->set_status_uuid(UUID::random().toBytes()); - - if (operation.info().has_id()) { - status->mutable_operation_id()->CopyFrom(operation.info().id()); - } + operation.add_statuses()->CopyFrom(operation.latest_status()); if (error.isSome()) { // We only update the resource version for failed conventional @@ -2405,39 +2394,24 @@ Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions( // Send an `UPDATE_STATE` after we finish the current operation. 
dispatch(self(), &Self::sendResourceProviderStateUpdate); } - - status->set_state(OFFER_OPERATION_FAILED); - status->set_message(error->message); - } else { - status->set_state(OFFER_OPERATION_FINISHED); - - foreach (const ResourceConversion& conversion, conversions.get()) { - foreach (const Resource& resource, conversion.converted) { - status->add_converted_resources()->CopyFrom(resource); - } - } } - update->mutable_latest_status()->CopyFrom(*status); - - auto err = [](const UUID& operationUuid, const string& message) { - LOG(ERROR) - << "Failed to send status update for offer operation with UUID " - << operationUuid << ": " << message; - }; - - driver->send(evolve(call)) - .onFailed(std::bind(err, operationUuid, lambda::_1)) - .onDiscarded(std::bind(err, operationUuid, "future discarded")); - - pendingOperations.erase(operationUuid); checkpointResourceProviderState(); - if (error.isSome()) { - return error.get(); - } - - return Nothing(); + // Send out the status update for the offer operation. + // TODO(chhsiao): Use the status update manager. + OfferOperationStatusUpdate update = + protobuf::createOfferOperationStatusUpdate( + operationUuid, + operation.latest_status(), + operation.latest_status(), + operation.has_framework_id() + ? operation.framework_id() : Option<FrameworkID>::none(), + slaveId); + + sendOfferOperationStatusUpdate(update); + + return error.isNone() ? 
Nothing() : Try<Nothing>::error(error.get()); } @@ -2445,17 +2419,12 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState() { ResourceProviderState state; - foreachvalue ( - const Event::ApplyOfferOperation& operation, - pendingOperations) { + foreachvalue (const OfferOperation& operation, offerOperations) { state.add_operations()->CopyFrom(operation); } state.mutable_resources()->CopyFrom(totalResources); - CHECK_SOME(resourceVersion); - state.set_resource_version_uuid(resourceVersion->toBytes()); - const string statePath = slave::paths::getResourceProviderStatePath( metaDir, slaveId, info.type(), info.name(), info.id()); @@ -2491,30 +2460,12 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate() call.mutable_resource_provider_id()->CopyFrom(info.id()); Call::UpdateState* update = call.mutable_update_state(); - - foreachpair (const UUID& uuid, - const Event::ApplyOfferOperation& operation, - pendingOperations) { - // TODO(chhsiao): Maintain a list of terminated but unacknowledged - // offer operations in memory and reconstruct it during recovery - // by querying the status update manager. - update->add_operations()->CopyFrom( - protobuf::createOfferOperation( - operation.info(), - protobuf::createOfferOperationStatus( - OFFER_OPERATION_PENDING, - operation.info().has_id() - ? operation.info().id() : Option<OfferOperationID>::none()), - operation.has_framework_id() - ? 
operation.framework_id() : Option<FrameworkID>::none(), - slaveId, - uuid)); - } - update->mutable_resources()->CopyFrom(totalResources); + update->set_resource_version_uuid(resourceVersion.toBytes()); - CHECK_SOME(resourceVersion); - update->set_resource_version_uuid(resourceVersion->toBytes()); + foreachvalue (const OfferOperation& operation, offerOperations) { + update->add_operations()->CopyFrom(operation); + } auto err = [](const ResourceProviderID& id, const string& message) { LOG(ERROR) http://git-wip-us.apache.org/repos/asf/mesos/blob/eeb09cbc/src/tests/storage_local_resource_provider_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index e5868bf..f01d533 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -344,17 +344,14 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchAndDestroyVolume) // The framework is expected to see the following offers in sequence: // 1. One containing a RAW disk resource before `CREATE_VOLUME`. // 2. One containing a MOUNT disk resource after `CREATE_VOLUME`. - // 3. One containing a persistent volume after `CREATE`. - // 4. One containing the same persistent volume after `LAUNCH`. - // 5. One containing the same MOUNT disk resource after `DESTROY`. - // 6. One containing the same RAW disk resource after `DESTROY_VOLUME`. + // 3. One containing the same MOUNT disk resource after `CREATE`, + // `LAUNCH` and `DESTROY`. + // 4. One containing the same RAW disk resource after `DESTROY_VOLUME`. // // We set up the expectations for these offers as the test progresses.
Future<vector<Offer>> rawDiskOffers; Future<vector<Offer>> volumeCreatedOffers; - Future<vector<Offer>> persistenceCreatedOffers; Future<vector<Offer>> taskFinishedOffers; - Future<vector<Offer>> persistenceDestroyedOffers; Future<vector<Offer>> volumeDestroyedOffers; Sequence offers; @@ -441,7 +438,8 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchAndDestroyVolume) // Put a file into the volume. ASSERT_SOME(os::touch(path::join(volumePath.get(), "file"))); - // Create a persistent volume on the CSI volume. + // Create a persistent volume on the CSI volume, then launch a task to + // use the persistent volume. Resource persistentVolume = volume.get(); persistentVolume.mutable_disk()->mutable_persistence() ->set_id(UUID::random().toString()); @@ -451,20 +449,6 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchAndDestroyVolume) ->set_container_path("volume"); persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW); - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource( - persistentVolume))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&persistenceCreatedOffers)); - - driver.acceptOffers( - {volumeCreatedOffers->at(0).id()}, - {CREATE(persistentVolume)}, - filters); - - AWAIT_READY(persistenceCreatedOffers); - ASSERT_FALSE(persistenceCreatedOffers->empty()); - - // Launch a task to use the persistent volume. 
Future<TaskStatus> taskStarting; Future<TaskStatus> taskRunning; Future<TaskStatus> taskFinished; @@ -480,9 +464,10 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchAndDestroyVolume) .WillOnce(FutureArg<1>(&taskFinishedOffers)); driver.acceptOffers( - {persistenceCreatedOffers->at(0).id()}, - {LAUNCH({createTask( - persistenceCreatedOffers->at(0).slave_id(), + {volumeCreatedOffers->at(0).id()}, + {CREATE(persistentVolume), + LAUNCH({createTask( + volumeCreatedOffers->at(0).slave_id(), persistentVolume, createCommandInfo("test -f " + path::join("volume", "file")))})}, filters); @@ -498,26 +483,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchAndDestroyVolume) AWAIT_READY(taskFinishedOffers); - // Destroy the persistent volume on the CSI volume. - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(volume.get()))) - .InSequence(offers) - .WillOnce(FutureArg<1>(&persistenceDestroyedOffers)); - - driver.acceptOffers( - {taskFinishedOffers->at(0).id()}, - {DESTROY(persistentVolume)}, - filters); - - AWAIT_READY(persistenceDestroyedOffers); - - // Destroy the created volume. + // Destroy the persistent volume and the CSI volume. EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get()))) .InSequence(offers) .WillOnce(FutureArg<1>(&volumeDestroyedOffers)); driver.acceptOffers( - {persistenceDestroyedOffers->at(0).id()}, - {DESTROY_VOLUME(volume.get())}, + {taskFinishedOffers->at(0).id()}, + {DESTROY(persistentVolume), + DESTROY_VOLUME(volume.get())}, filters); AWAIT_READY(volumeDestroyedOffers);
