Updated the slave to include the latest task state in status updates. Review: https://reviews.apache.org/r/26700
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/da669702 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/da669702 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/da669702 Branch: refs/heads/master Commit: da669702e4c6c4050ef49d9a1e399a837a77c143 Parents: ca14f37 Author: Vinod Kone <[email protected]> Authored: Fri Oct 10 14:48:49 2014 -0700 Committer: Vinod Kone <[email protected]> Committed: Tue Oct 21 15:47:09 2014 -0700 ---------------------------------------------------------------------- src/messages/messages.proto | 7 +++ src/slave/slave.cpp | 36 ++++++++---- src/slave/slave.hpp | 7 ++- src/slave/status_update_manager.cpp | 8 +-- src/slave/status_update_manager.hpp | 2 +- src/tests/status_update_manager_tests.cpp | 80 ++++++++++++++++++++++++++ 6 files changed, 122 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 196d1d4..6e49fe7 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -73,6 +73,13 @@ message StatusUpdate { required TaskStatus status = 4; required double timestamp = 5; required bytes uuid = 6; + + // This corresponds to the latest state of the task according to the + // slave. Note that this state might be different than the state in + // 'status' because status update manager queues updates. In other + // words, 'status' corresponds to the update at top of the queue and + // 'latest_state' corresponds to the update at bottom of the queue. 
+ optional TaskState latest_state = 7; } http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index a5761ed..55e5264 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2268,6 +2268,16 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid) stats.validStatusUpdates++; metrics.valid_status_updates++; + // We set the latest state of the task here so that the slave can + // inform the master about the latest state (via status update or + // ReregisterSlaveMessage message) as soon as possible. Master can + // use this information, for example, to release resources as soon + // as the latest state of the task reaches a terminal state. This + // is important because status update manager queues updates and + // only sends one update per task at a time; the next update for a + // task is sent only after the acknowledgement for the previous one + // is received, which could take a long time if the framework is + // backed up or is down. executor->updateTaskState(status); // Handle the task appropriately if it is terminated. @@ -2372,7 +2382,7 @@ void Slave::__statusUpdate( // NOTE: An acknowledgement for this update might have already been // processed by the slave but not the status update manager. -void Slave::forward(const StatusUpdate& update) +void Slave::forward(StatusUpdate update) { CHECK(state == RECOVERING || state == DISCONNECTED || state == RUNNING || state == TERMINATING) @@ -2385,7 +2395,8 @@ void Slave::forward(const StatusUpdate& update) return; } - // Update the status update state of the task. + // Update the status update state of the task and include the latest + // state of the task in the status update. 
Framework* framework = getFramework(update.framework_id()); if (framework != NULL) { const TaskID& taskId = update.status().task_id(); @@ -2402,17 +2413,22 @@ void Slave::forward(const StatusUpdate& update) task = executor->terminatedTasks[taskId]; } - // We set the status update state of the task here because in - // steady state master updates the status update state of the - // task when it receives this update. If the master fails over, - // slave re-registers with this task with this status update - // state. Note that an acknowledgement for this update might be - // enqueued on status update manager when we are here. But that - // is ok because the status update state will be updated when - // the next update is forwarded to the slave. if (task != NULL) { + // We set the status update state of the task here because in + // steady state master updates the status update state of the + // task when it receives this update. If the master fails over, + // slave re-registers with this task in this status update + // state. Note that an acknowledgement for this update might + // be enqueued on status update manager when we are here. But + // that is ok because the status update state will be updated + // when the next update is forwarded to the slave. task->set_status_update_state(update.status().state()); task->set_status_update_uuid(update.uuid()); + + // Include the latest state of task in the update. See the + // comments in 'statusUpdate()' on why informing the master + // about the latest state of the task is important. 
+ update.set_latest_state(task->state()); } } } http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 439052e..eb5de73 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -197,9 +197,10 @@ public: const StatusUpdate& update, const process::UPID& pid); - // This is called by status update manager to forward a - // status update to the master. - void forward(const StatusUpdate& update); + // This is called by status update manager to forward a status + // update to the master. Note that the latest state of the task is + // added to the update before forwarding. + void forward(StatusUpdate update); void statusUpdateAcknowledgement( const process::UPID& from, http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/status_update_manager.cpp ---------------------------------------------------------------------- diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp index 9bdbf5e..fab8c22 100644 --- a/src/slave/status_update_manager.cpp +++ b/src/slave/status_update_manager.cpp @@ -71,7 +71,7 @@ public: using process::ProcessBase::initialize; // StatusUpdateManager implementation. 
- void initialize(const function<void(const StatusUpdate&)>& forward); + void initialize(const function<void(StatusUpdate)>& forward); Future<Nothing> update( const StatusUpdate& update, @@ -138,7 +138,7 @@ private: const Flags flags; bool paused; - function<void(const StatusUpdate&)> forward_; + function<void(StatusUpdate)> forward_; hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams; }; @@ -160,7 +160,7 @@ StatusUpdateManagerProcess::~StatusUpdateManagerProcess() void StatusUpdateManagerProcess::initialize( - const function<void(const StatusUpdate&)>& forward) + const function<void(StatusUpdate)>& forward) { forward_ = forward; } @@ -559,7 +559,7 @@ StatusUpdateManager::~StatusUpdateManager() void StatusUpdateManager::initialize( - const function<void(const StatusUpdate&)>& forward) + const function<void(StatusUpdate)>& forward) { dispatch(process, &StatusUpdateManagerProcess::initialize, forward); } http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/slave/status_update_manager.hpp ---------------------------------------------------------------------- diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp index 2852884..1c8a54e 100644 --- a/src/slave/status_update_manager.hpp +++ b/src/slave/status_update_manager.hpp @@ -78,7 +78,7 @@ public: // Expects a callback 'forward' which gets called whenever there is // a new status update that needs to be forwarded to the master. - void initialize(const lambda::function<void(const StatusUpdate&)>& forward); + void initialize(const lambda::function<void(StatusUpdate)>& forward); // TODO(vinod): Come up with better names/signatures for the // checkpointing and non-checkpointing 'update()' functions. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/da669702/src/tests/status_update_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp index e9ef1e2..b0b1316 100644 --- a/src/tests/status_update_manager_tests.cpp +++ b/src/tests/status_update_manager_tests.cpp @@ -782,3 +782,83 @@ TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck) Shutdown(); } + + +// This test verifies that the status update manager correctly includes +// the latest state of the task in status update. +TEST_F(StatusUpdateManagerTest, LatestTaskState) +{ + Try<PID<Master> > master = StartMaster(); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + + Try<PID<Slave> > slave = StartSlave(&exec); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(_, _, _)); + + EXPECT_CALL(sched, resourceOffers(_, _)) + .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*")) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + ExecutorDriver* execDriver; + EXPECT_CALL(exec, registered(_, _, _, _)) + .WillOnce(SaveArg<0>(&execDriver)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + // Signal when the first update is dropped. + Future<StatusUpdateMessage> statusUpdateMessage = + DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()); + + Future<Nothing> __statusUpdate = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + + driver.start(); + + // Wait until TASK_RUNNING is sent to the master. + AWAIT_READY(statusUpdateMessage); + + // Ensure the status update manager handles the TASK_RUNNING update. + AWAIT_READY(__statusUpdate); + + // Pause the clock to avoid status update manager from retrying. 
+ Clock::pause(); + + Future<Nothing> __statusUpdate2 = FUTURE_DISPATCH(_, &Slave::__statusUpdate); + + // Now send TASK_FINISHED update. + TaskStatus finishedStatus; + finishedStatus = statusUpdateMessage.get().update().status(); + finishedStatus.set_state(TASK_FINISHED); + execDriver->sendStatusUpdate(finishedStatus); + + // Ensure the status update manager handles the TASK_FINISHED update. + AWAIT_READY(__statusUpdate2); + + // Signal when the second update is dropped. + Future<StatusUpdateMessage> statusUpdateMessage2 = + DROP_PROTOBUF(StatusUpdateMessage(), _, master.get()); + + // Advance the clock for the status update manager to send a retry. + Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN); + + AWAIT_READY(statusUpdateMessage2); + + // The update should correspond to TASK_RUNNING. + ASSERT_EQ(TASK_RUNNING, statusUpdateMessage2.get().update().status().state()); + + // The update should include TASK_FINISHED as the latest state. + ASSERT_EQ(TASK_FINISHED, + statusUpdateMessage2.get().update().latest_state()); + + driver.stop(); + driver.join(); + + Shutdown(); +}
