Repository: mesos Updated Branches: refs/heads/master 08e3d9d88 -> 707bf3b1d
Added optional --offer_timeout to rescind unused offers. The ability to set an offer timeout helps prevent unfair resource allocations in the face of frameworks that hoard offers, or that accidentally drop offers. When optimistic offers are added, hoarding will not affect the fairness for other frameworks. Review: https://reviews.apache.org/r/22066 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/707bf3b1 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/707bf3b1 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/707bf3b1 Branch: refs/heads/master Commit: 707bf3b1d6f042ee92e7a291d3f74a20ae2d494b Parents: 08e3d9d Author: Kapil Arya <[email protected]> Authored: Fri Sep 5 11:15:15 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Fri Sep 5 11:57:00 2014 -0700 ---------------------------------------------------------------------- src/master/flags.hpp | 9 ++ src/master/master.cpp | 34 ++++++++ src/master/master.hpp | 4 + src/tests/master_tests.cpp | 177 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 224 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/707bf3b1/src/master/flags.hpp ---------------------------------------------------------------------- diff --git a/src/master/flags.hpp b/src/master/flags.hpp index 5e9ecb5..507ca60 100644 --- a/src/master/flags.hpp +++ b/src/master/flags.hpp @@ -289,6 +289,14 @@ public: "max_executors_per_slave", "A maximum number of executors to allow per slave."); #endif // WITH_NETWORK_ISOLATOR + + // TODO(karya): When we have optimistic offers, this will only + // benefit frameworks that accidentally lose an offer. 
+ add(&Flags::offer_timeout, + "offer_timeout", + "Duration of time before an offer is rescinded from a framework.\n" + "This helps fairness when running frameworks that hold on to offers,\n" + "or frameworks that accidentally drop offers.\n"); } bool version; @@ -317,6 +325,7 @@ public: Option<std::string> credentials; Option<ACLs> acls; Option<RateLimits> rate_limits; + Option<Duration> offer_timeout; #ifdef WITH_NETWORK_ISOLATOR Option<size_t> max_executors_per_slave; http://git-wip-us.apache.org/repos/asf/mesos/blob/707bf3b1/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 2508b38..c6393b2 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -461,6 +461,13 @@ void Master::initialize() roles[role] = new Role(roleInfo); } + // Verify the timeout is greater than zero. + if (flags.offer_timeout.isSome() && + flags.offer_timeout.get() <= Duration::zero()) { + EXIT(1) << "Invalid value '" << flags.offer_timeout.get() << "' " + << "for --offer_timeout: Must be greater than zero."; + } + // Initialize the allocator. allocator->initialize(flags, self(), roleInfos); @@ -3560,6 +3567,15 @@ void Master::offer(const FrameworkID& frameworkId, framework->addOffer(offer); slave->addOffer(offer); + if (flags.offer_timeout.isSome()) { + // Rescind the offer after the timeout elapses. + offerTimers[offer->id()] = + delay(flags.offer_timeout.get(), + self(), + &Self::offerTimeout, + offer->id()); + } + // TODO(jieyu): For now, we strip 'ephemeral_ports' resource from // offers so that frameworks do not see this resource. This is a // short term workaround. Revisit this once we resolve MESOS-1654. 
@@ -4411,6 +4427,17 @@ void Master::removeTask(Task* task) } +void Master::offerTimeout(const OfferID& offerId) +{ + Offer* offer = getOffer(offerId); + if (offer != NULL) { + allocator->resourcesRecovered( + offer->framework_id(), offer->slave_id(), offer->resources(), None()); + removeOffer(offer, true); + } +} + + // TODO(vinod): Instead of 'removeOffer()', consider implementing // 'useOffer()', 'discardOffer()' and 'rescindOffer()' for clarity. void Master::removeOffer(Offer* offer, bool rescind) @@ -4437,6 +4464,13 @@ void Master::removeOffer(Offer* offer, bool rescind) send(framework->pid, message); } + // Remove and cancel offer removal timers. Canceling the Timers is + // only done to avoid having too many active Timers in libprocess. + if (offerTimers.contains(offer->id())) { + Timer::cancel(offerTimers[offer->id()]); + offerTimers.erase(offer->id()); + } + // Delete it. offers.erase(offer->id()); delete offer; http://git-wip-us.apache.org/repos/asf/mesos/blob/707bf3b1/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index c9f989a..f502df3 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -377,6 +377,9 @@ protected: const process::UPID& acknowledgee, Framework* framework); + // Remove an offer after specified timeout + void offerTimeout(const OfferID& offerId); + // Remove an offer and optionally rescind the offer as well. 
void removeOffer(Offer* offer, bool rescind = false); @@ -554,6 +557,7 @@ private: } frameworks; hashmap<OfferID, Offer*> offers; + hashmap<OfferID, process::Timer> offerTimers; hashmap<std::string, Role*> roles; http://git-wip-us.apache.org/repos/asf/mesos/blob/707bf3b1/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index eaa1675..3d080b2 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -2093,3 +2093,180 @@ TEST_F(MasterTest, MaxExecutorsPerSlave) Shutdown(); // Must shutdown before 'containerizer' gets deallocated. } #endif // WITH_NETWORK_ISOLATOR + + +// This test verifies that when the Framework has not responded to +// an offer within the default timeout, the offer is rescinded. +TEST_F(MasterTest, OfferTimeout) +{ + master::Flags masterFlags = MesosTest::CreateMasterFlags(); + masterFlags.offer_timeout = Seconds(30); + Try<PID<Master> > master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Try<PID<Slave> > slave = StartSlave(); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureSatisfy(&registered)); + + Future<vector<Offer> > offers1; + Future<vector<Offer> > offers2; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers1)) + .WillOnce(FutureArg<1>(&offers2)); + + // Expect offer rescinded. 
+ Future<Nothing> offerRescinded; + EXPECT_CALL(sched, offerRescinded(&driver, _)) + .WillOnce(FutureSatisfy(&offerRescinded)); + + Future<Nothing> resourcesRecovered = + FUTURE_DISPATCH(_, &AllocatorProcess::resourcesRecovered); + + driver.start(); + + AWAIT_READY(registered); + AWAIT_READY(offers1); + ASSERT_EQ(1u, offers1.get().size()); + + // Now advance the clock, we need to resume it afterwards to + // allow the allocator to make a new allocation decision. + Clock::pause(); + Clock::advance(masterFlags.offer_timeout.get()); + Clock::resume(); + + AWAIT_READY(offerRescinded); + + AWAIT_READY(resourcesRecovered); + + // Expect that the resources are re-offered to the framework after + // the rescind. + AWAIT_READY(offers2); + ASSERT_EQ(1u, offers2.get().size()); + + EXPECT_EQ(offers1.get()[0].resources(), offers2.get()[0].resources()); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +// Offer should not be rescinded if it's accepted. +TEST_F(MasterTest, OfferNotRescindedOnceUsed) +{ + master::Flags masterFlags = MesosTest::CreateMasterFlags(); + masterFlags.offer_timeout = Seconds(30); + Try<PID<Master> > master = StartMaster(masterFlags); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + TestContainerizer containerizer(&exec); + + Try<PID<Slave> > slave = StartSlave(&containerizer); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureSatisfy(&registered)); + + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 64, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + EXPECT_CALL(exec, registered(_, _, _, _)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)) + .WillRepeatedly(Return()); // Ignore subsequent updates. + + // We don't expect any rescinds if the offer has been accepted. + EXPECT_CALL(sched, offerRescinded(&driver, _)) + .Times(0); + + driver.start(); + AWAIT_READY(registered); + + AWAIT_READY(status); + EXPECT_EQ(TASK_RUNNING, status.get().state()); + + // Now advance to the offer timeout, we need to settle the clock to + // ensure that the offer rescind timeout would be processed + // if triggered. + Clock::pause(); + Clock::advance(masterFlags.offer_timeout.get()); + Clock::settle(); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +// Offer should not be rescinded if it has been declined. +TEST_F(MasterTest, OfferNotRescindedOnceDeclined) +{ + master::Flags masterFlags = MesosTest::CreateMasterFlags(); + masterFlags.offer_timeout = Seconds(30); + Try<PID<Master> > master = StartMaster(masterFlags); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + TestContainerizer containerizer(&exec); + + Try<PID<Slave> > slave = StartSlave(&containerizer); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureSatisfy(&registered)); + + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillRepeatedly(DeclineOffers()); // Decline all offers. + + Future<LaunchTasksMessage> launchTasksMessage = + FUTURE_PROTOBUF(LaunchTasksMessage(), _, _); + + EXPECT_CALL(sched, offerRescinded(&driver, _)) + .Times(0); + + driver.start(); + AWAIT_READY(registered); + + // Wait for the framework to decline the offers. 
+ AWAIT_READY(launchTasksMessage); + + // Now advance to the offer timeout, we need to settle the clock to + // ensure that the offer rescind timeout would be processed + // if triggered. + Clock::pause(); + Clock::advance(masterFlags.offer_timeout.get()); + Clock::settle(); + + driver.stop(); + driver.join(); + + Shutdown(); +}
