Fixed a crash in the master during framework re-registration (MESOS-420). Review: https://reviews.apache.org/r/14548
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8077ad40 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8077ad40 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8077ad40 Branch: refs/heads/master Commit: 8077ad40f2e2ec1912425c3cfc2a64e32b86881d Parents: 0eb61a3 Author: Benjamin Mahler <bmah...@twitter.com> Authored: Tue Oct 8 16:47:56 2013 -0700 Committer: Benjamin Mahler <bmah...@twitter.com> Committed: Tue Oct 8 18:36:02 2013 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 22 +++---- src/slave/slave.cpp | 45 ++++++------- src/tests/fault_tolerance_tests.cpp | 109 ++++++++++++++++++++++++++++++- 3 files changed, 137 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/8077ad40/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index cdfae1d..2fd48a6 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -794,18 +794,16 @@ void Master::reregisterFramework(const FrameworkInfo& frameworkInfo, foreachvalue (Task* task, slave->tasks[frameworkId]) { if (framework->id == task->framework_id()) { framework->addTask(task); - // Also add the task's executor for resource accounting. 
- if (task->has_executor_id()) { - if (!framework->hasExecutor(slave->id, task->executor_id())) { - CHECK(slave->hasExecutor(framework->id, task->executor_id())) - << "Unknown executor " << task->executor_id() - << " of framework " << framework->id - << " for the task " << task->task_id(); - - const ExecutorInfo& executorInfo = - slave->executors[framework->id][task->executor_id()]; - framework->addExecutor(slave->id, executorInfo); - } + + // Also add the task's executor for resource accounting + // if it's still alive on the slave and we've not yet + // added it to the framework. + if (task->has_executor_id() && + slave->hasExecutor(framework->id, task->executor_id()) && + !framework->hasExecutor(slave->id, task->executor_id())) { + const ExecutorInfo& executorInfo = + slave->executors[framework->id][task->executor_id()]; + framework->addExecutor(slave->id, executorInfo); } } } http://git-wip-us.apache.org/repos/asf/mesos/blob/8077ad40/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 1dc2189..debb2f4 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -723,9 +723,28 @@ void Slave::doReliableRegistration() continue; } + // Add launched, terminated, and queued tasks. + foreach (Task* task, executor->launchedTasks.values()) { + message.add_tasks()->CopyFrom(*task); + } + foreach (Task* task, executor->terminatedTasks.values()) { + message.add_tasks()->CopyFrom(*task); + } + foreach (const TaskInfo& task, executor->queuedTasks.values()) { + message.add_tasks()->CopyFrom(protobuf::createTask( + task, TASK_STAGING, executor->id, framework->id)); + } + // Do not re-register with Command Executors because the // master doesn't store them; they are generated by the slave. 
- if (!executor->commandExecutor) { + if (executor->commandExecutor) { + // NOTE: We have to unset the executor id here for the task + // because the master uses the absence of task.executor_id() + // to detect command executors. + for (int i = 0; i < message.tasks_size(); ++i) { + message.mutable_tasks(i)->clear_executor_id(); + } + } else { ExecutorInfo* executorInfo = message.add_executor_infos(); executorInfo->MergeFrom(executor->info); @@ -735,30 +754,6 @@ void Slave::doReliableRegistration() // it a required field. executorInfo->mutable_framework_id()->MergeFrom(framework->id); } - - // Add launched tasks. - // TODO(vinod): Use foreachvalue instead once LinkedHashmap - // supports it. - foreach (Task* task, executor->launchedTasks.values()) { - message.add_tasks()->CopyFrom(*task); - } - - // Add queued tasks. - // TODO(vinod): Use foreachvalue instead once LinkedHashmap - // supports it. - foreach (const TaskInfo& task, executor->queuedTasks.values()) { - const Task& t = protobuf::createTask( - task, TASK_STAGING, executor->id, framework->id); - - message.add_tasks()->CopyFrom(t); - } - - // Add terminated tasks. - // TODO(vinod): Use foreachvalue instead once LinkedHashmap - // supports it. 
- foreach (Task* task, executor->terminatedTasks.values()) { - message.add_tasks()->CopyFrom(*task); - } } } send(master, message); http://git-wip-us.apache.org/repos/asf/mesos/blob/8077ad40/src/tests/fault_tolerance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp index 254eae4..35b743a 100644 --- a/src/tests/fault_tolerance_tests.cpp +++ b/src/tests/fault_tolerance_tests.cpp @@ -998,8 +998,7 @@ TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate) vector<TaskInfo> tasks; tasks.push_back(task); - EXPECT_CALL(exec, registered(_, _, _, _)) - .Times(1); + EXPECT_CALL(exec, registered(_, _, _, _)); EXPECT_CALL(exec, launchTask(_, _)) .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); @@ -1061,6 +1060,112 @@ TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate) } +// This test was added to ensure MESOS-420 is fixed. +// We need to make sure that the master correctly handles non-terminal +// tasks with exited executors upon framework re-registration. This is +// possible because the ExitedExecutor message can arrive before the +// terminal status update(s) of its task(s). +TEST_F(FaultToleranceTest, ReregisterFrameworkExitedExecutor) +{ + // First we'll start a master and slave, then register a framework + // so we can launch a task. 
+ Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + TestingIsolator isolator(&exec); + Try<PID<Slave> > slave = StartSlave(&isolator); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get()); + + Future<process::Message> frameworkRegisteredMessage = + FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); + + FrameworkID frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(SaveArg<1>(&frameworkId)); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(LaunchTasks(1, 1, 16, "*")); + + Future<Nothing> statusUpdate; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureSatisfy(&statusUpdate)); // TASK_RUNNING. + + EXPECT_CALL(exec, registered(_, _, _, _)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + driver.start(); + + AWAIT_READY(frameworkRegisteredMessage); + + // Wait until TASK_RUNNING of the task is received. + AWAIT_READY(statusUpdate); + + EXPECT_CALL(sched, disconnected(&driver)); + + // Now that the task is launched, we need to induce the following: + // 1. ExitedExecutorMessage received by the master prior to a + // terminal status update for the corresponding task. This + // means we need to drop the status update coming from the + // slave. + // 2. Framework re-registration. + // + // To achieve this, we need to: + // 1. Restart the master (the slave / framework will not detect + // the new master automatically using the BasicMasterDetector). + // 2. Notify the slave of the new master. + // 3. Kill the executor. + // 4. Drop the status update, but allow the ExitedExecutorMessage. + // 5. Notify the framework of the new master. + Stop(master.get()); + master = StartMaster(); + ASSERT_SOME(master); + + // Simulate a new master detected message to the slave. 
+ NewMasterDetectedMessage newMasterDetectedMsg; + newMasterDetectedMsg.set_pid(master.get()); + + Future<SlaveReregisteredMessage> slaveReregisteredMessage = + FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _); + + process::post(slave.get(), newMasterDetectedMsg); + + // Wait for the slave to re-register. + AWAIT_READY(slaveReregisteredMessage); + + // Allow the executor exited message and drop the status update. + Future<ExitedExecutorMessage> executorExitedMessage = + FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _); + Future<StatusUpdateMessage> statusUpdateMessage = + DROP_PROTOBUF(StatusUpdateMessage(), _, _); + + // Now kill the executor. + dispatch(isolator, &Isolator::killExecutor, frameworkId, DEFAULT_EXECUTOR_ID); + + AWAIT_READY(executorExitedMessage); + AWAIT_READY(statusUpdateMessage); + + // Now notify the framework of the new master. + Future<FrameworkRegisteredMessage> frameworkRegisteredMessage2 = + FUTURE_PROTOBUF(FrameworkRegisteredMessage(), _, _); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + process::post(frameworkRegisteredMessage.get().to, newMasterDetectedMsg); + + AWAIT_READY(frameworkRegisteredMessage2); + + driver.stop(); + driver.join(); + Shutdown(); +} + + TEST_F(FaultToleranceTest, ForwardStatusUpdateUnknownExecutor) { Try<PID<Master> > master = StartMaster();