Added Revive call support to the master and C++ scheduler library. Review: https://reviews.apache.org/r/35856
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/49a0d2b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/49a0d2b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/49a0d2b4

Branch: refs/heads/master
Commit: 49a0d2b443636653f773188af2875106c5b3583e
Parents: fa4a9a8
Author: Vinod Kone <[email protected]>
Authored: Wed Jun 24 15:34:20 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Wed Jul 1 17:54:58 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp         | 12 ++++-
 src/master/master.hpp         |  2 +
 src/scheduler/scheduler.cpp   |  4 +-
 src/tests/scheduler_tests.cpp | 95 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 108 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/49a0d2b4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 78edf6a..255eaae 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1667,7 +1667,7 @@ void Master::receive(
     }
 
     case scheduler::Call::REVIVE: {
-      drop(from, call, "Unimplemented");
+      revive(framework);
       break;
     }
 
@@ -2879,7 +2879,15 @@ void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
     return;
   }
 
-  LOG(INFO) << "Reviving offers for framework " << *framework;
+  revive(framework);
+}
+
+
+void Master::revive(Framework* framework)
+{
+  CHECK_NOTNULL(framework);
+
+  LOG(INFO) << "Processing REVIVE call for framework " << *framework;
   allocator->reviveOffers(framework->id());
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/49a0d2b4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 46ff822..2d25af4 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1019,6 +1019,8 @@ private:
       Framework* framework,
       const scheduler::Call::Decline& decline);
 
+  void revive(Framework* framework);
+
   void reconcile(
       Framework* framework,
       const scheduler::Call::Reconcile& reconcile);

http://git-wip-us.apache.org/repos/asf/mesos/blob/49a0d2b4/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index d9e341f..740c088 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -265,9 +265,7 @@ public:
       }
 
       case Call::REVIVE: {
-        ReviveOffersMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_id());
-        send(master.get(), message);
+        send(master.get(), call);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/49a0d2b4/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 958ccfc..5e2b906 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -740,6 +740,101 @@ TEST_F(SchedulerTest, Decline)
 }
 
 
+TEST_F(SchedulerTest, Revive)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->set_force(true);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
+
+  FrameworkID id(event.get().subscribed().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+
+  Offer offer = event.get().offers().offers(0);
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::DECLINE);
+
+    Call::Decline* decline = call.mutable_decline();
+    decline->add_offer_ids()->CopyFrom(offer.id());
+
+    // Set 1hr filter to not immediately get another offer.
+    Filters filters;
+    filters.set_refuse_seconds(Hours(1).secs());
+    decline->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  // No offers should be sent within 30 mins because we set a filter
+  // for 1 hr.
+  Clock::pause();
+  Clock::advance(Minutes(30));
+  Clock::settle();
+
+  event = events.get();
+  ASSERT_TRUE(event.isPending());
+
+  // On revival the filters should be cleared and the scheduler should
+  // get another offer with same amount of resources.
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::REVIVE);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+  ASSERT_EQ(offer.resources(), event.get().offers().offers(0).resources());
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // TODO(benh): Write test for sending Call::Acknowledgement through
 // master to slave when Event::Update was generated locally.
 
