Repository: mesos Updated Branches: refs/heads/master c78496fd5 -> d142d38e3
Passed versions when launching tasks. In this patch we inject resource versions into task launch messages and add verification in the agent. We require that resource versions of resource providers whose resources are used in a task have not changed. With that we can make sure, e.g., not to use resources created in speculated operations. Review: https://reviews.apache.org/r/64299 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0bbdbce1 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0bbdbce1 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0bbdbce1 Branch: refs/heads/master Commit: 0bbdbce1daab55dbe8d02aaa858eb2e95a3071de Parents: ac97d76 Author: Benjamin Bannier <[email protected]> Authored: Thu Nov 30 14:45:55 2017 +0100 Committer: Benjamin Bannier <[email protected]> Committed: Fri Dec 8 11:07:44 2017 +0100 ---------------------------------------------------------------------- src/master/master.cpp | 6 ++ src/slave/slave.cpp | 98 ++++++++++++++++++-- src/slave/slave.hpp | 6 +- src/tests/mock_slave.cpp | 12 ++- src/tests/mock_slave.hpp | 8 +- src/tests/slave_tests.cpp | 204 ++++++++++++++++++++++++++++++++++++++--- 6 files changed, 307 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/0bbdbce1/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 584398c..5cba506 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -4990,6 +4990,9 @@ void Master::_accept( RunTaskMessage message; message.mutable_framework()->MergeFrom(framework->info); + message.mutable_resource_version_uuids()->CopyFrom( + protobuf::createResourceVersions(slave->resourceVersions)); + // TODO(anand): We set 'pid' to UPID() for http frameworks // as 'pid' was made optional in 0.24.0. 
In 0.25.0, we // no longer have to set pid here for http frameworks. @@ -5179,6 +5182,9 @@ void Master::_accept( message.mutable_executor()->CopyFrom(executor); message.mutable_task_group()->CopyFrom(taskGroup); + message.mutable_resource_version_uuids()->CopyFrom( + protobuf::createResourceVersions(slave->resourceVersions)); + set<TaskID> taskIds; Resources totalResources; http://git-wip-us.apache.org/repos/asf/mesos/blob/0bbdbce1/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 8bdb945..7a4a4ac 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2011,7 +2011,8 @@ void Slave::run( frameworkInfo, executorInfo, task, - taskGroup)); + taskGroup, + resourceVersionUuids)); } @@ -2020,7 +2021,8 @@ void Slave::_run( const FrameworkInfo& frameworkInfo, const ExecutorInfo& executorInfo, const Option<TaskInfo>& task, - const Option<TaskGroupInfo>& taskGroup) + const Option<TaskGroupInfo>& taskGroup, + const std::vector<ResourceVersionUUID>& resourceVersionUuids) { // TODO(anindya_sinha): Consider refactoring the initial steps common // to `_run()` and `__run()`. @@ -2152,7 +2154,8 @@ void Slave::_run( frameworkInfo, executorInfo, task, - taskGroup)); + taskGroup, + resourceVersionUuids)); } @@ -2161,7 +2164,8 @@ void Slave::__run( const FrameworkInfo& frameworkInfo, const ExecutorInfo& executorInfo, const Option<TaskInfo>& task, - const Option<TaskGroupInfo>& taskGroup) + const Option<TaskGroupInfo>& taskGroup, + const vector<ResourceVersionUUID>& resourceVersionUuids) { CHECK_NE(task.isSome(), taskGroup.isSome()) << "Either task or task group should be set but not both"; @@ -2306,8 +2310,84 @@ void Slave::__run( return; } - LOG(INFO) << "Launching " << taskOrTaskGroup(task, taskGroup) - << " for framework " << frameworkId; + // Check task invariants. 
+ // + // TODO(bbannier): Instead of copy-pasting identical code to deal + // with cases where tasks need to be terminated, consolidate code + // below to decouple checking from terminating. + + // If the master sent resource versions, perform a best-effort check + // that they are consistent with the resources the task uses. + // + // TODO(bbannier): Also check executor resources. + bool kill = false; + if (!resourceVersionUuids.empty()) { + hashset<Option<ResourceProviderID>> usedResourceProviders; + foreach (const TaskInfo& _task, tasks) { + foreach (const Resource& resource, _task.resources()) { + if (resource.has_provider_id()) { + usedResourceProviders.insert(resource.provider_id()); + } else { + usedResourceProviders.insert(None()); + } + } + } + + const hashmap<Option<ResourceProviderID>, UUID> receivedResourceVersions = + protobuf::parseResourceVersions( + {resourceVersionUuids.begin(), resourceVersionUuids.end()}); + + foreach (auto&& resourceProvider, usedResourceProviders) { + Option<Error> error = None(); + + if (!resourceVersions.contains(resourceProvider)) { + // We do not expect the agent to forget about itself. + CHECK_SOME(resourceProvider); + kill = true; + } + + CHECK(receivedResourceVersions.contains(resourceProvider)); + + if (resourceVersions.at(resourceProvider) != + receivedResourceVersions.at(resourceProvider)) { + kill = true; + } + } + } + + if (kill) { + // We report TASK_DROPPED to the framework because the task was + // never launched. For non-partition-aware frameworks, we report + // TASK_LOST for backward compatibility. 
+ mesos::TaskState taskState = TASK_DROPPED; + if (!protobuf::frameworkHasCapability( + frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } + + foreach (const TaskInfo& _task, tasks) { + const StatusUpdate update = protobuf::createStatusUpdate( + frameworkId, + info.id(), + _task.task_id(), + taskState, + TaskStatus::SOURCE_SLAVE, + UUID::random(), + "Tasks assumes outdated resource state", + TaskStatus::REASON_INVALID_OFFERS, + executorId); + + statusUpdate(update, UPID()); + } + + // Refer to the comment after 'framework->removePendingTask' above + // for why we need this. + if (framework->idle()) { + removeFramework(framework); + } + + return; + } auto unallocated = [](const Resources& resources) { Resources result = resources; @@ -2315,6 +2395,8 @@ void Slave::__run( return result; }; + CHECK_EQ(kill, false); + // NOTE: If the task/task group or executor uses resources that are // checkpointed on the slave (e.g. persistent volumes), we should // already know about it. If the slave doesn't know about them (e.g. @@ -2322,7 +2404,6 @@ void Slave::__run( // send TASK_DROPPED status updates here since restarting the task // may succeed in the event that CheckpointResourcesMessage arrives // out of order. - bool kill = false; foreach (const TaskInfo& _task, tasks) { // We must unallocate the resources to check whether they are // contained in the unallocated total checkpointed resources. @@ -2451,6 +2532,9 @@ void Slave::__run( CHECK(framework->state == Framework::RUNNING) << framework->state; + LOG(INFO) << "Launching " << taskOrTaskGroup(task, taskGroup) + << " for framework " << frameworkId; + // Either send the task/task group to an executor or start a new executor // and queue it until the executor has started. 
Executor* executor = framework->getExecutor(executorId); http://git-wip-us.apache.org/repos/asf/mesos/blob/0bbdbce1/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index a47f93e..297d672 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -165,7 +165,8 @@ public: const FrameworkInfo& frameworkInfo, const ExecutorInfo& executorInfo, const Option<TaskInfo>& task, - const Option<TaskGroupInfo>& taskGroup); + const Option<TaskGroupInfo>& taskGroup, + const std::vector<ResourceVersionUUID>& resourceVersionUuids); // Made 'virtual' for Slave mocking. virtual void runTaskGroup( @@ -373,7 +374,8 @@ public: const FrameworkInfo& frameworkInfo, const ExecutorInfo& executorInfo, const Option<TaskInfo>& task, - const Option<TaskGroupInfo>& taskGroup); + const Option<TaskGroupInfo>& taskGroup, + const std::vector<ResourceVersionUUID>& resourceVersionUuids); // This is called when the resource limits of the container have // been updated for the given tasks and task groups. If the update is http://git-wip-us.apache.org/repos/asf/mesos/blob/0bbdbce1/src/tests/mock_slave.cpp ---------------------------------------------------------------------- diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp index a43a12f..8357edc 100644 --- a/src/tests/mock_slave.cpp +++ b/src/tests/mock_slave.cpp @@ -120,7 +120,7 @@ MockSlave::MockSlave( // Set up default behaviors, calling the original methods. 
EXPECT_CALL(*this, runTask(_, _, _, _, _, _)) .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask)); - EXPECT_CALL(*this, _run(_, _, _, _, _)) + EXPECT_CALL(*this, _run(_, _, _, _, _, _)) .WillRepeatedly(Invoke(this, &MockSlave::unmocked__run)); EXPECT_CALL(*this, runTaskGroup(_, _, _, _, _)) .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTaskGroup)); @@ -164,10 +164,16 @@ void MockSlave::unmocked__run( const FrameworkInfo& frameworkInfo, const ExecutorInfo& executorInfo, const Option<TaskInfo>& taskInfo, - const Option<TaskGroupInfo>& taskGroup) + const Option<TaskGroupInfo>& taskGroup, + const std::vector<ResourceVersionUUID>& resourceVersionUuids) { slave::Slave::_run( - unschedules, frameworkInfo, executorInfo, taskInfo, taskGroup); + unschedules, + frameworkInfo, + executorInfo, + taskInfo, + taskGroup, + resourceVersionUuids); } http://git-wip-us.apache.org/repos/asf/mesos/blob/0bbdbce1/src/tests/mock_slave.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp index cf5a581..29ce714 100644 --- a/src/tests/mock_slave.hpp +++ b/src/tests/mock_slave.hpp @@ -117,19 +117,21 @@ public: const TaskInfo& task, const std::vector<ResourceVersionUUID>& resourceVersionUuids); - MOCK_METHOD5(_run, void( + MOCK_METHOD6(_run, void( const process::Future<std::list<bool>>& unschedules, const FrameworkInfo& frameworkInfo, const ExecutorInfo& executorInfo, const Option<TaskInfo>& task, - const Option<TaskGroupInfo>& taskGroup)); + const Option<TaskGroupInfo>& taskGroup, + const std::vector<ResourceVersionUUID>& resourceVersionUuids)); void unmocked__run( const process::Future<std::list<bool>>& unschedules, const FrameworkInfo& frameworkInfo, const ExecutorInfo& executorInfo, const Option<TaskInfo>& task, - const Option<TaskGroupInfo>& taskGroup); + const Option<TaskGroupInfo>& taskGroup, + const std::vector<ResourceVersionUUID>& resourceVersionUuids); MOCK_METHOD5(runTaskGroup, 
void( const process::UPID& from, http://git-wip-us.apache.org/repos/asf/mesos/blob/0bbdbce1/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 29ab216..0714543 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -1833,7 +1833,7 @@ TEST_F(SlaveTest, GetStateTaskGroupPending) // unmocked `_run()` method. Instead, we want to do nothing so that tasks // remain in the framework's 'pending' list. Future<Nothing> _run; - EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _)) + EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _)) .WillOnce(FutureSatisfy(&_run)); // The executor should not be launched. @@ -4118,17 +4118,19 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) ExecutorInfo executorInfo; Option<TaskGroupInfo> taskGroup; Option<TaskInfo> task_; + vector<ResourceVersionUUID> resourceVersionUuids; // Skip what Slave::_run() normally does, save its arguments for // later, tie reaching the critical moment when to kill the task to // a future. 
Future<Nothing> _run; - EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _)) + EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _)) .WillOnce(DoAll(FutureSatisfy(&_run), SaveArg<0>(&unschedules), SaveArg<1>(&frameworkInfo), SaveArg<2>(&executorInfo), SaveArg<3>(&task_), - SaveArg<4>(&taskGroup))); + SaveArg<4>(&taskGroup), + SaveArg<5>(&resourceVersionUuids))); driver.launchTasks(offers.get()[0].id(), {task}); @@ -4155,7 +4157,12 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) AWAIT_READY(removeFramework); slave.get()->mock()->unmocked__run( - unschedules, frameworkInfo, executorInfo, task_, taskGroup); + unschedules, + frameworkInfo, + executorInfo, + task_, + taskGroup, + resourceVersionUuids); AWAIT_READY(status); EXPECT_EQ(TASK_KILLED, status->state()); @@ -4239,21 +4246,24 @@ TEST_F(SlaveTest, KillMultiplePendingTasks) ExecutorInfo executorInfo1, executorInfo2; Option<TaskGroupInfo> taskGroup1, taskGroup2; Option<TaskInfo> task_1, task_2; + vector<ResourceVersionUUID> resourceVersionUuids1, resourceVersionUuids2; Future<Nothing> _run1, _run2; - EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _)) + EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _)) .WillOnce(DoAll(FutureSatisfy(&_run1), SaveArg<0>(&unschedules1), SaveArg<1>(&frameworkInfo1), SaveArg<2>(&executorInfo1), SaveArg<3>(&task_1), - SaveArg<4>(&taskGroup1))) + SaveArg<4>(&taskGroup1), + SaveArg<5>(&resourceVersionUuids1))) .WillOnce(DoAll(FutureSatisfy(&_run2), SaveArg<0>(&unschedules2), SaveArg<1>(&frameworkInfo2), SaveArg<2>(&executorInfo2), SaveArg<3>(&task_2), - SaveArg<4>(&taskGroup2))); + SaveArg<4>(&taskGroup2), + SaveArg<5>(&resourceVersionUuids2))); driver.launchTasks(offers.get()[0].id(), {task1, task2}); @@ -4290,10 +4300,20 @@ TEST_F(SlaveTest, KillMultiplePendingTasks) // The `__run` continuations should have no effect. 
slave.get()->mock()->unmocked__run( - unschedules1, frameworkInfo1, executorInfo1, task_1, taskGroup1); + unschedules1, + frameworkInfo1, + executorInfo1, + task_1, + taskGroup1, + resourceVersionUuids1); slave.get()->mock()->unmocked__run( - unschedules2, frameworkInfo2, executorInfo2, task_2, taskGroup2); + unschedules2, + frameworkInfo2, + executorInfo2, + task_2, + taskGroup2, + resourceVersionUuids2); Clock::settle(); @@ -7176,18 +7196,20 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts) ExecutorInfo executorInfo_; Option<TaskGroupInfo> taskGroup_; Option<TaskInfo> task_; + vector<ResourceVersionUUID> resourceVersionUuids; // Skip what `Slave::_run()` normally does, save its arguments for // later, till reaching the critical moment when to kill the task // in the future. Future<Nothing> _run; - EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _)) + EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _)) .WillOnce(DoAll(FutureSatisfy(&_run), SaveArg<0>(&unschedules), SaveArg<1>(&frameworkInfo), SaveArg<2>(&executorInfo_), SaveArg<3>(&task_), - SaveArg<4>(&taskGroup_))); + SaveArg<4>(&taskGroup_), + SaveArg<5>(&resourceVersionUuids))); const v1::Offer& offer = offers->offers(0); const SlaveID slaveId = devolve(offer.agent_id()); @@ -7248,7 +7270,12 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts) AWAIT_READY(removeFramework); slave.get()->mock()->unmocked__run( - unschedules, frameworkInfo, executorInfo_, task_, taskGroup_); + unschedules, + frameworkInfo, + executorInfo_, + task_, + taskGroup_, + resourceVersionUuids); AWAIT_READY(update1); AWAIT_READY(update2); @@ -9320,6 +9347,159 @@ TEST_F(SlaveTest, ResourceProviderReconciliation) v1::Resources(offer1.resources()), v1::Resources(offer2.resources())); } + +// This test verifies that the agent checks resource versions received when +// launching tasks against its own state of the used resource providers and +// rejects tasks assuming incompatible state. 
+TEST_F(SlaveTest, RunTaskResourceVersions) +{ + Clock::pause(); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags slaveFlags = CreateSlaveFlags(); + slaveFlags.authenticate_http_readwrite = false; + + // Set the resource provider capability and other required capabilities. + constexpr SlaveInfo::Capability::Type capabilities[] = { + SlaveInfo::Capability::MULTI_ROLE, + SlaveInfo::Capability::HIERARCHICAL_ROLE, + SlaveInfo::Capability::RESERVATION_REFINEMENT, + SlaveInfo::Capability::RESOURCE_PROVIDER}; + + slaveFlags.agent_features = SlaveCapabilities(); + foreach (SlaveInfo::Capability::Type type, capabilities) { + SlaveInfo::Capability* capability = + slaveFlags.agent_features->add_capabilities(); + capability->set_type(type); + } + + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + StandaloneMasterDetector detector(master.get()->pid); + Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags); + ASSERT_SOME(slave); + + Clock::settle(); + Clock::advance(slaveFlags.registration_backoff_factor); + + AWAIT_READY(updateSlaveMessage); + + // Register a resource provider with the agent. 
+ mesos::v1::ResourceProviderInfo resourceProviderInfo; + resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test"); + resourceProviderInfo.set_name("test"); + + v1::Resources resourceProviderResources = v1::createDiskResource( + "200", + "*", + None(), + None(), + v1::createDiskSourceRaw()); + + v1::MockResourceProvider resourceProvider( + resourceProviderInfo, + resourceProviderResources); + + string scheme = "http"; + +#ifdef USE_SSL_SOCKET + if (process::network::openssl::flags().enabled) { + scheme = "https"; + } +#endif + + process::http::URL url( + scheme, + slave.get()->pid.address.ip, + slave.get()->pid.address.port, + slave.get()->pid.id + "/api/v1/resource_provider"); + + Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url)); + + updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + resourceProvider.start( + endpointDetector, ContentType::PROTOBUF, v1::DEFAULT_CREDENTIAL); + + AWAIT_READY(updateSlaveMessage); + + // Start a framework to launch a task. + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, + DEFAULT_FRAMEWORK_INFO, + master.get()->pid, + false, + DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(_, _, _)); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(FutureArg<1>(&offers)); + + driver.start(); + + // Below we update the agent's resource version of the registered + // resource provider. We prevent this update from propagating to the + // master to simulate a race between the agent updating its state + // and the master launching a task. + updateSlaveMessage = DROP_PROTOBUF(UpdateSlaveMessage(), _, _); + + // Update resource version of the resource provider. 
+ { + CHECK(resourceProvider.info.has_id()); + + v1::Resources resourceProviderResources_; + foreach (v1::Resource resource, resourceProviderResources) { + resource.mutable_provider_id()->CopyFrom(resourceProvider.info.id()); + + resourceProviderResources_ += resource; + } + + v1::resource_provider::Call call; + call.set_type(v1::resource_provider::Call::UPDATE_STATE); + call.mutable_resource_provider_id()->CopyFrom(resourceProvider.info.id()); + + v1::resource_provider::Call::UpdateState* updateState = + call.mutable_update_state(); + + updateState->set_resource_version_uuid(UUID::random().toBytes()); + updateState->mutable_resources()->CopyFrom(resourceProviderResources_); + + AWAIT_READY(resourceProvider.send(call)); + } + + AWAIT_READY(updateSlaveMessage); + + // Launch a task on the offered resources. Since the agent will only check + // resource versions from resource providers used in the task launch, we + // explicitly confirm that the offer included resource provider resources. + AWAIT_READY(offers); + ASSERT_FALSE(offers->empty()); + const Resources& offeredResources = offers->front().resources(); + ASSERT_TRUE(std::any_of( + offeredResources.begin(), offeredResources.end(), [](const Resource& r) { + return r.has_provider_id(); + })); + + TaskInfo task = createTask(offers->front(), "sleep 1000"); + + Future<TaskStatus> statusUpdate; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusUpdate)); + + driver.launchTasks(offers->front().id(), {task}); + + AWAIT_READY(statusUpdate); + EXPECT_EQ(TASK_LOST, statusUpdate->state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, statusUpdate->source()); + EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, statusUpdate->reason()); +} + } // namespace tests { } // namespace internal { } // namespace mesos {
