This is an automated email from the ASF dual-hosted git repository. mzhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit d8b53ede93446ac681683125fb0b6d49cdfa6f71 Author: Andrei Sekretenko <[email protected]> AuthorDate: Fri Oct 4 18:14:24 2019 -0400 Added `isAllocated` flag to the `Allocator::recoverResources()` method. This patch extends the signature of `recoverResources()` with a flag indicating whether the resources being recovered were actually allocated to a framework, or only offered. This is a prerequisite for tracking quota consumption in the allocator. Review: https://reviews.apache.org/r/71486/ --- include/mesos/allocator/allocator.hpp | 13 ++- src/master/allocator/mesos/allocator.hpp | 12 +- src/master/allocator/mesos/hierarchical.cpp | 6 +- src/master/allocator/mesos/hierarchical.hpp | 3 +- src/master/master.cpp | 50 +++++--- src/tests/allocator.hpp | 13 ++- src/tests/hierarchical_allocator_benchmarks.cpp | 3 +- src/tests/hierarchical_allocator_tests.cpp | 145 ++++++++++++++---------- src/tests/master_allocator_tests.cpp | 49 ++++---- src/tests/slave_recovery_tests.cpp | 4 +- 10 files changed, 181 insertions(+), 117 deletions(-) diff --git a/include/mesos/allocator/allocator.hpp b/include/mesos/allocator/allocator.hpp index 2bab53a..3c5fd55 100644 --- a/include/mesos/allocator/allocator.hpp +++ b/include/mesos/allocator/allocator.hpp @@ -376,10 +376,12 @@ public: * Recovers resources. * * Used to update the set of available resources for a specific agent. This - * method is invoked to inform the allocator about allocated resources that - * have been refused or are no longer in use. Allocated resources will have - * an `allocation_info.role` assigned and callers are expected to only call - * this with resources allocated to a single role. + * method is invoked to inform the allocator about offered resources that + * have been refused or allocated (i.e. used for launching tasks) resources + * that are no longer in use. 
The resources will have an + * `allocation_info.role` assigned and callers are expected to only call this + * with resources allocated to a single role. + * * * TODO(bmahler): We could allow resources allocated to multiple roles * within a single call here, but filtering them in the same way does @@ -389,7 +391,8 @@ public: const FrameworkID& frameworkId, const SlaveID& slaveId, const Resources& resources, - const Option<Filters>& filters) = 0; + const Option<Filters>& filters, + bool isAllocated) = 0; /** * Suppresses offers. diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp index 6921581..9467912 100644 --- a/src/master/allocator/mesos/allocator.hpp +++ b/src/master/allocator/mesos/allocator.hpp @@ -148,7 +148,8 @@ public: const FrameworkID& frameworkId, const SlaveID& slaveId, const Resources& resources, - const Option<Filters>& filters) override; + const Option<Filters>& filters, + bool isAllocated) override; void suppressOffers( const FrameworkID& frameworkId, @@ -292,7 +293,8 @@ public: const FrameworkID& frameworkId, const SlaveID& slaveId, const Resources& resources, - const Option<Filters>& filters) = 0; + const Option<Filters>& filters, + bool isAllocated) = 0; virtual void suppressOffers( const FrameworkID& frameworkId, @@ -632,7 +634,8 @@ inline void MesosAllocator<AllocatorProcess>::recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, const Resources& resources, - const Option<Filters>& filters) + const Option<Filters>& filters, + bool isAllocated) { process::dispatch( process, @@ -640,7 +643,8 @@ inline void MesosAllocator<AllocatorProcess>::recoverResources( frameworkId, slaveId, resources, - filters); + filters, + isAllocated); } diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp index 4728154..7a2da4e 100644 --- a/src/master/allocator/mesos/hierarchical.cpp +++ b/src/master/allocator/mesos/hierarchical.cpp @@ -1462,7 +1462,8 @@ 
void HierarchicalAllocatorProcess::recoverResources( const FrameworkID& frameworkId, const SlaveID& slaveId, const Resources& resources, - const Option<Filters>& filters) + const Option<Filters>& filters, + bool isAllocated) { CHECK(initialized); @@ -1470,6 +1471,9 @@ void HierarchicalAllocatorProcess::recoverResources( return; } + // TODO(asekretenko): untrack allocated resources in the roles tree + // if recovering actually used resources (isAllocated==true) + Option<Framework*> framework = getFramework(frameworkId); Option<Slave*> slave = getSlave(slaveId); diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp index d42124f..0dcfc87 100644 --- a/src/master/allocator/mesos/hierarchical.hpp +++ b/src/master/allocator/mesos/hierarchical.hpp @@ -584,7 +584,8 @@ public: const FrameworkID& frameworkId, const SlaveID& slaveId, const Resources& resources, - const Option<Filters>& filters) override; + const Option<Filters>& filters, + bool isAllocated) override; void suppressOffers( const FrameworkID& frameworkId, diff --git a/src/master/master.cpp b/src/master/master.cpp index 1414317..6c7ae4b 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -6133,13 +6133,13 @@ void Master::_accept( // resources should not be implicitly declined. if (!speculativelyConverted.empty()) { allocator->recoverResources( - frameworkId, slaveId, speculativelyConverted, None()); + frameworkId, slaveId, speculativelyConverted, None(), false); } // Tell the allocator about the implicitly declined resources. 
if (!implicitlyDeclined.empty()) { allocator->recoverResources( - frameworkId, slaveId, implicitlyDeclined, accept.filters()); + frameworkId, slaveId, implicitlyDeclined, accept.filters(), false); } allocator->resume(); @@ -10202,7 +10202,8 @@ void Master::offer( foreachpair (const SlaveID& slaveId, const Resources& offered, resources.at(role)) { - allocator->recoverResources(frameworkId, slaveId, offered, None()); + allocator->recoverResources( + frameworkId, slaveId, offered, None(), false); } } return; @@ -10234,7 +10235,9 @@ void Master::offer( << "Master returning resources offered to framework " << *framework << " because agent " << slaveId << " is not valid"; - allocator->recoverResources(frameworkId, slaveId, offered, None()); + allocator->recoverResources( + frameworkId, slaveId, offered, None(), false); + continue; } @@ -10245,7 +10248,9 @@ void Master::offer( << "Master returning resources offered because agent " << *slave << " is " << (slave->connected ? "deactivated" : "disconnected"); - allocator->recoverResources(frameworkId, slaveId, offered, None()); + allocator->recoverResources( + frameworkId, slaveId, offered, None(), false); + continue; } @@ -10269,7 +10274,8 @@ void Master::offer( // Pass a default filter to avoid getting this same offer immediately // from the allocator. Note that a default-constructed `Filters` // object has its `refuse_seconds` offer filter set to 5 seconds. - allocator->recoverResources(frameworkId, slaveId, offered, Filters()); + allocator->recoverResources( + frameworkId, slaveId, offered, Filters(), false); continue; } } @@ -12012,7 +12018,8 @@ void Master::updateTask(Task* task, const StatusUpdate& update) task->framework_id(), task->slave_id(), task->resources(), - None()); + None(), + true); // The slave owns the Task object and cannot be nullptr. 
Slave* slave = slaves.registered.get(task->slave_id()); @@ -12094,7 +12101,8 @@ void Master::removeTask(Task* task, bool unreachable) task->framework_id(), task->slave_id(), resources, - None()); + None(), + true); } else { // Note that we use `Resources` for output as it's faster than // logging raw protobuf data. @@ -12142,7 +12150,7 @@ void Master::removeExecutor( << "' with resources " << resources << " of framework " << frameworkId << " on agent " << *slave; - allocator->recoverResources(frameworkId, slave->id, resources, None()); + allocator->recoverResources(frameworkId, slave->id, resources, None(), true); Framework* framework = getFramework(frameworkId); if (framework != nullptr) { // The framework might not be reregistered yet. @@ -12344,7 +12352,8 @@ void Master::updateOperation( operation->framework_id(), operation->slave_id(), converted, - None()); + None(), + false); Resources consumedUnallocated = consumed.get(); consumedUnallocated.unallocate(); @@ -12359,7 +12368,8 @@ void Master::updateOperation( operation->framework_id(), operation->slave_id(), consumed.get(), - None()); + None(), + false); } break; @@ -12374,7 +12384,8 @@ void Master::updateOperation( operation->framework_id(), operation->slave_id(), consumed.get(), - None()); + None(), + false); break; } @@ -12454,7 +12465,8 @@ void Master::removeOperation(Operation* operation) operation->framework_id(), operation->slave_id(), consumed.get(), - None()); + None(), + false); } delete operation; @@ -12627,7 +12639,11 @@ void Master::rescindOffer(Offer* offer, const Option<Filters>& filters) framework->send(message); allocator->recoverResources( - offer->framework_id(), offer->slave_id(), offer->resources(), filters); + offer->framework_id(), + offer->slave_id(), + offer->resources(), + filters, + false); _removeOffer(framework, offer); } @@ -12641,7 +12657,11 @@ void Master::discardOffer(Offer* offer, const Option<Filters>& filters) << " in the offer " << offer->id(); allocator->recoverResources( 
- offer->framework_id(), offer->slave_id(), offer->resources(), filters); + offer->framework_id(), + offer->slave_id(), + offer->resources(), + filters, + false); _removeOffer(framework, offer); } diff --git a/src/tests/allocator.hpp b/src/tests/allocator.hpp index 01e6d2c..a1f6737 100644 --- a/src/tests/allocator.hpp +++ b/src/tests/allocator.hpp @@ -180,7 +180,7 @@ ACTION_P(InvokeGetInverseOfferStatuses, allocator) ACTION_P(InvokeRecoverResources, allocator) { - allocator->real->recoverResources(arg0, arg1, arg2, arg3); + allocator->real->recoverResources(arg0, arg1, arg2, arg3, arg4); } @@ -189,7 +189,7 @@ ACTION_P2(InvokeRecoverResourcesWithFilters, allocator, timeout) Filters filters; filters.set_refuse_seconds(timeout); - allocator->real->recoverResources(arg0, arg1, arg2, filters); + allocator->real->recoverResources(arg0, arg1, arg2, filters, false); } @@ -356,9 +356,9 @@ public: EXPECT_CALL(*this, getInverseOfferStatuses()) .WillRepeatedly(DoDefault()); - ON_CALL(*this, recoverResources(_, _, _, _)) + ON_CALL(*this, recoverResources(_, _, _, _, _)) .WillByDefault(InvokeRecoverResources(this)); - EXPECT_CALL(*this, recoverResources(_, _, _, _)) + EXPECT_CALL(*this, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoDefault()); ON_CALL(*this, suppressOffers(_, _)) @@ -489,11 +489,12 @@ public: FrameworkID, mesos::allocator::InverseOfferStatus>>>()); - MOCK_METHOD4(recoverResources, void( + MOCK_METHOD5(recoverResources, void( const FrameworkID&, const SlaveID&, const Resources&, - const Option<Filters>& filters)); + const Option<Filters>& filters, + bool isAllocated)); MOCK_METHOD2(suppressOffers, void( const FrameworkID&, diff --git a/src/tests/hierarchical_allocator_benchmarks.cpp b/src/tests/hierarchical_allocator_benchmarks.cpp index ede1169..e2362df 100644 --- a/src/tests/hierarchical_allocator_benchmarks.cpp +++ b/src/tests/hierarchical_allocator_benchmarks.cpp @@ -444,7 +444,8 @@ TEST_F(BENCHMARK_HierarchicalAllocations, MultiFrameworkAllocations) 
frameworkId, offer.slaveId, remainingResources, - None()); + None(), + false); offer_ = offers.get(); } diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp index 56c8caf..38fd19c 100644 --- a/src/tests/hierarchical_allocator_tests.cpp +++ b/src/tests/hierarchical_allocator_tests.cpp @@ -861,7 +861,8 @@ TEST_F(HierarchicalAllocatorTest, OfferFilter) framework.id(), agent.id(), allocation->resources.at(ROLE).at(agent.id()), - offerFilter); + offerFilter, + false); // Ensure the offer filter timeout is set before advancing the clock. Clock::settle(); @@ -987,7 +988,8 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout) framework2.id(), agent2.id(), allocation->resources.at(ROLE).at(agent2.id()), - offerFilter); + offerFilter, + false); // Total cluster resources (2 agents): cpus=2, mem=1024. // ROLE1 share = 0.5 (cpus=1, mem=512) @@ -1028,7 +1030,8 @@ TEST_F(HierarchicalAllocatorTest, SmallOfferFilterTimeout) framework1.id(), agent2.id(), allocation->resources.at(ROLE).at(agent2.id()), - None()); + None(), + false); // Total cluster resources (2 agents): cpus=2, mem=1024. // ROLE1 share = 0.5 (cpus=1, mem=512) @@ -1111,7 +1114,8 @@ TEST_F(HierarchicalAllocatorTest, MaintenanceInverseOffers) framework.id(), agent2.id(), allocatedResources(agent2.resources(), "*"), - filter1day); + filter1day, + false); const process::Time start = Clock::now() + Seconds(60); // Give both agents some unavailability. @@ -1195,12 +1199,15 @@ TEST_F(HierarchicalAllocatorTest, CoarseGrained) framework1.id(), slave1.id(), allocation->resources.at("role1").at(slave1.id()), - None()); + None(), + false); + allocator->recoverResources( framework1.id(), slave2.id(), allocation->resources.at("role1").at(slave2.id()), - None()); + None(), + false); // Now add the second framework, we expect there to be 2 subsequent // allocations, each framework being allocated a full slave. 
@@ -1283,7 +1290,8 @@ TEST_F(HierarchicalAllocatorTest, SameShareFairness) allocation->frameworkId, slave.id(), allocation->resources.at("*").at(slave.id()), - None()); + None(), + false); Clock::advance(flags.allocation_interval); } @@ -1361,7 +1369,8 @@ TEST_P(HierarchicalAllocatorTestWithReservations, ReservationUnallocated) framework1.id(), agent1.id(), allocatedResources(reserved, QUOTA_ROLE), - filter1day); + filter1day, + false); // Add another agent with unreserved resources. // This will trigger a batch allocation. @@ -1532,7 +1541,8 @@ TEST_P(HierarchicalAllocatorTestWithReservations, framework1.id(), agent1.id(), allocatedResources(unreserved, QUOTA_ROLE), - filter1day); + filter1day, + false); // Create `framework2` which belongs to the `NON_QUOTA_ROLE` // and is entitled to its reserved resources. @@ -1657,7 +1667,8 @@ TEST_P(HierarchicalAllocatorTestWithReservations, framework.id(), agent1.id(), recover, - None()); + None(), + false); // Quota: "cpus:3;mem:2048;disk:100". // Allocated quota: "cpus:1;mem:512;disk:50". 
@@ -2056,7 +2067,8 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources) allocation->frameworkId, slave.id(), reserved, - None()); + None(), + false); Clock::advance(flags.allocation_interval); @@ -2072,7 +2084,8 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources) allocation->frameworkId, slave.id(), unreserved, - None()); + None(), + false); Clock::advance(flags.allocation_interval); @@ -2486,7 +2499,8 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocation) framework.id(), slave.id(), updated.get(), - None()); + None(), + false); Clock::advance(flags.allocation_interval); @@ -2557,7 +2571,8 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationRemoveResources) framework.id(), slave.id(), updated.get(), - None()); + None(), + false); Clock::advance(flags.allocation_interval); @@ -2633,7 +2648,8 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume) framework.id(), slave.id(), update.get(), - None()); + None(), + false); Clock::advance(flags.allocation_interval); @@ -2669,7 +2685,8 @@ TEST_F(HierarchicalAllocatorTest, UpdateAllocationSharedPersistentVolume) framework.id(), slave.id(), update.get(), - None()); + None(), + false); Clock::advance(flags.allocation_interval); @@ -2740,7 +2757,8 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability) framework1.id(), slave.id(), update.get(), - None()); + None(), + false); // Shared volume not offered to `framework1` since it has not // opted in for SHARED_RESOURCES. @@ -2758,7 +2776,8 @@ TEST_F(HierarchicalAllocatorTest, SharedResourcesCapability) framework1.id(), slave.id(), allocation->resources.at("role1").at(slave.id()), - None()); + None(), + false); // Create `framework2` with opting in for SHARED_RESOURCES. 
FrameworkInfo framework2 = createFrameworkInfo( @@ -2990,7 +3009,8 @@ TEST_F(HierarchicalAllocatorTest, UpdateSlaveTotalResources) agent.id(), expected1.resources.at("role1").at(agent.id()) + expected2.resources.at("role1").at(agent.id()), - None()); + None(), + false); // Advance the clock to trigger allocation of // the available `agentResources2` resources. @@ -3354,7 +3374,8 @@ TEST_F(HierarchicalAllocatorTest, RecoverOversubscribedResources) recovered += Resources::parse("cpus:2").get(); recovered.allocate("role1"); - allocator->recoverResources(framework.id(), slave.id(), recovered, None()); + allocator->recoverResources( + framework.id(), slave.id(), recovered, None(), false); Clock::advance(flags.allocation_interval); @@ -3611,7 +3632,8 @@ TEST_F(HierarchicalAllocatorTest, QuotaProvidesGuarantee) framework1.id(), agent2.id(), allocation->resources.at(QUOTA_ROLE).at(agent2.id()), - offerFilter); + offerFilter, + false); // Total cluster resources: cpus=2, mem=1024. // QUOTA_ROLE share = 0.5 (cpus=1, mem=512) [quota: cpus=2, mem=1024] @@ -3805,7 +3827,8 @@ TEST_F(HierarchicalAllocatorTest, RemoveQuota) framework1.id(), agent1.id(), allocatedResources(agent1.resources(), QUOTA_ROLE), - None()); + None(), + false); // Trigger the next batch allocation. Clock::advance(flags.allocation_interval); @@ -3957,7 +3980,8 @@ TEST_F(HierarchicalAllocatorTest, MultipleFrameworksInRoleWithQuota) framework1a.id(), agent3.id(), allocatedResources(agent3.resources(), QUOTA_ROLE), - filter5s); + filter5s, + false); // Trigger the next batch allocation. Clock::advance(flags.allocation_interval); @@ -4537,7 +4561,8 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation) framework2.id(), agent2.id(), allocatedResources(agent2.resources(), NO_QUOTA_ROLE), - filter0s); + filter0s, + false); // Total cluster resources (2 identical agents): cpus=2, mem=1024. 
// QUOTA_ROLE share = 0.5 (cpus=1, mem=512) @@ -4559,7 +4584,8 @@ TEST_F(HierarchicalAllocatorTest, QuotaAgainstStarvation) framework2.id(), agent2.id(), allocatedResources(agent2.resources(), NO_QUOTA_ROLE), - filter0s); + filter0s, + false); // We set quota for the "starving" `QUOTA_ROLE` role. const Quota quota = createQuota("cpus:2;mem:1024"); @@ -4962,7 +4988,8 @@ TEST_F(HierarchicalAllocatorTest, QuotaSetAsideReservedResources) agent1.id(), allocatedResources( CHECK_NOTERROR(Resources::parse(quotaResourcesString)), QUOTA_ROLE), - longFilter); + longFilter, + false); // Trigger a batch allocation for good measure, but don't expect any // allocations. @@ -4990,7 +5017,8 @@ TEST_F(HierarchicalAllocatorTest, QuotaSetAsideReservedResources) framework2.id(), agent2.id(), allocatedResources(dynamicallyReserved, NO_QUOTA_ROLE), - longFilter); + longFilter, + false); // No more resource offers should be made until the filters expire: // `framework1` should not be offered the resources at `agent2` @@ -5044,7 +5072,8 @@ TEST_F(HierarchicalAllocatorTest, DeactivateAndReactivateFramework) framework.id(), agent.id(), allocatedResources(agent.resources(), "role1"), - None()); + None(), + false); // Suppress offers and disconnect framework. 
allocator->suppressOffers(framework.id(), {}); @@ -5120,7 +5149,8 @@ TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffers) framework.id(), agent.id(), allocatedResources(agent.resources(), "role1"), - None()); + None(), + false); allocator->suppressOffers(framework.id(), {}); @@ -5574,7 +5604,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( allocation->frameworkId, agent.id(), allocation->resources.at("roleA").at(agent.id()), - offerFilter); + offerFilter, + false); JSON::Object expected; expected.values = { @@ -5599,7 +5630,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( allocation->frameworkId, agent.id(), allocation->resources.at("roleB").at(agent.id()), - offerFilter); + offerFilter, + false); expected.values = { {"allocator/mesos/offer_filters/roles/roleA/active", 1}, @@ -5624,7 +5656,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS( allocation->frameworkId, agent.id(), allocation->resources.at("roleA").at(agent.id()), - offerFilter); + offerFilter, + false); expected.values = { {"allocator/mesos/offer_filters/roles/roleA/active", 2}, @@ -5679,7 +5712,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HierarchicalAllocatorTest, DominantShareMetrics) allocation->frameworkId, agent1.id(), allocation->resources.at("roleA").at(agent1.id()), - None()); + None(), + false); + Clock::settle(); expected.values = { @@ -5860,11 +5895,8 @@ TEST_F(HierarchicalAllocatorTest, UpdateWeight) foreachpair (const SlaveID& slaveId, const Resources& resources, allocation->resources.at(role)) { - allocator->recoverResources( - allocation->frameworkId, - slaveId, - resources, - None()); + allocator->recoverResources( + allocation->frameworkId, slaveId, resources, None(), false); } } } @@ -6081,7 +6113,8 @@ TEST_F(HierarchicalAllocatorTest, ReviveOffers) framework.id(), agent.id(), allocatedResources(agent.resources(), "role1"), - filter1000s); + filter1000s, + false); // Advance the clock to trigger a batch allocation. 
Clock::advance(flags.allocation_interval); @@ -6140,7 +6173,8 @@ TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffersWithMultiRole) framework.id(), agent.id(), allocatedResources(agent.resources(), "role2"), - filter1day); + filter1day, + false); // Advance the clock to trigger a batch allocation. Clock::advance(flags.allocation_interval); @@ -6163,7 +6197,8 @@ TEST_F(HierarchicalAllocatorTest, SuppressAndReviveOffersWithMultiRole) framework.id(), agent.id(), allocatedResources(agent.resources(), "role1"), - filter1day); + filter1day, + false); // Advance the clock to trigger a batch allocation. Clock::advance(flags.allocation_interval); @@ -6735,7 +6770,8 @@ TEST_F(HierarchicalAllocatorTest, DISABLED_NestedRoleQuota) framework1.id(), agent.id(), allocatedResources(agent.resources(), PARENT_ROLE), - longFilter); + longFilter, + false); // Create `framework2` in CHILD_ROLE1, which is a child role of // PARENT_ROLE. CHILD_ROLE1 does not have quota. In the current @@ -7062,7 +7098,8 @@ TEST_P(HierarchicalAllocatorTestWithParam, AllocateSharedResources) framework1.id(), slave.id(), updated.get() - allocatedResources(task.resources(), "role1"), - None()); + None(), + false); // The offer to 'framework2` should contain the shared volume. 
Clock::advance(flags.allocation_interval); @@ -7277,10 +7314,7 @@ TEST_P(HierarchicalAllocations_BENCHMARK_Test, PersistentVolumes) for (size_t count = 0; count < allocationsCount; count++) { foreach (const OfferedResources& offer, offers) { allocator->recoverResources( - offer.frameworkId, - offer.slaveId, - offer.resources, - None()); + offer.frameworkId, offer.slaveId, offer.resources, None(), false); } Clock::settle(); @@ -7550,7 +7584,7 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, DeclineOffers) filters.set_refuse_seconds(INT_MAX); allocator->recoverResources( - offer.frameworkId, offer.slaveId, offer.resources, filters); + offer.frameworkId, offer.slaveId, offer.resources, filters, false); } declinedOfferCount += offers.size(); @@ -7747,7 +7781,7 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, ResourceLabels) filters.set_refuse_seconds(INT_MAX); allocator->recoverResources( - offer.frameworkId, offer.slaveId, offer.resources, filters); + offer.frameworkId, offer.slaveId, offer.resources, filters, false); } declinedOfferCount += offers.size(); @@ -7889,10 +7923,7 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, SuppressOffers) // effect of suppression alone. foreach (const OfferedResources& offer, offers) { allocator->recoverResources( - offer.frameworkId, - offer.slaveId, - offer.resources, - None()); + offer.frameworkId, offer.slaveId, offer.resources, None(), false); } // Wait for all declined offers to be processed. @@ -8053,10 +8084,7 @@ TEST_P(HierarchicalAllocator_BENCHMARK_Test, ExtremeSuppressOffers) // effect of suppression alone. foreach (const OfferedResources& offer, offers) { allocator->recoverResources( - offer.frameworkId, - offer.slaveId, - offer.resources, - None()); + offer.frameworkId, offer.slaveId, offer.resources, None(), false); } // Wait for all declined offers to be processed. 
@@ -8205,7 +8233,8 @@ TEST_F(HierarchicalAllocatorTest, RemoveFilters) framework.id(), agent.id(), allocation->resources.at(ROLE).at(agent.id()), - offerFilter); + offerFilter, + false); // There should be no allocation due to the offer filter. Clock::advance(flags.allocation_interval); diff --git a/src/tests/master_allocator_tests.cpp b/src/tests/master_allocator_tests.cpp index e960757..d2580c3 100644 --- a/src/tests/master_allocator_tests.cpp +++ b/src/tests/master_allocator_tests.cpp @@ -257,7 +257,7 @@ TYPED_TEST(MasterAllocatorTest, ResourcesUnused) .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*")); Future<Nothing> recoverResources; - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillOnce(DoAll(InvokeRecoverResources(&allocator), FutureSatisfy(&recoverResources))); @@ -300,7 +300,7 @@ TYPED_TEST(MasterAllocatorTest, ResourcesUnused) AWAIT_READY(resourceOffers); // Shut everything down. - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoDefault()); Future<Nothing> shutdown; @@ -374,7 +374,7 @@ TYPED_TEST(MasterAllocatorTest, OutOfOrderDispatch) // framework has terminated or is inactive. Future<SlaveID> slaveId; Future<Resources> savedResources; - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) // "Catches" the recoverResources call from the master, so // that it doesn't get processed until we redispatch it after // the removeFramework trigger. @@ -395,7 +395,7 @@ TYPED_TEST(MasterAllocatorTest, OutOfOrderDispatch) AWAIT_READY(slaveId); AWAIT_READY(savedResources); - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillOnce(DoDefault()); // For the re-dispatch. 
// Re-dispatch the recoverResources call which we "caught" @@ -405,7 +405,8 @@ TYPED_TEST(MasterAllocatorTest, OutOfOrderDispatch) frameworkId1.get(), slaveId.get(), savedResources.get(), - None()); + None(), + false); // TODO(benh): Seems like we should wait for the above // recoverResources to be executed. @@ -435,7 +436,7 @@ TYPED_TEST(MasterAllocatorTest, OutOfOrderDispatch) AWAIT_READY(resourceOffers); // Called when driver2 stops. - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(allocator, deactivateFramework(_)) .WillRepeatedly(DoDefault()); @@ -510,7 +511,7 @@ TYPED_TEST(MasterAllocatorTest, SchedulerFailover) // We don't filter the unused resources to make sure that // they get offered to the framework as soon as it fails over. - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillOnce(InvokeRecoverResourcesWithFilters(&allocator, 0)) // For subsequent offers. .WillRepeatedly(InvokeRecoverResourcesWithFilters(&allocator, 0)); @@ -569,7 +570,7 @@ TYPED_TEST(MasterAllocatorTest, SchedulerFailover) .Times(AtMost(1)); // Shut everything down. - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoDefault()); EXPECT_CALL(allocator, deactivateFramework(_)) @@ -645,7 +646,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkExited) // The framework does not use all the resources. Future<Nothing> recoverResources; - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillOnce(DoAll(InvokeRecoverResources(&allocator), FutureSatisfy(&recoverResources))); @@ -683,7 +684,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkExited) // The framework 2 does not use all the resources. 
Future<Nothing> recoverResources2; - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillOnce(DoAll(InvokeRecoverResources(&allocator), FutureSatisfy(&recoverResources2))); @@ -700,7 +701,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkExited) // Shutdown framework 1; framework 2 should then be offered the // resources that were consumed by framework 1. - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoDefault()); Future<Nothing> removeFramework; @@ -785,7 +786,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveLost) .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 512, "*")); Future<Nothing> recoverResources; - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillOnce(DoAll(InvokeRecoverResources(&allocator), FutureSatisfy(&recoverResources))); @@ -812,7 +813,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveLost) // 'recoverResources' should be called twice, once for the task // and once for the executor. - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .Times(2); Future<Nothing> removeSlave; @@ -855,7 +856,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveLost) DEFAULT_FRAMEWORK_INFO.roles(0))); // Shut everything down. - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoDefault()); driver.stop(); @@ -917,7 +918,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveAdded) // on slave1 from the task launch won't get reoffered // immediately and will get combined with slave2's // resources for a single offer. 
- EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillOnce(InvokeRecoverResourcesWithFilters(&allocator, 0.1)) .WillRepeatedly(InvokeRecoverResourcesWithFilters(&allocator, 0)); @@ -955,7 +956,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveAdded) .Times(AtMost(1)); // Shut everything down. - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoDefault()); driver.stop(); @@ -1017,7 +1018,7 @@ TYPED_TEST(MasterAllocatorTest, TaskFinished) // allocator knows about the unused resources so that it can // aggregate them with the resources from the finished task. Future<Nothing> recoverResources; - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoAll(InvokeRecoverResources(&allocator), FutureSatisfy(&recoverResources))); @@ -1046,7 +1047,7 @@ TYPED_TEST(MasterAllocatorTest, TaskFinished) status.mutable_task_id()->MergeFrom(taskInfo.task_id()); status.set_state(TASK_FINISHED); - EXPECT_CALL(allocator, recoverResources(_, _, _, _)); + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)); // After the first task gets killed. Future<Nothing> resourceOffers; @@ -1061,7 +1062,7 @@ TYPED_TEST(MasterAllocatorTest, TaskFinished) .Times(AtMost(1)); // Shut everything down. 
- EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoDefault()); driver.stop(); @@ -1521,7 +1522,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst) .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 500, "*")) .WillRepeatedly(DeclineOffers()); - EXPECT_CALL(allocator, recoverResources(_, _, _, _)); + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)); EXPECT_CALL(exec, registered(_, _, _, _)); @@ -1545,7 +1546,7 @@ TYPED_TEST(MasterAllocatorTest, FrameworkReregistersFirst) // that it doesn't try to retry the update after master failover. AWAIT_READY(_statusUpdateAcknowledgement); - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoDefault()); } @@ -1640,7 +1641,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst) ASSERT_SOME(slave); EXPECT_CALL(allocator, addFramework(_, _, _, _, _)); - EXPECT_CALL(allocator, recoverResources(_, _, _, _)); + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)); EXPECT_CALL(sched, registered(&driver, _, _)); @@ -1672,7 +1673,7 @@ TYPED_TEST(MasterAllocatorTest, SlaveReregistersFirst) // that it doesn't try to retry the update after master failover. 
AWAIT_READY(_statusUpdateAcknowledgement); - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(DoDefault()); } @@ -1848,7 +1849,7 @@ TYPED_TEST(MasterAllocatorTest, RebalancedForUpdatedWeights) EXPECT_CALL(sched1, offerRescinded(&driver1, _)).Times(3); Future<Resources> recoverResources1, recoverResources2, recoverResources3; - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillOnce(DoAll(FutureArg<2>(&recoverResources1), InvokeRecoverResources(&allocator))) .WillOnce(DoAll(FutureArg<2>(&recoverResources2), diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index b23162d..0efd3a6 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -4037,7 +4037,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave) FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); EXPECT_CALL(allocator, activateSlave(_)); - EXPECT_CALL(allocator, recoverResources(_, _, _, _)); + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)); Future<TaskStatus> status; EXPECT_CALL(sched, statusUpdate(_, _)) @@ -4088,7 +4088,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave) // If there was an outstanding offer, we can get a call to // recoverResources when we stop the scheduler. - EXPECT_CALL(allocator, recoverResources(_, _, _, _)) + EXPECT_CALL(allocator, recoverResources(_, _, _, _, _)) .WillRepeatedly(Return()); driver.stop();
