Repository: mesos Updated Branches: refs/heads/master b6d2f2def -> 51656c8cd
Ensured TaskStatus::source field is set for executor status updates. A status update originating from an executor should have the TaskStatus::source field set to TaskStatus::SOURCE_EXECUTOR. This field is now set in the slave to be future-proof (i.e., for a future in which there is no executor driver). The previous code had a bug: it set the source on a copy of the update's status, and that copy was never forwarded. Added some checks for the source and reason of status updates in existing tests. Review: https://reviews.apache.org/r/32130 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/51656c8c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/51656c8c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/51656c8c Branch: refs/heads/master Commit: 51656c8cdf9c2b58ad4c3381e4131dfa269d46ab Parents: b6d2f2d Author: Alexander Rukletsov <[email protected]> Authored: Tue Mar 17 15:38:49 2015 -0700 Committer: Niklas Q. Nielsen <[email protected]> Committed: Tue Mar 17 15:38:49 2015 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 10 ++++++---- src/slave/slave.hpp | 4 +++- src/tests/slave_tests.cpp | 39 +++++++++++++++++++++++++++++++-------- 3 files changed, 40 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/51656c8c/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 0f99e4e..f1f2100 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2479,7 +2479,7 @@ void Slave::reregisterExecutorTimeout() // reliable delivery of status updates. Since executor driver caches // unacked updates it is important that whoever sent the update gets // acknowledgement for it. 
-void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid) +void Slave::statusUpdate(StatusUpdate update, const UPID& pid) { LOG(INFO) << "Handling status update " << update << " from " << pid; @@ -2487,9 +2487,9 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid) state == RUNNING || state == TERMINATING) << state; - TaskStatus status = update.status(); - status.set_source(pid == UPID() ? TaskStatus::SOURCE_SLAVE - : TaskStatus::SOURCE_EXECUTOR); + // Set the source before forwarding the status update. + update.mutable_status()->set_source( + pid == UPID() ? TaskStatus::SOURCE_SLAVE : TaskStatus::SOURCE_EXECUTOR); Framework* framework = getFramework(update.framework_id()); if (framework == NULL) { @@ -2512,6 +2512,8 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid) return; } + TaskStatus status = update.status(); + Executor* executor = framework->getExecutor(status.task_id()); if (executor == NULL) { LOG(WARNING) << "Could not find the executor for " http://git-wip-us.apache.org/repos/asf/mesos/blob/51656c8c/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 989832f..19e6b44 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -181,7 +181,9 @@ public: // after the update is successfully handled. If pid == UPID() // no ACK is sent. The latter is used by the slave to send // status updates it generated (e.g., TASK_LOST). - void statusUpdate(const StatusUpdate& update, const process::UPID& pid); + // NOTE: StatusUpdate is passed by value because it is modified + // to ensure source field is set. + void statusUpdate(StatusUpdate update, const process::UPID& pid); // Continue handling the status update after optionally updating the // container's resources. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/51656c8c/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index a975305..fd09d65 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -176,6 +176,7 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor) AWAIT_READY(status); ASSERT_EQ(TASK_FAILED, status.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source()); Clock::resume(); @@ -247,6 +248,8 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor) AWAIT_READY(status); EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source()); + EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason()); // We use 'gc.schedule' as a signal for the executor being cleaned // up by the slave. @@ -348,10 +351,10 @@ TEST_F(SlaveTest, CommandExecutorWithOverride) // Expect two status updates, one for once the mesos-executor says // the task is running and one for after our overridden command // above finishes. - Future<TaskStatus> status1, status2; + Future<TaskStatus> statusRunning, statusFinished; EXPECT_CALL(sched, statusUpdate(_, _)) - .WillOnce(FutureArg<1>(&status1)) - .WillOnce(FutureArg<1>(&status2)); + .WillOnce(FutureArg<1>(&statusRunning)) + .WillOnce(FutureArg<1>(&statusFinished)); Try<Subprocess> executor = subprocess( @@ -363,12 +366,15 @@ TEST_F(SlaveTest, CommandExecutorWithOverride) ASSERT_SOME(executor); - // Scheduler should receive the TASK_RUNNING update. - AWAIT_READY(status1); - ASSERT_EQ(TASK_RUNNING, status1.get().state()); + // Scheduler should first receive TASK_RUNNING followed by the + // TASK_FINISHED from the executor. 
+ AWAIT_READY(statusRunning); + ASSERT_EQ(TASK_RUNNING, statusRunning.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source()); - AWAIT_READY(status2); - ASSERT_EQ(TASK_FINISHED, status2.get().state()); + AWAIT_READY(statusFinished); + ASSERT_EQ(TASK_FINISHED, statusFinished.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source()); AWAIT_READY(wait); @@ -461,11 +467,15 @@ TEST_F(SlaveTest, ComamndTaskWithArguments) driver.launchTasks(offers.get()[0].id(), tasks); + // Scheduler should first receive TASK_RUNNING followed by the + // TASK_FINISHED from the executor. AWAIT_READY(statusRunning); EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source()); AWAIT_READY(statusFinished); EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source()); driver.stop(); driver.join(); @@ -592,11 +602,15 @@ TEST_F(SlaveTest, ROOT_RunTaskWithCommandInfoWithoutUser) driver.launchTasks(offers.get()[0].id(), tasks); + // Scheduler should first receive TASK_RUNNING followed by the + // TASK_FINISHED from the executor. AWAIT_READY(statusRunning); EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source()); AWAIT_READY(statusFinished); EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source()); driver.stop(); driver.join(); @@ -684,11 +698,15 @@ TEST_F(SlaveTest, DISABLED_ROOT_RunTaskWithCommandInfoWithUser) driver.launchTasks(offers.get()[0].id(), tasks); + // Scheduler should first receive TASK_RUNNING followed by the + // TASK_FINISHED from the executor. 
AWAIT_READY(statusRunning); EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning.get().source()); AWAIT_READY(statusFinished); EXPECT_EQ(TASK_FINISHED, statusFinished.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusFinished.get().source()); driver.stop(); driver.join(); @@ -1092,9 +1110,12 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails) AWAIT_READY(status3); EXPECT_EQ(TASK_KILLED, status3.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source()); AWAIT_READY(status4); EXPECT_EQ(TASK_LOST, status4.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4.get().source()); + EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status4.get().reason()); driver.stop(); driver.join(); @@ -1204,6 +1225,8 @@ TEST_F(SlaveTest, TaskLaunchContainerizerUpdateFails) AWAIT_READY(status); EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source()); + EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason()); driver.stop(); driver.join();
