Added Decline call support to the master and C++ scheduler library. Review: https://reviews.apache.org/r/35855
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fa4a9a8c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fa4a9a8c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fa4a9a8c Branch: refs/heads/master Commit: fa4a9a8c6ee60fb4b459c45c6fac85dc0ddc2264 Parents: c504c3d Author: Vinod Kone <[email protected]> Authored: Wed Jun 24 12:01:22 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Wed Jul 1 17:54:58 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 120 ++++++++++++++++++++++++++++--------- src/master/master.hpp | 4 ++ src/scheduler/scheduler.cpp | 18 +++--- src/tests/scheduler_tests.cpp | 79 ++++++++++++++++++++++++ 4 files changed, 181 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/fa4a9a8c/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 9f9f578..78edf6a 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1643,49 +1643,70 @@ void Master::receive( // framework id is set and non-empty except for SUBSCRIBE call. switch (call.type()) { - case scheduler::Call::REVIVE: - case scheduler::Call::DECLINE: - drop(from, call, "Unimplemented"); + case scheduler::Call::TEARDOWN: { + removeFramework(framework); break; + } - case scheduler::Call::ACCEPT: + case scheduler::Call::ACCEPT: { if (!call.has_accept()) { drop(from, call, "Expecting 'accept' to be present"); return; } accept(framework, call.accept()); break; + } - case scheduler::Call::RECONCILE: - if (!call.has_reconcile()) { - drop(from, call, "Expecting 'reconcile' to be present"); + case scheduler::Call::DECLINE: { + if (!call.has_decline()) { + drop(from, call, "Expecting 'decline' to be present"); return; } - reconcile(framework, call.reconcile()); + decline(framework, call.decline()); break; + } - case scheduler::Call::SHUTDOWN: - if (!call.has_shutdown()) { - drop(from, call, "Expecting 'shutdown' to be present"); - } - shutdown(framework, call.shutdown()); + case scheduler::Call::REVIVE: { + drop(from, call, "Unimplemented"); break; + } - case scheduler::Call::KILL: + case scheduler::Call::KILL: { if (!call.has_kill()) { drop(from, call, "Expecting 'kill' to be present"); + return; } kill(framework, call.kill()); break; + } - case scheduler::Call::ACKNOWLEDGE: - case scheduler::Call::MESSAGE: + case scheduler::Call::SHUTDOWN: { + if (!call.has_shutdown()) { + drop(from, call, "Expecting 'shutdown' to be present"); + return; + } + shutdown(framework, call.shutdown()); + break; + } + + case scheduler::Call::ACKNOWLEDGE: { drop(from, call, "Unimplemented"); break; + } - case scheduler::Call::TEARDOWN: - removeFramework(framework); + case scheduler::Call::RECONCILE: { + if (!call.has_reconcile()) { + drop(from, call, "Expecting 'reconcile' to be present"); + return; + } + reconcile(framework, call.reconcile()); + break; + } + + case scheduler::Call::MESSAGE: { + drop(from, call, "Unimplemented"); break; + } default: drop(from, call, "Unknown call type"); @@ -2232,21 +2253,34 @@ void Master::launchTasks( return; } - scheduler::Call::Accept message; - message.mutable_filters()->CopyFrom(filters); + // Currently when no tasks are specified in the launchTasks message + // it is implicitly considered a decline of the offers. + if (!tasks.empty()) { + scheduler::Call::Accept message; + message.mutable_filters()->CopyFrom(filters); - Offer::Operation* operation = message.add_operations(); - operation->set_type(Offer::Operation::LAUNCH); + Offer::Operation* operation = message.add_operations(); + operation->set_type(Offer::Operation::LAUNCH); - foreach (const TaskInfo& task, tasks) { - operation->mutable_launch()->add_task_infos()->CopyFrom(task); - } + foreach (const TaskInfo& task, tasks) { + operation->mutable_launch()->add_task_infos()->CopyFrom(task); + } - foreach (const OfferID& offerId, offerIds) { - message.add_offer_ids()->CopyFrom(offerId); - } + foreach (const OfferID& offerId, offerIds) { + message.add_offer_ids()->CopyFrom(offerId); + } + + accept(framework, message); + } else { + scheduler::Call::Decline message; + message.mutable_filters()->CopyFrom(filters); + + foreach (const OfferID& offerId, offerIds) { + message.add_offer_ids()->CopyFrom(offerId); + } - accept(framework, message); + decline(framework, message); + } } @@ -2797,6 +2831,34 @@ void Master::_accept( } +void Master::decline( + Framework* framework, + const scheduler::Call::Decline& decline) +{ + CHECK_NOTNULL(framework); + + LOG(INFO) << "Processing DECLINE call for offers: " << decline.offer_ids() + << " for framework " << *framework; + + // Return resources to the allocator. + foreach (const OfferID& offerId, decline.offer_ids()) { + Offer* offer = getOffer(offerId); + if (offer != NULL) { + allocator->recoverResources( + offer->framework_id(), + offer->slave_id(), + offer->resources(), + decline.filters()); + + removeOffer(offer); + } else { + LOG(WARNING) << "Ignoring decline of offer " << offerId + << " since it is no longer valid"; + } + } +} + + void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId) { ++metrics->messages_revive_offers; http://git-wip-us.apache.org/repos/asf/mesos/blob/fa4a9a8c/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index af83d3e..46ff822 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1015,6 +1015,10 @@ private: const scheduler::Call::Accept& accept, const process::Future<std::list<process::Future<bool>>>& authorizations); + void decline( + Framework* framework, + const scheduler::Call::Decline& decline); + void reconcile( Framework* framework, const scheduler::Call::Reconcile& reconcile); http://git-wip-us.apache.org/repos/asf/mesos/blob/fa4a9a8c/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index 38fd286..d9e341f 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -246,22 +246,18 @@ public: break; } - case Call::DECLINE: { - if (!call.has_decline()) { - drop(call, "Expecting 'decline' to be present"); + case Call::ACCEPT: { + if (!call.has_accept()) { + drop(call, "Expecting 'accept' to be present"); return; } - LaunchTasksMessage message; - message.mutable_framework_id()->CopyFrom(call.framework_id()); - message.mutable_filters()->CopyFrom(call.decline().filters()); - message.mutable_offer_ids()->CopyFrom(call.decline().offer_ids()); - send(master.get(), message); + send(master.get(), call); break; } - case Call::ACCEPT: { - if (!call.has_accept()) { - drop(call, "Expecting 'accept' to be present"); + case Call::DECLINE: { + if (!call.has_decline()) { + drop(call, "Expecting 'decline' to be present"); return; } send(master.get(), call); http://git-wip-us.apache.org/repos/asf/mesos/blob/fa4a9a8c/src/tests/scheduler_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp index 5a0c645..958ccfc 100644 --- a/src/tests/scheduler_tests.cpp +++ b/src/tests/scheduler_tests.cpp @@ -661,6 +661,85 @@ TEST_F(SchedulerTest, Teardown) } +TEST_F(SchedulerTest, Decline) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + Try<PID<Slave>> slave = StartSlave(); + ASSERT_SOME(slave); + + Callbacks callbacks; + + Future<Nothing> connected; + EXPECT_CALL(callbacks, connected()) + .WillOnce(FutureSatisfy(&connected)); + + scheduler::Mesos mesos( + master.get(), + DEFAULT_CREDENTIAL, + lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); + + AWAIT_READY(connected); + + Queue<Event> events; + + EXPECT_CALL(callbacks, received(_)) + .WillRepeatedly(Enqueue(&events)); + + { + Call call; + call.set_type(Call::SUBSCRIBE); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + subscribe->set_force(true); + + mesos.send(call); + } + + Future<Event> event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); + + FrameworkID id(event.get().subscribed().framework_id()); + + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + ASSERT_EQ(1, event.get().offers().offers().size()); + + Offer offer = event.get().offers().offers(0); + { + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::DECLINE); + + Call::Decline* decline = call.mutable_decline(); + decline->add_offer_ids()->CopyFrom(offer.id()); + + // Set 0s filter to immediately get another offer. + Filters filters; + filters.set_refuse_seconds(0); + decline->mutable_filters()->CopyFrom(filters); + + mesos.send(call); + } + + // If the resources were properly declined, the scheduler should + // get another offer with same amount of resources. + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + ASSERT_EQ(1, event.get().offers().offers().size()); + ASSERT_EQ(offer.resources(), event.get().offers().offers(0).resources()); + + Shutdown(); // Must shutdown before 'containerizer' gets deallocated. +} + + // TODO(benh): Write test for sending Call::Acknowledgement through // master to slave when Event::Update was generated locally.
