Maintenance Primitives: Added test for inverse offer filters. Checks that filters change which inverse offer is sent when.
Review: https://reviews.apache.org/r/38475 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31defd41 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31defd41 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31defd41 Branch: refs/heads/master Commit: 31defd41bae4670247aa5b92f799234bcad9200b Parents: 9d03297 Author: Joseph Wu <[email protected]> Authored: Sat Sep 19 14:24:44 2015 -0400 Committer: Joris Van Remoortere <[email protected]> Committed: Sun Sep 20 14:21:22 2015 -0400 ---------------------------------------------------------------------- src/tests/master_maintenance_tests.cpp | 325 +++++++++++++++++++++++++++- 1 file changed, 320 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/31defd41/src/tests/master_maintenance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp index 9892bc3..c5277a1 100644 --- a/src/tests/master_maintenance_tests.cpp +++ b/src/tests/master_maintenance_tests.cpp @@ -51,6 +51,7 @@ #include "slave/flags.hpp" +#include "tests/containerizer.hpp" #include "tests/mesos.hpp" #include "tests/utils.hpp" @@ -83,6 +84,7 @@ using std::vector; using testing::AtMost; using testing::DoAll; +using testing::Not; namespace mesos { namespace internal { @@ -1167,11 +1169,6 @@ TEST_F(MasterMaintenanceTest, InverseOffers) mesos.send(call); } - // TODO(hartem): The filters in this test do not actually - // do anything, because inverse offer filters have not been - // implemented yet. Instead, the accept/decline calls use - // the default time, which results in a slow test. - { // Decline an inverse offer, with a filter. Call call; @@ -1227,6 +1224,324 @@ TEST_F(MasterMaintenanceTest, InverseOffers) Shutdown(); // Must shutdown before 'containerizer' gets deallocated. } + +// Test ensures that inverse offers support filters. +TEST_F(MasterMaintenanceTest, InverseOffersFilters) +{ + // Set up a master. + // NOTE: We don't use `StartMaster()` because we need to access these flags. + master::Flags flags = CreateMasterFlags(); + + Try<PID<Master>> master = StartMaster(flags); + ASSERT_SOME(master); + + ExecutorInfo executor1 = CREATE_EXECUTOR_INFO("executor-1", "exit 1"); + ExecutorInfo executor2 = CREATE_EXECUTOR_INFO("executor-2", "exit 2"); + + MockExecutor exec1(executor1.executor_id()); + MockExecutor exec2(executor2.executor_id()); + + hashmap<ExecutorID, Executor*> execs; + execs[executor1.executor_id()] = &exec1; + execs[executor2.executor_id()] = &exec2; + + TestContainerizer containerizer(execs); + + EXPECT_CALL(exec1, registered(_, _, _, _)) + .Times(1); + + EXPECT_CALL(exec1, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + EXPECT_CALL(exec2, registered(_, _, _, _)) + .Times(1); + + EXPECT_CALL(exec2, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + // Capture the registration message for the first slave. + Future<SlaveRegisteredMessage> slave1RegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _); + + // We need two agents for this test. + Try<PID<Slave>> slave1 = StartSlave(&containerizer); + ASSERT_SOME(slave1); + + // We need to make sure the first slave registers before we schedule the + // machine it is running on for maintenance. + AWAIT_READY(slave1RegisteredMessage); + + // Capture the registration message for the second slave. + Future<SlaveRegisteredMessage> slave2RegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), Not(slave1.get())); + + slave::Flags slaveFlags2 = MesosTest::CreateSlaveFlags(); + slaveFlags2.hostname = maintenanceHostname + "-2"; + + Try<PID<Slave>> slave2 = StartSlave(&containerizer, slaveFlags2); + ASSERT_SOME(slave2); + + // We need to make sure the second slave registers before we schedule the + // machine it is running on for maintenance. + AWAIT_READY(slave2RegisteredMessage); + + // Before starting any frameworks, put the first machine into `DRAINING` mode. + MachineID machine1; + machine1.set_hostname(maintenanceHostname); + machine1.set_ip(stringify(slave1.get().address.ip)); + + MachineID machine2; + machine2.set_hostname(slaveFlags2.hostname.get()); + machine2.set_ip(stringify(slave2.get().address.ip)); + + const Time start = Clock::now() + Seconds(60); + const Duration duration = Seconds(120); + const Unavailability unavailability = createUnavailability(start, duration); + + maintenance::Schedule schedule = createSchedule( + {createWindow({machine1, machine2}, unavailability)}); + + Future<Response> response = process::http::post( + master.get(), + "maintenance/schedule", + headers, + stringify(JSON::Protobuf(schedule))); + + AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response); + + // Pause the clock before starting a framework. + // This ensures deterministic offer-ing behavior during the test. + Clock::pause(); + + // Now start a framework. + Callbacks callbacks; + + Future<Nothing> connected; + EXPECT_CALL(callbacks, connected()) + .WillOnce(FutureSatisfy(&connected)); + + Mesos mesos( + master.get(), + lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); + + AWAIT_READY(connected); + + Queue<Event> events; + + EXPECT_CALL(callbacks, received(_)) + .WillRepeatedly(Enqueue(&events)); + + { + Call call; + call.set_type(Call::SUBSCRIBE); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO); + + mesos.send(call); + } + + Future<Event> event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); + + v1::FrameworkID id(event.get().subscribed().framework_id()); + + // Trigger a batch allocation. + Clock::advance(flags.allocation_interval); + Clock::settle(); + + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_EQ(2, event.get().offers().offers().size()); + EXPECT_EQ(0, event.get().offers().inverse_offers().size()); + + // All the offers should have unavailability. + foreach (const v1::Offer& offer, event.get().offers().offers()) { + EXPECT_TRUE(offer.has_unavailability()); + } + + // Save both offers. + v1::Offer offer1 = event.get().offers().offers(0); + v1::Offer offer2 = event.get().offers().offers(1); + + // Spawn dummy tasks using both offers. + v1::TaskInfo taskInfo1 = + evolve(createTask(devolve(offer1), "exit 1", executor1.executor_id())); + + v1::TaskInfo taskInfo2 = + evolve(createTask(devolve(offer2), "exit 2", executor2.executor_id())); + + sleep(2); + + { + // Accept the first offer. + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer1.id()); + + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH); + operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo1); + + mesos.send(call); + } + + { + // Accept the second offer. + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer2.id()); + + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH); + operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo2); + + mesos.send(call); + } + + // The order of events is deterministic from here on. + Clock::resume(); + + // Expect two inverse offers. + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_EQ(0, event.get().offers().offers().size()); + EXPECT_EQ(2, event.get().offers().inverse_offers().size()); + + // Save these inverse offers. + v1::InverseOffer inverseOffer1 = event.get().offers().inverse_offers(0); + v1::InverseOffer inverseOffer2 = event.get().offers().inverse_offers(1); + + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::UPDATE, event.get().type()); + EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state()); + + { + // Acknowledge TASK_RUNNING update for one task. + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + acknowledge->mutable_task_id()->CopyFrom(taskInfo1.task_id()); + acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id()); + acknowledge->set_uuid(event.get().update().status().uuid()); + + mesos.send(call); + } + + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::UPDATE, event.get().type()); + EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state()); + + { + // Acknowledge TASK_RUNNING update for the other task. + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + acknowledge->mutable_task_id()->CopyFrom(taskInfo2.task_id()); + acknowledge->mutable_agent_id()->CopyFrom(offer2.agent_id()); + acknowledge->set_uuid(event.get().update().status().uuid()); + + mesos.send(call); + } + + { + // Decline the second inverse offer, with a filter set such that we + // should not see this inverse offer in the next allocation. + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::DECLINE); + + Call::Decline* decline = call.mutable_decline(); + decline->add_offer_ids()->CopyFrom(inverseOffer2.id()); + + v1::Filters filters; + filters.set_refuse_seconds(flags.allocation_interval.secs() + 1); + decline->mutable_filters()->CopyFrom(filters); + + mesos.send(call); + } + + { + // Accept the first inverse offer, with a filter set such that we + // should immediately see this inverse offer again. + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(inverseOffer1.id()); + + v1::Filters filters; + filters.set_refuse_seconds(0); + accept->mutable_filters()->CopyFrom(filters); + + mesos.send(call); + } + + // Expect one inverse offer. + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_EQ(0, event.get().offers().offers().size()); + EXPECT_EQ(1, event.get().offers().inverse_offers().size()); + EXPECT_EQ( + inverseOffer1.agent_id(), + event.get().offers().inverse_offers(0).agent_id()); + + inverseOffer1 = event.get().offers().inverse_offers(0); + + { + // Do another immediate filter, but decline it this time. + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::DECLINE); + + Call::Decline* decline = call.mutable_decline(); + decline->add_offer_ids()->CopyFrom(inverseOffer1.id()); + + v1::Filters filters; + filters.set_refuse_seconds(0); + decline->mutable_filters()->CopyFrom(filters); + + mesos.send(call); + } + + // Expect the same inverse offer. + event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::OFFERS, event.get().type()); + EXPECT_EQ(0, event.get().offers().offers().size()); + EXPECT_EQ(1, event.get().offers().inverse_offers().size()); + EXPECT_EQ( + inverseOffer1.agent_id(), + event.get().offers().inverse_offers(0).agent_id()); + + EXPECT_CALL(exec1, shutdown(_)) + .Times(AtMost(1)); + + EXPECT_CALL(exec2, shutdown(_)) + .Times(AtMost(1)); + + Shutdown(); // Must shutdown before 'containerizer' gets deallocated. +} + } // namespace tests { } // namespace internal { } // namespace mesos {
