Repository: mesos Updated Branches: refs/heads/master cc0a84790 -> e999beb18
Maintenance Primitives: Fixed an error in Accept/Decline for inverse offers. Added regression test. Note that the test may be slow until inverse offer filters are actually implemented. Review: https://reviews.apache.org/r/38470 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e999beb1 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e999beb1 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e999beb1 Branch: refs/heads/master Commit: e999beb18270601bebbb9131f87a0d1ed6fab37a Parents: cc0a847 Author: Joseph Wu <[email protected]> Authored: Fri Sep 18 15:39:04 2015 -0400 Committer: Joris Van Remoortere <[email protected]> Committed: Fri Sep 18 16:31:45 2015 -0400 ---------------------------------------------------------------------- src/master/master.cpp | 8 +- src/tests/master_maintenance_tests.cpp | 217 +++++++++++++++++++++++++++- 2 files changed, 216 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e999beb1/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 0b61f11..ca4d587 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2821,8 +2821,8 @@ void Master::accept( status.set_status(mesos::master::InverseOfferStatus::ACCEPT); allocator->updateInverseOffer( - offer->slave_id(), - offer->framework_id(), + inverseOffer->slave_id(), + inverseOffer->framework_id(), status); removeInverseOffer(inverseOffer); @@ -3292,8 +3292,8 @@ void Master::decline( status.set_status(mesos::master::InverseOfferStatus::DECLINE); allocator->updateInverseOffer( - offer->slave_id(), - offer->framework_id(), + inverseOffer->slave_id(), + inverseOffer->framework_id(), status); removeInverseOffer(inverseOffer); 
http://git-wip-us.apache.org/repos/asf/mesos/blob/e999beb1/src/tests/master_maintenance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp index 45112f1..2a41e7e 100644 --- a/src/tests/master_maintenance_tests.cpp +++ b/src/tests/master_maintenance_tests.cpp @@ -122,6 +122,13 @@ public: unavailability = createUnavailability(Clock::now()); } + virtual master::Flags CreateMasterFlags() + { + master::Flags masterFlags = MesosTest::CreateMasterFlags(); + masterFlags.authenticate_frameworks = false; + return masterFlags; + } + virtual slave::Flags CreateSlaveFlags() { slave::Flags slaveFlags = MesosTest::CreateSlaveFlags(); @@ -366,10 +373,7 @@ TEST_F(MasterMaintenanceTest, FailToUnscheduleDeactivatedMachines) // slave is scheduled to go down for maintenance. TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest) { - master::Flags flags = CreateMasterFlags(); - flags.authenticate_frameworks = false; - - Try<PID<Master>> master = StartMaster(flags); + Try<PID<Master>> master = StartMaster(); ASSERT_SOME(master); MockExecutor exec(DEFAULT_EXECUTOR_ID); @@ -425,7 +429,7 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest) // Schedule this slave for maintenance. MachineID machine; - machine.set_hostname("maintenance-host"); + machine.set_hostname(maintenanceHostname); machine.set_ip(stringify(slave.get().address.ip)); // TODO(jmlvanre): Replace Time(0.0) with `Clock::now()` once JSON double @@ -1028,6 +1032,209 @@ TEST_F(MasterMaintenanceTest, MachineStatus) ASSERT_EQ("0.0.0.2", statuses.get().draining_machines(0).ip()); } + +// Test ensures that accept and decline works with inverse offers. +TEST_F(MasterMaintenanceTest, InverseOffers) +{ + // Set up a master. 
+ Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + Try<PID<Slave>> slave = StartSlave(&exec); + ASSERT_SOME(slave); + + // Before starting any frameworks, put the one machine into `DRAINING` mode. + MachineID machine; + machine.set_hostname(maintenanceHostname); + machine.set_ip(stringify(slave.get().address.ip)); + + // TODO(josephw): Replace Time(0.0) with `Clock::now()` once JSON double + // conversion is fixed. For now using a rounded time avoids the issue. + const Time start = Time::create(0.0).get() + Seconds(60); + const Duration duration = Seconds(120); + const Unavailability unavailability = createUnavailability(start, duration); + + maintenance::Schedule schedule = createSchedule( + {createWindow({machine}, unavailability)}); + + Future<Response> response = process::http::post( + master.get(), + "maintenance/schedule", + headers, + stringify(JSON::Protobuf(schedule))); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + + // Now start a framework. + Callbacks callbacks; + + Future<Nothing> connected; + EXPECT_CALL(callbacks, connected()) + .WillOnce(FutureSatisfy(&connected)); + + Mesos mesos( + master.get(), + lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); + + AWAIT_READY(connected); + + Queue<Event> events; + + EXPECT_CALL(callbacks, received(_)) + .WillRepeatedly(Enqueue(&events)); + + { + Call call; + call.set_type(Call::SUBSCRIBE); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); + + mesos.send(call); + } + + Future<Event> event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); + + v1::FrameworkID id(event.get().subscribed().framework_id()); + + // Ensure we receive some regular resource offers. 
+ event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_NE(0, event.get().offers().offers().size()); + EXPECT_EQ(0, event.get().offers().inverse_offers().size()); + + // All the offers should have unavailability. + foreach (const v1::Offer& offer, event.get().offers().offers()) { + EXPECT_TRUE(offer.has_unavailability()); + } + + // Just work with a single offer to simplify the rest of the test. + v1::Offer offer = event.get().offers().offers(0); + + EXPECT_CALL(exec, registered(_, _, _, _)) + .Times(1); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + // A dummy task just for confirming that the offer is accepted. + // TODO(josephw): Replace with v1 API createTask helper. + v1::TaskInfo taskInfo = + evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID)); + + { + // Accept this one offer. + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH); + operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo); + + mesos.send(call); + } + + // Expect an inverse offer. + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_EQ(0, event.get().offers().offers().size()); + EXPECT_EQ(1, event.get().offers().inverse_offers().size()); + + // Save this inverse offer so we can decline it later. + v1::InverseOffer inverseOffer = event.get().offers().inverse_offers(0); + + // Wait for the task to start running. + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::UPDATE, event.get().type()); + EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state()); + + { + // Acknowledge TASK_RUNNING update. 
+ Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + acknowledge->mutable_task_id()->CopyFrom(taskInfo.task_id()); + acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id()); + acknowledge->set_uuid(event.get().update().status().uuid()); + + mesos.send(call); + } + + // TODO(hartem): The filters in this test do not actually + // do anything, because inverse offer filters have not been + // implemented yet. Instead, the accept/decline calls use + // the default time, which results in a slow test. + + { + // Decline an inverse offer, with a filter. + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::DECLINE); + + Call::Decline* decline = call.mutable_decline(); + decline->add_offer_ids()->CopyFrom(inverseOffer.id()); + + // Set a 0 second filter to immediately get another inverse offer. + v1::Filters filters; + filters.set_refuse_seconds(0); + decline->mutable_filters()->CopyFrom(filters); + + mesos.send(call); + } + + // Expect another inverse offer. + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_EQ(0, event.get().offers().offers().size()); + EXPECT_EQ(1, event.get().offers().inverse_offers().size()); + inverseOffer = event.get().offers().inverse_offers(0); + + { + // Accept an inverse offer, with filter. + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(inverseOffer.id()); + + // Set a 0 second filter to immediately get another inverse offer. + v1::Filters filters; + filters.set_refuse_seconds(0); + accept->mutable_filters()->CopyFrom(filters); + + mesos.send(call); + } + + // Expect yet another inverse offer. 
+ event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_EQ(0, event.get().offers().offers().size()); + EXPECT_EQ(1, event.get().offers().inverse_offers().size()); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + Shutdown(); // Must shutdown before 'containerizer' gets deallocated. +} + } // namespace tests { } // namespace internal { } // namespace mesos {
