Repository: mesos Updated Branches: refs/heads/1.2.x 5b810aaeb -> 12e99f753
Fixed a bug where the executor was not able to use reserved resources. We were not unallocating the resources before checking if the executor resources were contained in the checkpointed resources on the agent. Review: https://reviews.apache.org/r/56778/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/12e99f75 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/12e99f75 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/12e99f75 Branch: refs/heads/1.2.x Commit: 12e99f753179641b778834e5d57deeaad904499d Parents: 5b810aa Author: Anand Mazumdar <[email protected]> Authored: Fri Feb 17 12:15:54 2017 -0800 Committer: Anand Mazumdar <[email protected]> Committed: Fri Feb 17 12:27:04 2017 -0800 ---------------------------------------------------------------------- src/slave/slave.cpp | 17 +++-- src/tests/default_executor_tests.cpp | 116 ++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/12e99f75/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index ebba8e1..7564e8d 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1904,6 +1904,12 @@ void Slave::_run( return; } + auto unallocated = [](const Resources& resources) { + Resources result = resources; + result.unallocate(); + return result; + }; + // NOTE: If the task/task group or executor uses resources that are // checkpointed on the slave (e.g. persistent volumes), we should // already know about it. If the slave doesn't know about them (e.g. @@ -1913,12 +1919,6 @@ void Slave::_run( // out of order. 
bool kill = false; foreach (const TaskInfo& _task, tasks) { - auto unallocated = [](const Resources& resources) { - Resources result = resources; - result.unallocate(); - return result; - }; - // We must unallocate the resources to check whether they are // contained in the unallocated total checkpointed resources. Resources checkpointedTaskResources = @@ -1971,8 +1971,11 @@ void Slave::_run( } CHECK_EQ(kill, false); + + // Refer to the comment above when looping across tasks on + // why we need to unallocate resources. Resources checkpointedExecutorResources = - Resources(executorInfo.resources()).filter(needCheckpointing); + unallocated(executorInfo.resources()).filter(needCheckpointing); foreach (const Resource& resource, checkpointedExecutorResources) { if (!checkpointedResources.contains(resource)) { http://git-wip-us.apache.org/repos/asf/mesos/blob/12e99f75/src/tests/default_executor_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp index ffb69e9..eaf6394 100644 --- a/src/tests/default_executor_tests.cpp +++ b/src/tests/default_executor_tests.cpp @@ -1305,6 +1305,122 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) ASSERT_EQ(0, executorFailure->status()); } + +// This test verifies that the default executor can be +// launched using reserved resources. 
+TEST_P(DefaultExecutorTest, ReservedResources) +{ + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_role("role"); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + Resources unreserved = + Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + Resources reserved = unreserved.flatten( + frameworkInfo.role(), + createReservationInfo(frameworkInfo.principal())).get(); + + ExecutorInfo executorInfo; + executorInfo.set_type(ExecutorInfo::DEFAULT); + + executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(reserved); + + // Disable AuthN on the agent. + slave::Flags flags = CreateSlaveFlags(); + flags.authenticate_http_readwrite = false; + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); + ASSERT_SOME(slave); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)); + + v1::scheduler::TestMesos mesos( + master.get()->pid, + ContentType::PROTOBUF, + scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + { + Call call; + call.set_type(Call::SUBSCRIBE); + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(evolve(frameworkInfo)); + + mesos.send(call); + } + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + // Update `executorInfo` with the subscribed `frameworkId`. 
+ executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId)); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + + Future<v1::scheduler::Event::Update> runningUpdate; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&runningUpdate)); + + const v1::Offer& offer = offers->offers(0); + const SlaveID slaveId = devolve(offer.agent_id()); + + // Launch the task using unreserved resources. + v1::TaskInfo taskInfo = + evolve(createTask(slaveId, unreserved, SLEEP_COMMAND(1000))); + + v1::TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(taskInfo); + + { + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + accept->add_operations()->CopyFrom(v1::RESERVE(evolve(reserved))); + + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); + + v1::Offer::Operation::LaunchGroup* launchGroup = + operation->mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo)); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + mesos.send(call); + } + + AWAIT_READY(runningUpdate); + ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state()); + ASSERT_EQ(taskInfo.task_id(), runningUpdate->status().task_id()); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
