Repository: mesos Updated Branches: refs/heads/master 616d40185 -> e960cdffe
Updated slave re-registration to send unacknowledged task states. Review: https://reviews.apache.org/r/26699 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ca14f37b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ca14f37b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ca14f37b Branch: refs/heads/master Commit: ca14f37bf7321a977e297e974e9c4c1f0cc57e0e Parents: 65c3c36 Author: Vinod Kone <[email protected]> Authored: Fri Oct 10 12:16:32 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Tue Oct 21 15:47:08 2014 -0700 ---------------------------------------------------------------------- src/messages/messages.proto | 10 +++- src/slave/slave.cpp | 45 +++++++++++++++ src/tests/fault_tolerance_tests.cpp | 3 - src/tests/slave_tests.cpp | 99 ++++++++++++++++++++++++++++++++ 4 files changed, 153 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ca14f37b/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 0dfc1b7..196d1d4 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -44,11 +44,18 @@ message Task { required FrameworkID framework_id = 3; optional ExecutorID executor_id = 4; required SlaveID slave_id = 5; - required TaskState state = 6; + required TaskState state = 6; // Latest state of the task. repeated Resource resources = 7; repeated TaskStatus statuses = 8; + + // These fields correspond to the state and uuid of the latest + // status update forwarded to the master. + // NOTE: Either both the fields must be set or both must be unset. 
+ optional TaskState status_update_state = 9; + optional bytes status_update_uuid = 10; } + // Describes a role, which are used to group frameworks for allocation // decisions, depending on the allocation policy being used. // The weight field can be used to indicate forms of priority. @@ -58,6 +65,7 @@ message RoleInfo { } +// TODO(vinod): Create a new UUID message type. message StatusUpdate { required FrameworkID framework_id = 1; optional ExecutorID executor_id = 2; http://git-wip-us.apache.org/repos/asf/mesos/blob/ca14f37b/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index a98e408..a5761ed 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -957,12 +957,16 @@ void Slave::doReliableRegistration(const Duration& duration) // Add launched, terminated, and queued tasks. // Note that terminated executors will only have terminated // unacknowledged tasks. + // Note that for each task the latest state and the status update + // state (if any) are also included. foreach (Task* task, executor->launchedTasks.values()) { message.add_tasks()->CopyFrom(*task); } + foreach (Task* task, executor->terminatedTasks.values()) { message.add_tasks()->CopyFrom(*task); } + foreach (const TaskInfo& task, executor->queuedTasks.values()) { message.add_tasks()->CopyFrom(protobuf::createTask( task, TASK_STAGING, framework->id)); @@ -2381,9 +2385,50 @@ void Slave::forward(const StatusUpdate& update) return; } + // Update the status update state of the task. + Framework* framework = getFramework(update.framework_id()); + if (framework != NULL) { + const TaskID& taskId = update.status().task_id(); + Executor* executor = framework->getExecutor(taskId); + if (executor != NULL) { + // NOTE: We do not look for the task in queued tasks because + // no update is expected for it until it's launched.
Similarly, + // we do not look for completed tasks because the state for a + // completed task shouldn't be changed. + Task* task = NULL; + if (executor->launchedTasks.contains(taskId)) { + task = executor->launchedTasks[taskId]; + } else if (executor->terminatedTasks.contains(taskId)) { + task = executor->terminatedTasks[taskId]; + } + + // We set the status update state of the task here because in + // steady state the master updates the status update state of the + // task when it receives this update. If the master fails over, + // the slave re-registers with this task and this status update + // state. Note that an acknowledgement for this update might be + // enqueued on the status update manager when we get here. But that + // is OK because the status update state will be updated when + // the next update is forwarded to the master. + if (task != NULL) { + task->set_status_update_state(update.status().state()); + task->set_status_update_uuid(update.uuid()); + } + } + } + CHECK_SOME(master); LOG(INFO) << "Forwarding the update " << update << " to " << master.get(); + // NOTE: We forward the update even if framework/executor/task + // doesn't exist because the status update manager will be expecting + // an acknowledgement for the update. This could happen for example + // if this is a retried terminal update and before we get here the + // slave has already processed the acknowledgement of the original + // update and removed the framework/executor/task. Also, slave + // re-registration can generate updates when framework/executor/task + // are unknown. + // Forward the update to the master.
StatusUpdateMessage message; message.mutable_update()->MergeFrom(update); http://git-wip-us.apache.org/repos/asf/mesos/blob/ca14f37b/src/tests/fault_tolerance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp index a75910d..a18a41a 100644 --- a/src/tests/fault_tolerance_tests.cpp +++ b/src/tests/fault_tolerance_tests.cpp @@ -126,9 +126,6 @@ TEST_F(FaultToleranceTest, SlaveLost) } - - - // The purpose of this test is to ensure that when slaves are removed // from the master, and then attempt to send status updates, we send // a ShutdownMessage to the slave. Why? Because during a network http://git-wip-us.apache.org/repos/asf/mesos/blob/ca14f37b/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 759670a..a1bd00c 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -1155,3 +1155,102 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts) Shutdown(); // Must shutdown before 'containerizer' gets deallocated. } + + +// This test verifies that a re-registering slave reports both the +// latest state and the status update state of each task. +TEST_F(SlaveTest, ReregisterWithStatusUpdateTaskState) +{ + // Start a master. + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + // Create a StandaloneMasterDetector to enable the slave to trigger + // re-registration later. + StandaloneMasterDetector detector(master.get()); + + // Start a slave.
+ Try<PID<Slave> > slave = StartSlave(&exec, &detector); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 2, 1024, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + ExecutorDriver* execDriver; + EXPECT_CALL(exec, registered(_, _, _, _)) + .WillOnce(SaveArg<0>(&execDriver)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + // Signal when the first update is dropped. + Future<StatusUpdateMessage> statusUpdateMessage = + DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()); + + Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + + driver.start(); + + // Pause the clock to avoid status update retries. + Clock::pause(); + + // Wait until TASK_RUNNING is sent to the master. + AWAIT_READY(statusUpdateMessage); + + // Ensure status update manager handles TASK_RUNNING update. + AWAIT_READY(__statusUpdate); + + Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + + // Now send TASK_FINISHED update. + TaskStatus finishedStatus; + finishedStatus = statusUpdateMessage.get().update().status(); + finishedStatus.set_state(TASK_FINISHED); + execDriver->sendStatusUpdate(finishedStatus); + + // Ensure status update manager handles TASK_FINISHED update. + AWAIT_READY(__statusUpdate2); + + Future<ReregisterSlaveMessage> reregisterSlaveMessage = + FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _); + + // Drop any updates to the failed over master. + DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get()); + + // Simulate a new master detected event on the slave, + // so that the slave will do a re-registration. + detector.appoint(master.get()); + + // Capture and inspect the slave reregistration message. 
+ AWAIT_READY(reregisterSlaveMessage); + + ASSERT_EQ(1, reregisterSlaveMessage.get().tasks_size()); + + // The latest state of the task should be TASK_FINISHED. + ASSERT_EQ(TASK_FINISHED, reregisterSlaveMessage.get().tasks(0).state()); + + // The status update state of the task should be TASK_RUNNING. + ASSERT_EQ(TASK_RUNNING, + reregisterSlaveMessage.get().tasks(0).status_update_state()); + + // The status update uuid should match the TASK_RUNNING's uuid. + ASSERT_EQ(statusUpdateMessage.get().update().uuid(), + reregisterSlaveMessage.get().tasks(0).status_update_uuid()); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + driver.stop(); + driver.join(); + + Shutdown(); +}
