Repository: mesos Updated Branches: refs/heads/master cdaac1653 -> ca1359405
Add source and reason to TaskStatus. Review: https://reviews.apache.org/r/26382/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ca135940 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ca135940 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ca135940 Branch: refs/heads/master Commit: ca13594054b179bd36ce14323de3111f92bae3cd Parents: cdaac16 Author: Dominic Hamon <[email protected]> Authored: Thu Oct 16 11:47:43 2014 -0700 Committer: Dominic Hamon <[email protected]> Committed: Tue Nov 4 14:01:19 2014 -0800 ---------------------------------------------------------------------- include/mesos/mesos.proto | 45 ++++++-- src/common/protobuf_utils.cpp | 7 ++ src/common/protobuf_utils.hpp | 2 + src/examples/balloon_framework.cpp | 4 +- src/examples/java/TestFramework.java | 4 + src/examples/low_level_scheduler_libprocess.cpp | 2 + src/examples/low_level_scheduler_pthread.cpp | 2 + src/examples/no_executor_framework.cpp | 2 + src/examples/test_framework.cpp | 2 + src/master/master.cpp | 67 +++++++++--- src/sched/sched.cpp | 2 + src/slave/slave.cpp | 107 ++++++++++++------- src/slave/slave.hpp | 8 +- src/tests/fault_tolerance_tests.cpp | 16 ++- src/tests/master_authorization_tests.cpp | 1 + src/tests/master_tests.cpp | 2 + src/tests/resource_offers_tests.cpp | 5 + 17 files changed, 211 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index 397d542..6c846f2 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -607,15 +607,16 @@ message TaskInfo { * another task). */ enum TaskState { - TASK_STAGING = 6; // Initial state. Framework status updates should not use. + TASK_STAGING = 6; // Initial state. Framework status updates should not use. TASK_STARTING = 0; TASK_RUNNING = 1; - TASK_FINISHED = 2; // TERMINAL. - TASK_FAILED = 3; // TERMINAL. - TASK_KILLED = 4; // TERMINAL. - TASK_LOST = 5; // TERMINAL. - - // TODO(benh): TASK_ERROR = 7; // TERMINAL. + TASK_FINISHED = 2; // TERMINAL. The task finished successfully. + TASK_FAILED = 3; // TERMINAL. The task failed to finish successfully. + TASK_KILLED = 4; // TERMINAL. The task was killed by the executor. + TASK_LOST = 5; // TERMINAL. The task failed but can be rescheduled. + // TASK_ERROR is currently unused but will be introduced in 0.22.0. + // TODO(dhamon): Start using TASK_ERROR. + TASK_ERROR = 7; // TERMINAL. The task description contains an error. } @@ -623,9 +624,39 @@ enum TaskState { * Describes the current status of a task. */ message TaskStatus { + /** Describes the source of the task status update. */ + enum Source { + SOURCE_MASTER = 0; + SOURCE_SLAVE = 1; + SOURCE_EXECUTOR = 2; + } + + /** Detailed reason for the task status update. */ + enum Reason { + REASON_COMMAND_EXECUTOR_FAILED = 0; + REASON_EXECUTOR_TERMINATED = 1; + REASON_EXECUTOR_UNREGISTERED = 2; + REASON_FRAMEWORK_REMOVED = 3; + REASON_GC_ERROR = 4; + REASON_INVALID_FRAMEWORKID = 5; + REASON_INVALID_OFFERS = 6; + REASON_MASTER_DISCONNECTED = 7; + REASON_MEMORY_LIMIT = 8; + REASON_RECONCILIATION = 9; + REASON_SLAVE_DISCONNECTED = 10; + REASON_SLAVE_REMOVED = 11; + REASON_SLAVE_RESTARTED = 12; + REASON_SLAVE_UNKNOWN = 13; + REASON_TASK_INVALID = 14; + REASON_TASK_UNAUTHORIZED = 15; + REASON_TASK_UNKNOWN = 16; + } + required TaskID task_id = 1; required TaskState state = 2; optional string message = 4; // Possible message explaining state. + optional Source source = 9; + optional Reason reason = 10; optional bytes data = 3; optional SlaveID slave_id = 5; optional ExecutorID executor_id = 7; // TODO(benh): Use in master/slave. http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/common/protobuf_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp index a9b65e3..33ce782 100644 --- a/src/common/protobuf_utils.cpp +++ b/src/common/protobuf_utils.cpp @@ -47,7 +47,9 @@ StatusUpdate createStatusUpdate( const Option<SlaveID>& slaveId, const TaskID& taskId, const TaskState& state, + const TaskStatus::Source& source, const std::string& message = "", + const Option<TaskStatus::Reason>& reason = None(), const Option<ExecutorID>& executorId = None()) { StatusUpdate update; @@ -72,9 +74,14 @@ StatusUpdate createStatusUpdate( } status->set_state(state); + status->set_source(source); status->set_message(message); status->set_timestamp(update.timestamp()); + if (reason.isSome()) { + status->set_reason(reason.get()); + } + return update; } http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/common/protobuf_utils.hpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp index 212d512..bc3ef2a 100644 --- a/src/common/protobuf_utils.hpp +++ b/src/common/protobuf_utils.hpp @@ -41,7 +41,9 @@ StatusUpdate createStatusUpdate( const Option<SlaveID>& slaveId, const TaskID& taskId, const TaskState& state, + const TaskStatus::Source& source, const std::string& message = "", + const Option<TaskStatus::Reason>& reason = None(), const Option<ExecutorID>& executorId = None()); Task createTask( http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/balloon_framework.cpp ---------------------------------------------------------------------- diff --git a/src/examples/balloon_framework.cpp b/src/examples/balloon_framework.cpp index b05d567..c2337ba 100644 --- a/src/examples/balloon_framework.cpp +++ b/src/examples/balloon_framework.cpp @@ -129,8 +129,10 @@ public: virtual void statusUpdate(SchedulerDriver* driver, const TaskStatus& status) { std::cout << "Task in state " << status.state() << std::endl; + std::cout << "Source: " << status.source() << std::endl; + std::cout << "Reason: " << status.reason() << std::endl; if (status.has_message()) { - std::cout << "Reason: " << status.message() << std::endl; + std::cout << "Message: " << status.message() << std::endl; } if (protobuf::isTerminalState(status.state())) { http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/java/TestFramework.java ---------------------------------------------------------------------- diff --git a/src/examples/java/TestFramework.java b/src/examples/java/TestFramework.java index bc593d0..ce87de8 100644 --- a/src/examples/java/TestFramework.java +++ b/src/examples/java/TestFramework.java @@ -132,6 +132,10 @@ public class TestFramework { System.err.println("Aborting because task " + status.getTaskId().getValue() + " is in unexpected state " + status.getState().getValueDescriptor().getName() + + " with reason '" + + status.getReason().getValueDescriptor().getName() + "'" + + " from source '" + + status.getSource().getValueDescriptor().getName() + "'" + " with message '" + status.getMessage() + "'"); driver.abort(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/low_level_scheduler_libprocess.cpp ---------------------------------------------------------------------- diff --git a/src/examples/low_level_scheduler_libprocess.cpp b/src/examples/low_level_scheduler_libprocess.cpp index 7ef5ea7..89b4318 100644 --- a/src/examples/low_level_scheduler_libprocess.cpp +++ b/src/examples/low_level_scheduler_libprocess.cpp @@ -290,6 +290,8 @@ private: status.state() == TASK_FAILED) { EXIT(1) << "Exiting because task " << status.task_id() << " is in unexpected state " << status.state() + << " with reason " << status.reason() + << " from source " << status.source() << " with message '" << status.message() << "'"; } http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/low_level_scheduler_pthread.cpp ---------------------------------------------------------------------- diff --git a/src/examples/low_level_scheduler_pthread.cpp b/src/examples/low_level_scheduler_pthread.cpp index 6e233a1..e5cd48a 100644 --- a/src/examples/low_level_scheduler_pthread.cpp +++ b/src/examples/low_level_scheduler_pthread.cpp @@ -340,6 +340,8 @@ private: status.state() == TASK_FAILED) { EXIT(1) << "Exiting because task " << status.task_id() << " is in unexpected state " << status.state() + << " with reason " << status.reason() + << " from source " << status.source() << " with message '" << status.message() << "'"; } http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/no_executor_framework.cpp ---------------------------------------------------------------------- diff --git a/src/examples/no_executor_framework.cpp b/src/examples/no_executor_framework.cpp index f98a073..9c84e03 100644 --- a/src/examples/no_executor_framework.cpp +++ b/src/examples/no_executor_framework.cpp @@ -126,6 +126,8 @@ public: status.state() == TASK_FAILED) { cout << "Aborting because task " << taskId << " is in unexpected state " << status.state() + << " with reason " << status.reason() + << " from source " << status.source() << " with message '" << status.message() << "'" << endl; driver->abort(); http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/examples/test_framework.cpp ---------------------------------------------------------------------- diff --git a/src/examples/test_framework.cpp b/src/examples/test_framework.cpp index 187a611..e87198b 100644 --- a/src/examples/test_framework.cpp +++ b/src/examples/test_framework.cpp @@ -133,6 +133,8 @@ public: status.state() == TASK_FAILED) { cout << "Aborting because task " << taskId << " is in unexpected state " << status.state() + << " with reason " << status.reason() + << " from source " << status.source() << " with message '" << status.message() << "'" << endl; driver->abort(); http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index d914786..a860496 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2321,7 +2321,9 @@ void Master::launchTasks( task.slave_id(), task.task_id(), TASK_LOST, - "Task launched with invalid offers: " + error.get().message); + TaskStatus::SOURCE_MASTER, + "Task launched with invalid offers: " + error.get().message, + TaskStatus::REASON_INVALID_OFFERS); metrics.tasks_lost++; stats.tasks[TASK_LOST]++; @@ -2542,7 +2544,11 @@ void Master::_launchTasks( task.slave_id(), task.task_id(), TASK_LOST, - (slave == NULL ? "Slave removed" : "Slave disconnected")); + TaskStatus::SOURCE_MASTER, + slave == NULL ? "Slave removed" : "Slave disconnected", + slave == NULL ? + TaskStatus::REASON_SLAVE_REMOVED : + TaskStatus::REASON_SLAVE_DISCONNECTED); metrics.tasks_lost++; stats.tasks[TASK_LOST]++; @@ -2578,8 +2584,10 @@ void Master::_launchTasks( framework->id, task.slave_id(), task.task_id(), - TASK_LOST, - validation.get().message); + TASK_LOST, // TODO(dhamon): TASK_ERROR in 0.22 + TaskStatus::SOURCE_MASTER, + validation.get().message, + TaskStatus::REASON_TASK_INVALID); metrics.tasks_lost++; stats.tasks[TASK_LOST]++; @@ -2603,10 +2611,12 @@ void Master::_launchTasks( framework->id, task.slave_id(), task.task_id(), - TASK_LOST, + TASK_LOST, // TODO(dhamon): TASK_ERROR in 0.22 + TaskStatus::SOURCE_MASTER, authorization.isFailed() ? "Authorization failure: " + authorization.failure() : - "Not authorized to launch as user '" + user + "'"); + "Not authorized to launch as user '" + user + "'", + TaskStatus::REASON_TASK_UNAUTHORIZED); metrics.tasks_lost++; stats.tasks[TASK_LOST]++; @@ -2635,7 +2645,9 @@ void Master::_launchTasks( task.slave_id(), task.task_id(), TASK_LOST, - error); + TaskStatus::SOURCE_MASTER, + error, + TaskStatus::REASON_TASK_INVALID); metrics.tasks_lost++; stats.tasks[TASK_LOST]++; @@ -2724,6 +2736,7 @@ void Master::killTask( None(), taskId, TASK_KILLED, + TaskStatus::SOURCE_MASTER, "Killed pending task"); forward(update, UPID(), framework); @@ -2765,7 +2778,9 @@ void Master::killTask( None(), taskId, TASK_LOST, - "Attempted to kill an unknown task"); + TaskStatus::SOURCE_MASTER, + "Attempted to kill an unknown task", + TaskStatus::REASON_TASK_UNKNOWN); forward(update, UPID(), framework); } else { @@ -3542,7 +3557,9 @@ void Master::reconcileTasks( task.slave_id(), task.task_id(), TASK_STAGING, - "Reconciliation: Latest task state"); + TaskStatus::SOURCE_MASTER, + "Reconciliation: Latest task state", + TaskStatus::REASON_RECONCILIATION); VLOG(1) << "Sending implicit reconciliation state " << update.status().state() @@ -3566,7 +3583,9 @@ void Master::reconcileTasks( task->slave_id(), task->task_id(), state, - "Reconciliation: Latest task state"); + TaskStatus::SOURCE_MASTER, + "Reconciliation: Latest task state", + TaskStatus::REASON_RECONCILIATION); VLOG(1) << "Sending implicit reconciliation state " << update.status().state() @@ -3617,7 +3636,9 @@ void Master::reconcileTasks( task_.slave_id(), task_.task_id(), TASK_STAGING, - "Reconciliation: Latest task state"); + TaskStatus::SOURCE_MASTER, + "Reconciliation: Latest task state", + TaskStatus::REASON_RECONCILIATION); } else if (task != NULL) { // (2) Task is known: send the latest status update state. const TaskState& state = task->has_status_update_state() @@ -3629,7 +3650,9 @@ void Master::reconcileTasks( task->slave_id(), task->task_id(), state, - "Reconciliation: Latest task state"); + TaskStatus::SOURCE_MASTER, + "Reconciliation: Latest task state", + TaskStatus::REASON_RECONCILIATION); } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) { // (3) Task is unknown, slave is registered: TASK_LOST. update = protobuf::createStatusUpdate( @@ -3637,7 +3660,9 @@ void Master::reconcileTasks( slaveId.get(), status.task_id(), TASK_LOST, - "Reconciliation: Task is unknown to the slave"); + TaskStatus::SOURCE_MASTER, + "Reconciliation: Task is unknown to the slave", + TaskStatus::REASON_RECONCILIATION); } else if (slaves.transitioning(slaveId)) { // (4) Task is unknown, slave is transitionary: no-op. LOG(INFO) << "Ignoring reconciliation request of task " @@ -3650,7 +3675,9 @@ void Master::reconcileTasks( slaveId, status.task_id(), TASK_LOST, - "Reconciliation: Task is unknown"); + TaskStatus::SOURCE_MASTER, + "Reconciliation: Task is unknown", + TaskStatus::REASON_RECONCILIATION); } if (update.isSome()) { @@ -4010,7 +4037,9 @@ void Master::reconcile( status->mutable_task_id()->CopyFrom(task->task_id()); status->mutable_slave_id()->CopyFrom(slave->id); status->set_state(state); + status->set_source(TaskStatus::SOURCE_MASTER); status->set_message("Reconciliation request"); + status->set_reason(TaskStatus::REASON_RECONCILIATION); status->set_timestamp(Clock::now().secs()); } else { // TODO(bmahler): Remove this case in 0.22.0. @@ -4019,7 +4048,9 @@ void Master::reconcile( slave->id, task->task_id(), TASK_LOST, - "Task is unknown to the slave"); + TaskStatus::SOURCE_MASTER, + "Task is unknown to the slave", + TaskStatus::REASON_TASK_UNKNOWN); updateTask(task, update); removeTask(task); @@ -4277,7 +4308,9 @@ void Master::removeFramework(Framework* framework) task->slave_id(), task->task_id(), TASK_KILLED, + TaskStatus::SOURCE_MASTER, "Framework " + framework->id.value() + " removed", + TaskStatus::REASON_FRAMEWORK_REMOVED, (task->has_executor_id() ? Option<ExecutorID>(task->executor_id()) : None())); @@ -4369,7 +4402,9 @@ void Master::removeFramework(Slave* slave, Framework* framework) task->slave_id(), task->task_id(), TASK_LOST, + TaskStatus::SOURCE_MASTER, "Slave " + slave->info.hostname() + " disconnected", + TaskStatus::REASON_SLAVE_DISCONNECTED, (task->has_executor_id() ? Option<ExecutorID>(task->executor_id()) : None())); @@ -4486,7 +4521,9 @@ void Master::removeSlave(Slave* slave) task->slave_id(), task->task_id(), TASK_LOST, + TaskStatus::SOURCE_MASTER, "Slave " + slave->info.hostname() + " removed", + TaskStatus::REASON_SLAVE_REMOVED, (task->has_executor_id() ? Option<ExecutorID>(task->executor_id()) : None())); http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index d84465c..e5f828d 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -836,6 +836,8 @@ protected: TaskStatus* status = update.mutable_status(); status->mutable_task_id()->MergeFrom(task.task_id()); status->set_state(TASK_LOST); + status->set_source(TaskStatus::SOURCE_MASTER); + status->set_reason(TaskStatus::REASON_MASTER_DISCONNECTED); status->set_message("Master Disconnected"); update.set_timestamp(Clock::now().secs()); update.set_uuid(UUID::random().toBytes()); http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index b893517..b542491 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -888,7 +888,9 @@ void Slave::reregistered( info.id(), taskId, TASK_LOST, - "Reconciliation: task unknown to the slave"); + TaskStatus::SOURCE_SLAVE, + "Reconciliation: task unknown to the slave", + TaskStatus::REASON_RECONCILIATION); // NOTE: We can't use statusUpdate() here because it drops // updates for unknown frameworks. @@ -1237,8 +1239,10 @@ void Slave::_runTask( info.id(), task.task_id(), TASK_LOST, + TaskStatus::SOURCE_SLAVE, "Could not launch the task because we failed to unschedule directories" - " scheduled for gc"); + " scheduled for gc", + TaskStatus::REASON_GC_ERROR); // TODO(vinod): Ensure that the status update manager reliably // delivers this update. Currently, we don't guarantee this @@ -1300,7 +1304,9 @@ void Slave::_runTask( info.id(), task.task_id(), TASK_LOST, - "Executor terminating/terminated"); + TaskStatus::SOURCE_SLAVE, + "Executor terminating/terminated", + TaskStatus::REASON_EXECUTOR_TERMINATED); statusUpdate(update, UPID()); break; @@ -1416,7 +1422,11 @@ void Slave::killTask( << " before it was launched"; const StatusUpdate& update = protobuf::createStatusUpdate( - frameworkId, info.id(), taskId, TASK_KILLED, + frameworkId, + info.id(), + taskId, + TASK_KILLED, + TaskStatus::SOURCE_SLAVE, "Task killed before it was launched"); statusUpdate(update, UPID()); @@ -1439,7 +1449,13 @@ void Slave::killTask( // We send a TASK_LOST update because this task has never // been launched on this slave. const StatusUpdate& update = protobuf::createStatusUpdate( - frameworkId, info.id(), taskId, TASK_LOST, "Cannot find executor"); + frameworkId, + info.id(), + taskId, + TASK_LOST, + TaskStatus::SOURCE_SLAVE, + "Cannot find executor", + TaskStatus::REASON_EXECUTOR_TERMINATED); statusUpdate(update, UPID()); return; @@ -1456,7 +1472,9 @@ void Slave::killTask( info.id(), taskId, TASK_KILLED, + TaskStatus::SOURCE_SLAVE, "Unregistered executor", + TaskStatus::REASON_EXECUTOR_UNREGISTERED, executor->id); statusUpdate(update, UPID()); @@ -2126,7 +2144,9 @@ void Slave::reregisterExecutor( info.id(), task->task_id(), TASK_LOST, + TaskStatus::SOURCE_SLAVE, "Task launched during slave restart", + TaskStatus::REASON_SLAVE_RESTARTED, executorId); statusUpdate(update, UPID()); @@ -2205,7 +2225,9 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid) state == RUNNING || state == TERMINATING) << state; - const TaskStatus& status = update.status(); + TaskStatus status = update.status(); + status.set_source(pid == UPID() ? TaskStatus::SOURCE_SLAVE + : TaskStatus::SOURCE_EXECUTOR); Framework* framework = getFramework(update.framework_id()); if (framework == NULL) { @@ -2269,8 +2291,9 @@ void Slave::statusUpdate(const StatusUpdate& update, const UPID& pid) << " (" << executor->pid << ")"; } - stats.tasks[update.status().state()]++; + stats.tasks[status.state()]++; stats.validStatusUpdates++; + metrics.valid_status_updates++; // We set the latest state of the task here so that the slave can @@ -2933,22 +2956,8 @@ void Slave::executorTerminated( // supports it. foreach (Task* task, executor->launchedTasks.values()) { if (!protobuf::isTerminalState(task->state())) { - mesos::TaskState taskState; - if ((termination.isReady() && termination.get().killed()) || - executor->isCommandExecutor()) { - taskState = TASK_FAILED; - } else { - taskState = TASK_LOST; - } - statusUpdate(protobuf::createStatusUpdate( - frameworkId, - info.id(), - task->task_id(), - taskState, - termination.isReady() ? termination.get().message() : - "Abnormal executor termination", - executorId), - UPID()); + sendExecutorTerminatedStatusUpdate( + task->task_id(), termination, frameworkId, executor); } } @@ -2956,22 +2965,8 @@ void Slave::executorTerminated( // TODO(vinod): Use foreachvalue instead once LinkedHashmap // supports it. foreach (const TaskInfo& task, executor->queuedTasks.values()) { - mesos::TaskState taskState; - if ((termination.isReady() && termination.get().killed()) || - executor->isCommandExecutor()) { - taskState = TASK_FAILED; - } else { - taskState = TASK_LOST; - } - statusUpdate(protobuf::createStatusUpdate( - frameworkId, - info.id(), - task.task_id(), - taskState, - termination.isReady() ? termination.get().message() : - "Abnormal executor termination", - executorId), - UPID()); + sendExecutorTerminatedStatusUpdate( + task.task_id(), termination, frameworkId, executor); } } @@ -3709,6 +3704,38 @@ double Slave::_executors_terminating() } +void Slave::sendExecutorTerminatedStatusUpdate( + const TaskID& taskId, + const Future<containerizer::Termination>& termination, + const FrameworkID& frameworkId, + const Executor* executor) +{ + mesos::TaskState taskState = TASK_LOST; + TaskStatus::Reason reason = TaskStatus::REASON_EXECUTOR_TERMINATED; + + if (termination.isReady() && termination.get().killed()) { + taskState = TASK_FAILED; + // TODO(dhamon): MESOS-2035: Add 'reason' to containerizer::Termination. + reason = TaskStatus::REASON_MEMORY_LIMIT; + } else if (executor->isCommandExecutor()) { + taskState = TASK_FAILED; + reason = TaskStatus::REASON_COMMAND_EXECUTOR_FAILED; + } + + statusUpdate(protobuf::createStatusUpdate( + frameworkId, + info.id(), + taskId, + taskState, + TaskStatus::SOURCE_SLAVE, + termination.isReady() ? termination.get().message() : + "Abnormal executor termination", + reason, + executor->id), + UPID()); +} + + Slave::Metrics::Metrics(const Slave& slave) : uptime_secs( "slave/uptime_secs", @@ -4348,7 +4375,7 @@ bool Executor::incompleteTasks() } -bool Executor::isCommandExecutor() +bool Executor::isCommandExecutor() const { return commandExecutor; } http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 6c183f8..5b082fc 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -396,6 +396,12 @@ private: double _executors_running(); double _executors_terminating(); + void sendExecutorTerminatedStatusUpdate( + const TaskID& taskId, + const Future<containerizer::Termination>& termination, + const FrameworkID& frameworkId, + const Executor* executor); + const Flags flags; SlaveInfo info; @@ -519,7 +525,7 @@ struct Executor bool incompleteTasks(); // Returns true if this is a command executor. - bool isCommandExecutor(); + bool isCommandExecutor() const; enum State { REGISTERING, // Executor is launched but not (re-)registered yet. http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/tests/fault_tolerance_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp index a18a41a..372c4fd 100644 --- a/src/tests/fault_tolerance_tests.cpp +++ b/src/tests/fault_tolerance_tests.cpp @@ -217,7 +217,11 @@ TEST_F(FaultToleranceTest, PartitionedSlaveStatusUpdates) TaskID taskId; taskId.set_value("task_id"); const StatusUpdate& update = createStatusUpdate( - frameworkId.get(), slaveId, taskId, TASK_RUNNING); + frameworkId.get(), + slaveId, + taskId, + TASK_RUNNING, + TaskStatus::SOURCE_SLAVE); StatusUpdateMessage message; message.mutable_update()->CopyFrom(update); @@ -1279,7 +1283,12 @@ TEST_F(FaultToleranceTest, ForwardStatusUpdateUnknownExecutor) taskId.set_value("task2"); StatusUpdate statusUpdate2 = createStatusUpdate( - frameworkId, offer.slave_id(), taskId, TASK_RUNNING, "Dummy update"); + frameworkId, + offer.slave_id(), + taskId, + TASK_RUNNING, + TaskStatus::SOURCE_SLAVE, + "Dummy update"); process::dispatch(slave.get(), &Slave::statusUpdate, statusUpdate2, UPID()); @@ -1835,7 +1844,8 @@ TEST_F(FaultToleranceTest, SplitBrainMasters) frameworkId.get(), runningStatus.get().slave_id(), runningStatus.get().task_id(), - TASK_LOST)); + TASK_LOST, + TaskStatus::SOURCE_SLAVE)); // Spoof a message from a random master; this should be dropped by // the scheduler driver. Since this is delivered locally, it is http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/tests/master_authorization_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp index 652e80d..5ae855e 100644 --- a/src/tests/master_authorization_tests.cpp +++ b/src/tests/master_authorization_tests.cpp @@ -206,6 +206,7 @@ TEST_F(MasterAuthorizationTest, UnauthorizedTask) AWAIT_READY(status); EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_TASK_UNAUTHORIZED, status.get().reason()); driver.stop(); driver.join(); http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index 2e52574..a6d1a4a 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -1356,6 +1356,7 @@ TEST_F(MasterTest, LaunchAcrossSlavesTest) AWAIT_READY(status); EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, status.get().reason()); // The resources of the invalid offers should be recovered. AWAIT_READY(resourcesRecovered); @@ -1435,6 +1436,7 @@ TEST_F(MasterTest, LaunchDuplicateOfferTest) AWAIT_READY(status); EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, status.get().reason()); // The resources of the invalid offers should be recovered. AWAIT_READY(resourcesRecovered); http://git-wip-us.apache.org/repos/asf/mesos/blob/ca135940/src/tests/resource_offers_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_offers_tests.cpp b/src/tests/resource_offers_tests.cpp index fe66432..ee332c3 100644 --- a/src/tests/resource_offers_tests.cpp +++ b/src/tests/resource_offers_tests.cpp @@ -233,6 +233,7 @@ TEST_F(ResourceOffersTest, TaskUsesNoResources) AWAIT_READY(status); EXPECT_EQ(task.task_id(), status.get().task_id()); EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); EXPECT_TRUE(status.get().has_message()); EXPECT_EQ("Task uses no resources", status.get().message()); @@ -291,6 +292,7 @@ TEST_F(ResourceOffersTest, TaskUsesInvalidResources) AWAIT_READY(status); EXPECT_EQ(task.task_id(), status.get().task_id()); EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); EXPECT_TRUE(status.get().has_message()); EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message()); @@ -350,6 +352,7 @@ TEST_F(ResourceOffersTest, TaskUsesMoreResourcesThanOffered) EXPECT_EQ(task.task_id(), status.get().task_id()); EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); EXPECT_TRUE(status.get().has_message()); EXPECT_TRUE(strings::contains( status.get().message(), "greater than offered")); @@ -514,6 +517,7 @@ TEST_F(ResourceOffersTest, ResourcesGetReofferedAfterTaskInfoError) AWAIT_READY(status); EXPECT_EQ(task.task_id(), status.get().task_id()); EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); EXPECT_TRUE(status.get().has_message()); EXPECT_EQ("Task uses invalid resources: cpus(*):0", status.get().message()); @@ -680,6 +684,7 @@ TEST_F(MultipleExecutorsTest, ExecutorInfoDiffersOnSameSlave) AWAIT_READY(status); EXPECT_EQ(task2.task_id(), status.get().task_id()); EXPECT_EQ(TASK_LOST, status.get().state()); + EXPECT_EQ(TaskStatus::REASON_TASK_INVALID, status.get().reason()); EXPECT_TRUE(status.get().has_message()); EXPECT_TRUE(strings::contains( status.get().message(), "Task has invalid ExecutorInfo"));
