Repository: mesos Updated Branches: refs/heads/master 0e41ba0cf -> 122fc2e1f
Moved framework id validation from scheduler driver to master. Review: https://reviews.apache.org/r/27102 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/122fc2e1 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/122fc2e1 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/122fc2e1 Branch: refs/heads/master Commit: 122fc2e1f0ccadf9e81f49e45333e8815a059fa6 Parents: 0e41ba0 Author: Vinod Kone <[email protected]> Authored: Thu Oct 23 11:33:27 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Fri Oct 24 13:40:22 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 8 ++- src/sched/sched.cpp | 63 +++++----------------- src/tests/resource_offers_tests.cpp | 91 ++++++++++++++++++++++++++++++++ 3 files changed, 110 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/122fc2e1/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 95589b8..9ebdc35 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1884,7 +1884,7 @@ struct SlaveIDChecker : TaskInfoVisitor { if (!(task.slave_id() == slave.id)) { return "Task uses invalid slave " + task.slave_id().value() + - " while slave " + slave.id.value() + " is expected"; + " while slave " + slave.id.value() + " is expected"; } return None(); @@ -2022,6 +2022,12 @@ struct ExecutorInfoChecker : TaskInfoVisitor "Task has invalid ExecutorInfo: missing field 'framework_id'"); } + if (!(task.executor().framework_id() == framework.id)) { + return string("ExecutorInfo has an invalid FrameworkID") + + " (Actual: " + stringify(task.executor().framework_id()) + + " vs Expected: " + stringify(framework.id) + ")"; + } + const ExecutorID& executorId = task.executor().executor_id(); 
Option<ExecutorInfo> executorInfo = None(); http://git-wip-us.apache.org/repos/asf/mesos/blob/122fc2e1/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index e89e5e5..0fb8c7b 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -822,12 +822,14 @@ protected: VLOG(1) << "Ignoring launch tasks message as master is disconnected"; // NOTE: Reply to the framework with TASK_LOST messages for each // task. This is a hack for now, to not let the scheduler - // believe the tasks are forever in PENDING state, when actually - // the master never received the launchTask message. Also, - // realize that this hack doesn't capture the case when the - // scheduler process sends it but the master never receives it - // (message lost, master failover etc). In the future, this - // should be solved by the replicated log and timeouts. + // believe the tasks are launched, when actually the master + // never received the launchTasks message. Also, realize that + // this hack doesn't capture the case when the scheduler process + // sends it but the master never receives it (message lost, + // master failover etc). The correct way for schedulers to deal + // with this situation is to use 'reconcileTasks()'. + // TODO(vinod): Kill this optimization in 0.22.0, to give + // frameworks time to implement reconciliation. foreach (const TaskInfo& task, tasks) { StatusUpdate update; update.mutable_framework_id()->MergeFrom(framework.id()); @@ -843,55 +845,14 @@ protected: return; } + // Set TaskInfo.executor.framework_id, if it's missing. vector<TaskInfo> result; - - foreach (const TaskInfo& task, tasks) { - // Check that each TaskInfo has either an ExecutorInfo or a - // CommandInfo but not both. 
- if (task.has_executor() == task.has_command()) { - StatusUpdate update; - update.mutable_framework_id()->MergeFrom(framework.id()); - TaskStatus* status = update.mutable_status(); - status->mutable_task_id()->MergeFrom(task.task_id()); - status->set_state(TASK_LOST); - status->set_message( - "TaskInfo must have either an 'executor' or a 'command'"); - update.set_timestamp(Clock::now().secs()); - update.set_uuid(UUID::random().toBytes()); - - statusUpdate(UPID(), update, UPID()); - continue; - } - - // Ensure the ExecutorInfo.framework_id is valid, if present. - if (task.has_executor() && - task.executor().has_framework_id() && - !(task.executor().framework_id() == framework.id())) { - StatusUpdate update; - update.mutable_framework_id()->MergeFrom(framework.id()); - TaskStatus* status = update.mutable_status(); - status->mutable_task_id()->MergeFrom(task.task_id()); - status->set_state(TASK_LOST); - status->set_message( - "ExecutorInfo has an invalid FrameworkID (Actual: " + - stringify(task.executor().framework_id()) + " vs Expected: " + - stringify(framework.id()) + ")"); - update.set_timestamp(Clock::now().secs()); - update.set_uuid(UUID::random().toBytes()); - - statusUpdate(UPID(), update, UPID()); - continue; - } - - TaskInfo copy = task; - - // Set the ExecutorInfo.framework_id if missing. 
+ foreach (TaskInfo task, tasks) { if (task.has_executor() && !task.executor().has_framework_id()) { - copy.mutable_executor()->mutable_framework_id()->CopyFrom( + task.mutable_executor()->mutable_framework_id()->CopyFrom( framework.id()); } - - result.push_back(copy); + result.push_back(task); } LaunchTasksMessage message; http://git-wip-us.apache.org/repos/asf/mesos/blob/122fc2e1/src/tests/resource_offers_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp index 060039e..fe66432 100644 --- a/src/tests/resource_offers_tests.cpp +++ b/src/tests/resource_offers_tests.cpp @@ -24,6 +24,7 @@ #include <mesos/scheduler.hpp> #include <stout/strings.hpp> +#include <stout/uuid.hpp> #include "master/hierarchical_allocator_process.hpp" #include "master/master.hpp" @@ -99,6 +100,96 @@ TEST_F(ResourceOffersTest, ResourceOfferWithMultipleSlaves) } +TEST_F(ResourceOffersTest, TaskUsesInvalidFrameworkID) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + Try<PID<Slave> > slave = StartSlave(); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + // Create an executor with a random framework id. + ExecutorInfo executor; + executor = DEFAULT_EXECUTOR_INFO; + executor.mutable_framework_id()->set_value(UUID::random().toString()); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(LaunchTasks(executor, 1, 1, 16, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. 
+ + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + driver.start(); + + AWAIT_READY(status); + EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_TRUE(strings::startsWith( + status.get().message(), "ExecutorInfo has an invalid FrameworkID")); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +TEST_F(ResourceOffersTest, TaskUsesCommandInfoAndExecutorInfo) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + Try<PID<Slave> > slave = StartSlave(); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future<vector<Offer> > offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&status)); + + // Create a task that uses both command info and executor info. + TaskInfo task = createTask(offers.get()[0], ""); // Command task. + task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO); // Executor task. + + vector<TaskInfo> tasks; + tasks.push_back(task); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(status); + EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_TRUE(strings::contains( + status.get().message(), "CommandInfo or ExecutorInfo present")); + + driver.stop(); + driver.join(); + + Shutdown(); +} + + +TEST_F(ResourceOffersTest, TaskUsesNoResources) { Try<PID<Master> > master = StartMaster();
