This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit e7f4e6100c7987fc2c2b031b2ec5ba17737ef76e Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Tue Jan 29 15:48:36 2019 -0800 Updated SLRP test `ProfileDisappeared` to request operation feedback. This patch updates `StorageLocalResourceProviderTest.ProfileDisappeared` to use the v1 scheduler API to request operation feedback, so MESOS-9537 would be triggered when an outstanding `UPDATE_STATE` call from the resource provider races with an offer operation. Review: https://reviews.apache.org/r/69866 --- src/tests/mesos.hpp | 27 +- .../storage_local_resource_provider_tests.cpp | 309 ++++++++++++--------- 2 files changed, 204 insertions(+), 132 deletions(-) diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index b10ec0a..f3f1e64 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -1438,13 +1438,18 @@ inline typename TOffer::Operation CREATE_DISK( } -template <typename TResource, typename TOffer> -inline typename TOffer::Operation DESTROY_DISK(const TResource& source) +template <typename TResource, typename TOperationID, typename TOffer> +inline typename TOffer::Operation DESTROY_DISK( + const TResource& source, const Option<TOperationID>& operationId = None()) { typename TOffer::Operation operation; operation.set_type(TOffer::Operation::DESTROY_DISK); operation.mutable_destroy_disk()->mutable_source()->CopyFrom(source); + if (operationId.isSome()) { + operation.mutable_id()->CopyFrom(operationId.get()); + } + return operation; } @@ -1818,7 +1823,8 @@ inline Offer::Operation CREATE_DISK(Args&&... args) template <typename... Args> inline Offer::Operation DESTROY_DISK(Args&&... args) { - return common::DESTROY_DISK<Resource, Offer>(std::forward<Args>(args)...); + return common::DESTROY_DISK<Resource, OperationID, Offer>( + std::forward<Args>(args)...); } @@ -2125,8 +2131,10 @@ inline mesos::v1::Offer::Operation CREATE_DISK(Args&&... args) template <typename... Args> inline mesos::v1::Offer::Operation DESTROY_DISK(Args&&... 
args) { - return common::DESTROY_DISK<mesos::v1::Resource, mesos::v1::Offer>( - std::forward<Args>(args)...); + return common::DESTROY_DISK< + mesos::v1::Resource, + mesos::v1::OperationID, + mesos::v1::Offer>(std::forward<Args>(args)...); } @@ -2668,6 +2676,15 @@ MATCHER_P(OffersHaveAnyResource, filter, "") } +// This matcher is used to match the operation ID of an +// `Event.update_operation_status.status` message. +MATCHER_P(OperationStatusUpdateOperationIdEq, operationId, "") +{ + return arg.status().has_operation_id() && + arg.status().operation_id() == operationId; +} + + // Like LaunchTasks, but decline the entire offer and don't launch any tasks. ACTION(DeclineOffers) { diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp index fb001aa..a661951 100644 --- a/src/tests/storage_local_resource_provider_tests.cpp +++ b/src/tests/storage_local_resource_provider_tests.cpp @@ -15,6 +15,10 @@ // limitations under the License. #include <algorithm> +#include <list> +#include <memory> +#include <string> +#include <vector> #include <process/clock.hpp> #include <process/future.hpp> @@ -1104,18 +1108,29 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskRecovery) // This test verifies that a framework cannot create a volume during and after // the profile disappears, and destroying a volume with a stale profile will // recover the freed disk with another appeared profile. +// +// To accomplish this: +// 1. Create a 2GB MOUNT disk from a 4GB RAW disk of profile 'test1'. +// 2. Create another MOUNT disk from the rest RAW disk of profile 'test1'. +// 3. Remove profile 'test1' and add profile 'test2' before the second +// operation is applied. The operation would then be dropped, and the rest +// RAW disk would show up as of profile 'test2'. +// 4. Destroy the MOUNT disk of profile 'test1'. All 4GB RAW disk would show +// up as of profile 'test2'. 
TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared) { Clock::pause(); Future<shared_ptr<TestDiskProfileServer>> server = TestDiskProfileServer::create(); + AWAIT_READY(server); Promise<http::Response> updatedProfileMapping; EXPECT_CALL(*server.get()->process, profiles(_)) .WillOnce(Return(http::OK(createDiskProfileMapping({{"test1", None()}})))) - .WillOnce(Return(updatedProfileMapping.future())); + .WillOnce(Return(updatedProfileMapping.future())) + .WillRepeatedly(Return(Future<http::Response>())); // Stop subsequent polls. const Duration pollInterval = Seconds(10); loadUriDiskProfileAdaptorModule( @@ -1163,47 +1178,51 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared) Clock::pause(); - // Register a framework to receive offers. - FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO; + // Register a v1 framework to exercise operations with feedback. + v1::FrameworkInfo framework = v1::DEFAULT_FRAMEWORK_INFO; framework.set_roles(0, "storage"); - MockScheduler sched; - MesosSchedulerDriver driver( - &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL); + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - EXPECT_CALL(sched, registered(&driver, _, _)); + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(framework)); - // The framework is expected to see the following offers in sequence: - // 1. A 4GB RAW disk with profile 'test1' before the 1st `CREATE_DISK`. - // 2. A 2GB MOUNT disk and a 2GB RAW disk, both with profile 'test1', after - // the 1st `CREATE_DISK` finishes. - // 3. A 2GB MOUNT disk with profile 'test1' and a 2GB RAW disk with profile - // 'test2', after the profile mapping is updated and the 2nd - // `CREATE_DISK` fails due to a mismatched resource version. - // 4. A 4GB RAW disk with profile 'test2', after the `DESTROY_DISK`. 
- Future<vector<Offer>> rawDiskOffers; - Future<vector<Offer>> volumeCreatedOffers; - Future<vector<Offer>> profileDisappearedOffers; - Future<vector<Offer>> volumeDestroyedOffers; + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); - // We use the following filter to filter offers that do not have - // wanted resources for 365 days (the maximum). - Filters declineFilters; - declineFilters.set_refuse_seconds(Days(365).secs()); + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. - // Decline offers that contain only the agent's default resources. - EXPECT_CALL(sched, resourceOffers(&driver, _)) - .WillRepeatedly(DeclineOffers(declineFilters)); + // Decline unwanted offers. The master can send such offers before the + // resource provider receives profile updates. + EXPECT_CALL(*scheduler, offers(_, _)) + .WillRepeatedly(v1::scheduler::DeclineOffers()); - EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource( - &Resources::hasResourceProvider))) - .WillOnce(FutureArg<1>(&rawDiskOffers)) - .WillOnce(FutureArg<1>(&volumeCreatedOffers)) - .WillOnce(FutureArg<1>(&profileDisappearedOffers)) - .WillOnce(FutureArg<1>(&volumeDestroyedOffers)) - .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ auto isStoragePool = [](const v1::Resource& r, const string& profile) { + return r.has_disk() && + r.disk().has_source() && + r.disk().source().type() == v1::Resource::DiskInfo::Source::RAW && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + !r.disk().source().has_id() && + r.disk().source().has_profile() && + r.disk().source().profile() == profile; + }; - driver.start(); + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL( + *scheduler, + offers(_, v1::scheduler::OffersHaveAnyResource( + std::bind(isStoragePool, lambda::_1, "test1")))) + .WillOnce(FutureArg<1>(&offers)); + + v1::scheduler::TestMesos mesos( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(subscribed); + + const v1::FrameworkID& frameworkId = subscribed->framework_id(); // NOTE: If the framework has not declined an unwanted offer yet when the // resource provider reports its RAW resources, the new allocation triggered @@ -1213,58 +1232,77 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared) Clock::settle(); Clock::advance(masterFlags.allocation_interval); - AWAIT_READY(rawDiskOffers); - ASSERT_FALSE(rawDiskOffers->empty()); + AWAIT_READY(offers); + ASSERT_EQ(1, offers->offers_size()); - auto hasSourceTypeAndProfile = []( - const Resource& r, - const Resource::DiskInfo::Source::Type& type, - const string& profile) { - return r.has_disk() && - r.disk().has_source() && - r.disk().source().type() == type && - r.disk().source().has_profile() && - r.disk().source().profile() == profile; - }; + v1::Offer offer = offers->offers(0); - // We use the following filter so that the resources will not be - // filtered for 5 seconds (the default). - Filters acceptFilters; - acceptFilters.set_refuse_seconds(0); + EXPECT_CALL( + *scheduler, + offers(_, v1::scheduler::OffersHaveAnyResource( + std::bind(isStoragePool, lambda::_1, "test1")))) + .WillOnce(FutureArg<1>(&offers)); - // Create a volume with profile 'test1'. 
+ // Create a 2GB MOUNT disk of profile 'test1'. { - Resources raw = - Resources(rawDiskOffers->at(0).resources()).filter(std::bind( - hasSourceTypeAndProfile, - lambda::_1, - Resource::DiskInfo::Source::RAW, - "test1")); + v1::Resources raw = v1::Resources(offer.resources()) + .filter(std::bind(isStoragePool, lambda::_1, "test1")); ASSERT_SOME_EQ(Gigabytes(4), raw.disk()); // Just use 2GB of the storage pool. - Resource source = *raw.begin(); + v1::Resource source = *raw.begin(); source.mutable_scalar()->set_value( - (double) Gigabytes(2).bytes() / Bytes::MEGABYTES); - - Future<UpdateOperationStatusMessage> createVolumeStatus = - FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); - - driver.acceptOffers( - {rawDiskOffers->at(0).id()}, - {CREATE_DISK(source, Resource::DiskInfo::Source::MOUNT)}, - acceptFilters); - - AWAIT_READY(createVolumeStatus); - EXPECT_EQ(OPERATION_FINISHED, createVolumeStatus->status().state()); + static_cast<double>(Gigabytes(2).bytes()) / Bytes::MEGABYTES); + + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + Future<v1::scheduler::Event::UpdateOperationStatus> update; + EXPECT_CALL( + *scheduler, + updateOperationStatus( + _, v1::scheduler::OperationStatusUpdateOperationIdEq(operationId))) + .WillOnce(FutureArg<1>(&update)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + mesos.send(v1::createCallAccept( + frameworkId, + offer, + {v1::CREATE_DISK( + source, + v1::Resource::DiskInfo::Source::MOUNT, + None(), + operationId)})); + + AWAIT_READY(update); + EXPECT_EQ(v1::OPERATION_FINISHED, update->status().state()); } // Advance the clock to trigger another allocation. 
Clock::advance(masterFlags.allocation_interval); - AWAIT_READY(volumeCreatedOffers); - ASSERT_FALSE(volumeCreatedOffers->empty()); + AWAIT_READY(offers); + ASSERT_EQ(1, offers->offers_size()); + + offer = offers->offers(0); + + auto isMountDisk = [](const v1::Resource& r, const string& profile) { + return r.has_disk() && + r.disk().has_source() && + r.disk().source().type() == v1::Resource::DiskInfo::Source::MOUNT && + r.disk().source().has_vendor() && + r.disk().source().vendor() == TEST_CSI_VENDOR && + r.disk().source().has_id() && + r.disk().source().has_profile() && + r.disk().source().profile() == profile; + }; + + EXPECT_CALL( + *scheduler, + offers(_, v1::scheduler::OffersHaveAnyResource( + std::bind(isMountDisk, lambda::_1, "test1")))) + .WillOnce(FutureArg<1>(&offers)); // We drop the agent update (which is triggered by the changes in the known // set of profiles) to simulate the situation where the update races with @@ -1272,86 +1310,103 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared) Future<UpdateSlaveMessage> updateSlave3 = DROP_PROTOBUF(UpdateSlaveMessage(), _, _); - // Trigger another poll for profiles. Profile 'test1' will disappear and - // profile 'test2' will appear. - // - // NOTE: We advance the clock before updating the disk profile mapping so - // there will only be one poll. + // Remove profile 'test1' and add profile 'test2'. No allocation will be + // triggered since the framework is still holding the current offer. Clock::advance(pollInterval); - - // Update the disk profile mapping. updatedProfileMapping.set( http::OK(createDiskProfileMapping({{"test2", None()}}))); AWAIT_READY(updateSlave3); - // Try to create another volume with profile 'test1', which will be dropped - // due to a mismatched resource version. + // Create another MOUNT disk from the rest RAW disk of profile 'test1'. This + // operation would be dropped due to a mismatched resource version. 
{ - Resources raw = - Resources(volumeCreatedOffers->at(0).resources()).filter(std::bind( - hasSourceTypeAndProfile, - lambda::_1, - Resource::DiskInfo::Source::RAW, - "test1")); + v1::Resources raw = v1::Resources(offer.resources()) + .filter(std::bind(isStoragePool, lambda::_1, "test1")); ASSERT_SOME_EQ(Gigabytes(2), raw.disk()); - Future<UpdateOperationStatusMessage> createVolumeStatus = - FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); - - driver.acceptOffers( - {volumeCreatedOffers->at(0).id()}, - {CREATE_DISK(*raw.begin(), Resource::DiskInfo::Source::MOUNT)}, - acceptFilters); - - AWAIT_READY(createVolumeStatus); - EXPECT_EQ(OPERATION_DROPPED, createVolumeStatus->status().state()); + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); + + Future<v1::scheduler::Event::UpdateOperationStatus> update; + EXPECT_CALL( + *scheduler, + updateOperationStatus( + _, v1::scheduler::OperationStatusUpdateOperationIdEq(operationId))) + .WillOnce(FutureArg<1>(&update)) + .WillRepeatedly(Return()); // Ignore subsequent updates; + + mesos.send(v1::createCallAccept( + frameworkId, + offer, + {v1::CREATE_DISK( + *raw.begin(), + v1::Resource::DiskInfo::Source::MOUNT, + None(), + operationId)})); + + AWAIT_READY(update); + EXPECT_EQ(v1::OPERATION_DROPPED, update->status().state()); } // Forward the dropped agent update to trigger another allocation. post(slave.get()->pid, master.get()->pid, updateSlave3.get()); - AWAIT_READY(profileDisappearedOffers); - ASSERT_FALSE(profileDisappearedOffers->empty()); + AWAIT_READY(offers); + ASSERT_EQ(1, offers->offers_size()); - // Destroy the volume with profile 'test1', which will trigger an agent update - // to recover the freed disk with profile 'test2' and thus another allocation. 
+ offer = offers->offers(0); + + EXPECT_CALL( + *scheduler, + offers(_, v1::scheduler::OffersHaveAnyResource( + std::bind(isStoragePool, lambda::_1, "test2")))) + .WillOnce(FutureArg<1>(&offers)); + + // Destroy the MOUNT disk of profile 'test1'. The returned converted resources + // should be empty. { - Resources volumes = - Resources(profileDisappearedOffers->at(0).resources()).filter(std::bind( - hasSourceTypeAndProfile, - lambda::_1, - Resource::DiskInfo::Source::MOUNT, - "test1")); + v1::Resources created = v1::Resources(offer.resources()) + .filter(std::bind(isMountDisk, lambda::_1, "test1")); - ASSERT_SOME_EQ(Gigabytes(2), volumes.disk()); + ASSERT_SOME_EQ(Gigabytes(2), created.disk()); - Future<UpdateOperationStatusMessage> destroyVolumeStatus = - FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _); + v1::OperationID operationId; + operationId.set_value(id::UUID::random().toString()); - driver.acceptOffers( - {profileDisappearedOffers->at(0).id()}, - {DESTROY_DISK(*volumes.begin())}, - acceptFilters); + Future<v1::scheduler::Event::UpdateOperationStatus> update; + EXPECT_CALL( + *scheduler, + updateOperationStatus( + _, v1::scheduler::OperationStatusUpdateOperationIdEq(operationId))) + .WillOnce(FutureArg<1>(&update)) + .WillRepeatedly(Return()); // Ignore subsequent updates; + + mesos.send(v1::createCallAccept( + frameworkId, offer, {v1::DESTROY_DISK(*created.begin(), operationId)})); - AWAIT_READY(destroyVolumeStatus); - EXPECT_EQ(OPERATION_FINISHED, destroyVolumeStatus->status().state()); + AWAIT_READY(update); + EXPECT_EQ(v1::OPERATION_FINISHED, update->status().state()); + EXPECT_TRUE(update->status().converted_resources().empty()); } - AWAIT_READY(volumeDestroyedOffers); - ASSERT_FALSE(volumeDestroyedOffers->empty()); + // The resource provider will reconcile the storage pools to reclaim the + // space freed by destroying a MOUNT disk of a disappeared profile, which + // would in turn trigger another agent update and thus another allocation. 
+ // + // TODO(chhsiao): This might change once MESOS-9254 is done. + AWAIT_READY(offers); + ASSERT_EQ(1, offers->offers_size()); + + offer = offers->offers(0); - // Check that the freed disk has been recovered with profile 'test2'. + // Check that the freed disk shows up as of profile 'test2'. { - Resources storagePool = - Resources(volumeDestroyedOffers->at(0).resources()).filter(std::bind( - hasSourceTypeAndProfile, - lambda::_1, - Resource::DiskInfo::Source::RAW, - "test2")); - - EXPECT_SOME_EQ(Gigabytes(4), storagePool.disk()); + v1::Resources raw = v1::Resources(offer.resources()) + .filter(std::bind(isStoragePool, lambda::_1, "test2")); + + EXPECT_SOME_EQ(Gigabytes(4), raw.disk()); } }
