Updated reconciliation semantics to take the task's unacknowledged state into account.
Review: https://reviews.apache.org/r/26702 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e960cdff Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e960cdff Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e960cdff Branch: refs/heads/master Commit: e960cdffec20d54b4f57f552d13cd92004f8e437 Parents: 3c4e3fd Author: Vinod Kone <[email protected]> Authored: Fri Oct 10 19:09:35 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Tue Oct 21 15:47:09 2014 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 23 +++++-- src/tests/reconciliation_tests.cpp | 115 +++++++++++++++++++++++++++++++- 2 files changed, 133 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e960cdff/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 9743eab..e70cdbf 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3535,11 +3535,15 @@ void Master::reconcileTasks( } foreachvalue (Task* task, framework->tasks) { + const TaskState& state = task->has_status_update_state() + ? task->status_update_state() + : task->state(); + const StatusUpdate& update = protobuf::createStatusUpdate( frameworkId, task->slave_id(), task->task_id(), - task->state(), + state, "Reconciliation: Latest task state"); VLOG(1) << "Sending implicit reconciliation state " @@ -3593,12 +3597,16 @@ void Master::reconcileTasks( TASK_STAGING, "Reconciliation: Latest task state"); } else if (task != NULL) { - // (2) Task is known: send the latest state. + // (2) Task is known: send the latest status update state. + const TaskState& state = task->has_status_update_state() + ? 
task->status_update_state() + : task->state(); + update = protobuf::createStatusUpdate( frameworkId, task->slave_id(), task->task_id(), - task->state(), + state, "Reconciliation: Latest task state"); } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) { // (3) Task is unknown, slave is registered: TASK_LOST. @@ -3948,10 +3956,17 @@ void Master::reconcile( : ": sending TASK_LOST"); if (slave->version.isSome()) { + // NOTE: Currently the slave doesn't look at the task state + // when it reconciles the task state; we include the correct + // state for correctness and consistency. + const TaskState& state = task->has_status_update_state() + ? task->status_update_state() + : task->state(); + TaskStatus* status = reconcile.add_statuses(); status->mutable_task_id()->CopyFrom(task->task_id()); status->mutable_slave_id()->CopyFrom(slave->id); - status->set_state(task->state()); + status->set_state(state); status->set_message("Reconciliation request"); status->set_timestamp(Clock::now().secs()); } else { http://git-wip-us.apache.org/repos/asf/mesos/blob/e960cdff/src/tests/reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp index 400c5c0..4ba5394 100644 --- a/src/tests/reconciliation_tests.cpp +++ b/src/tests/reconciliation_tests.cpp @@ -63,7 +63,7 @@ using testing::An; using testing::AtMost; using testing::DoAll; using testing::Return; - +using testing::SaveArg; class ReconciliationTest : public MesosTest {}; @@ -742,3 +742,116 @@ TEST_F(ReconciliationTest, UnacknowledgedTerminalTask) Shutdown(); // Must shutdown before 'containerizer' gets deallocated. } + + +// This test verifies that when the task's latest and status update +// states differ, master responds to reconciliation request with the +// status update state. +TEST_F(ReconciliationTest, ReconcileStatusUpdateTaskState) +{ + // Start a master. 
+ Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + // Start a slave. + MockExecutor exec(DEFAULT_EXECUTOR_ID); + StandaloneMasterDetector slaveDetector(master.get()); + Try<PID<Slave> > slave = StartSlave(&exec, &slaveDetector); + ASSERT_SOME(slave); + + // Start a scheduler. + MockScheduler sched; + StandaloneMasterDetector schedulerDetector(master.get()); + TestingMesosSchedulerDriver driver(&sched, &schedulerDetector); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + ExecutorDriver* execDriver; + EXPECT_CALL(exec, registered(_, _, _, _)) + .WillOnce(SaveArg<0>(&execDriver)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + // Signal when the first update is dropped. + Future<StatusUpdateMessage> statusUpdateMessage = + DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()); + + Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + + driver.start(); + + // Pause the clock to avoid status update retries. + Clock::pause(); + + // Wait until TASK_RUNNING is sent to the master. + AWAIT_READY(statusUpdateMessage); + + // Ensure status update manager handles TASK_RUNNING update. + AWAIT_READY(__statusUpdate); + + Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + + // Now send TASK_FINISHED update. + TaskStatus finishedStatus; + finishedStatus = statusUpdateMessage.get().update().status(); + finishedStatus.set_state(TASK_FINISHED); + execDriver->sendStatusUpdate(finishedStatus); + + // Ensure status update manager handles TASK_FINISHED update. + AWAIT_READY(__statusUpdate2); + + EXPECT_CALL(sched, disconnected(&driver)) + .WillOnce(Return()); + + // Simulate master failover by restarting the master. 
+ this->Stop(master.get()); + master = StartMaster(); + + Clock::resume(); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureSatisfy(&registered)); + + // Re-register the framework. + schedulerDetector.appoint(master.get()); + + AWAIT_READY(registered); + + Future<SlaveReregisteredMessage> slaveReregisteredMessage = + FUTURE_PROTOBUF(SlaveReregisteredMessage(), master.get(), slave.get()); + + // Drop all updates to the second master. + DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get()); + + // Re-register the slave. + slaveDetector.appoint(master.get()); + + AWAIT_READY(slaveReregisteredMessage); + + // Framework should receive a TASK_RUNNING update, since that is the + // latest status update state of the task. + Future<TaskStatus> update; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&update)); + + // Reconcile the state of the task. + vector<TaskStatus> statuses; + driver.reconcileTasks(statuses); + + AWAIT_READY(update); + EXPECT_EQ(TASK_RUNNING, update.get().state()); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.stop(); + driver.join(); + + Shutdown(); +}
