Repository: mesos Updated Branches: refs/heads/master 788c7dcec -> 267d719c7
Fixed a bug around executor not able to use reserved resources. We were not unallocating the resources before checking if the executor resources were contained in the checkpointed resources on the agent. Review: https://reviews.apache.org/r/56778/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/267d719c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/267d719c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/267d719c Branch: refs/heads/master Commit: 267d719c7a8308e1a1b98c73f5091dbb7708c444 Parents: 788c7dc Author: Anand Mazumdar <[email protected]> Authored: Fri Feb 17 12:15:54 2017 -0800 Committer: Anand Mazumdar <[email protected]> Committed: Fri Feb 17 12:25:30 2017 -0800 ---------------------------------------------------------------------- src/slave/slave.cpp | 17 +++-- src/tests/default_executor_tests.cpp | 116 ++++++++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/267d719c/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index ebba8e1..7564e8d 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1904,6 +1904,12 @@ void Slave::_run( return; } + auto unallocated = [](const Resources& resources) { + Resources result = resources; + result.unallocate(); + return result; + }; + // NOTE: If the task/task group or executor uses resources that are // checkpointed on the slave (e.g. persistent volumes), we should // already know about it. If the slave doesn't know about them (e.g. @@ -1913,12 +1919,6 @@ void Slave::_run( // out of order. 
bool kill = false; foreach (const TaskInfo& _task, tasks) { - auto unallocated = [](const Resources& resources) { - Resources result = resources; - result.unallocate(); - return result; - }; - // We must unallocate the resources to check whether they are // contained in the unallocated total checkpointed resources. Resources checkpointedTaskResources = @@ -1971,8 +1971,11 @@ void Slave::_run( } CHECK_EQ(kill, false); + + // Refer to the comment above when looping across tasks on + // why we need to unallocate resources. Resources checkpointedExecutorResources = - Resources(executorInfo.resources()).filter(needCheckpointing); + unallocated(executorInfo.resources()).filter(needCheckpointing); foreach (const Resource& resource, checkpointedExecutorResources) { if (!checkpointedResources.contains(resource)) { http://git-wip-us.apache.org/repos/asf/mesos/blob/267d719c/src/tests/default_executor_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp index ffb69e9..eaf6394 100644 --- a/src/tests/default_executor_tests.cpp +++ b/src/tests/default_executor_tests.cpp @@ -1305,6 +1305,122 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) ASSERT_EQ(0, executorFailure->status()); } + +// This test verifies that the default executor can be +// launched using reserved resources. 
+TEST_P(DefaultExecutorTest, ReservedResources)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role");
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Resources unreserved =
+    Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  Resources reserved = unreserved.flatten(
+      frameworkInfo.role(),
+      createReservationInfo(frameworkInfo.principal())).get();
+
+  ExecutorInfo executorInfo;
+  executorInfo.set_type(ExecutorInfo::DEFAULT);
+
+  executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(reserved);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(evolve(frameworkInfo));
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId));
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  Future<v1::scheduler::Event::Update> runningUpdate;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&runningUpdate));
+
+  const v1::Offer& offer = offers->offers(0);
+  const SlaveID slaveId = devolve(offer.agent_id());
+
+  // Launch the task using unreserved resources; the executor uses `reserved`.
+  v1::TaskInfo taskInfo =
+    evolve(createTask(slaveId, unreserved, SLEEP_COMMAND(1000)));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    accept->add_operations()->CopyFrom(v1::RESERVE(evolve(reserved)));
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+
+    v1::Offer::Operation::LaunchGroup* launchGroup =
+      operation->mutable_launch_group();
+
+    launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo));
+    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(runningUpdate);
+  ASSERT_EQ(TASK_RUNNING, runningUpdate->status().state());
+  ASSERT_EQ(taskInfo.task_id(), runningUpdate->status().task_id());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {
