Repository: mesos Updated Branches: refs/heads/master 82c2a7866 -> 3c96155a4
Added TaskStatus::Reason to containerizer Termination message. Review: https://reviews.apache.org/r/38746 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3c96155a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3c96155a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3c96155a Branch: refs/heads/master Commit: 3c96155a4618000a0896bd42f7ca1e2a363b48fd Parents: 82c2a78 Author: Jie Yu <[email protected]> Authored: Thu Sep 24 18:42:34 2015 -0700 Committer: Jie Yu <[email protected]> Committed: Mon Oct 12 17:12:01 2015 -0700 ---------------------------------------------------------------------- include/mesos/containerizer/containerizer.proto | 13 +- include/mesos/mesos.proto | 14 +- include/mesos/slave/isolator.proto | 6 + src/common/protobuf_utils.cpp | 4 +- src/common/protobuf_utils.hpp | 3 +- src/slave/containerizer/docker.cpp | 12 +- .../containerizer/external_containerizer.cpp | 5 - .../containerizer/isolators/cgroups/mem.cpp | 7 +- .../containerizer/isolators/posix/disk.cpp | 10 +- src/slave/containerizer/mesos/containerizer.cpp | 80 ++++---- src/slave/containerizer/mesos/containerizer.hpp | 13 +- src/slave/slave.cpp | 187 +++++++++++++++---- src/slave/slave.hpp | 16 +- src/tests/containerizer.cpp | 1 - .../docker_containerizer_tests.cpp | 11 +- .../containerizer/mesos_containerizer_tests.cpp | 1 - src/tests/oversubscription_tests.cpp | 4 +- src/tests/slave_recovery_tests.cpp | 23 ++- src/tests/slave_tests.cpp | 25 +-- 19 files changed, 288 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/include/mesos/containerizer/containerizer.proto ---------------------------------------------------------------------- diff --git a/include/mesos/containerizer/containerizer.proto b/include/mesos/containerizer/containerizer.proto index f16ccc8..7cf6d2b 100644 --- a/include/mesos/containerizer/containerizer.proto +++ b/include/mesos/containerizer/containerizer.proto @@ -82,15 +82,14 @@ message Usage { * containerizer to the slave. */ message Termination { - // A container may be killed if it exceeds its resources; this will - // be indicated by killed=true and described by the message string. - // TODO(jaybuff): As part of MESOS-2035 we should remove killed and - // replace it with a TaskStatus::Reason. - required bool killed = 1; - required string message = 2; - // Exit status of the process. optional int32 status = 3; + + // The 'state', 'reasons' and 'message' of a status update for + // non-terminal tasks when the executor is terminated. + optional TaskState state = 4; + repeated TaskStatus.Reason reasons = 5; + optional string message = 2; } http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index 579e32c..2f774d1 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -1097,8 +1097,19 @@ message TaskStatus { // TODO(bmahler): Differentiate between slave removal reasons // (e.g. unhealthy vs. unregistered for maintenance). enum Reason { + // TODO(jieyu): The default value when a caller doesn't check for + // presence is 0 and so ideally the 0 reason is not a valid one. + // Since this is not used anywhere, consider removing this reason. REASON_COMMAND_EXECUTOR_FAILED = 0; - REASON_EXECUTOR_PREEMPTED = 17; + + REASON_CONTAINER_LAUNCH_FAILED = 21; + REASON_CONTAINER_LIMITATION = 19; + REASON_CONTAINER_LIMITATION_DISK = 20; + REASON_CONTAINER_LIMITATION_MEMORY = 8; + REASON_CONTAINER_PREEMPTED = 17; + REASON_CONTAINER_UPDATE_FAILED = 22; + REASON_EXECUTOR_REGISTRATION_TIMEOUT = 23; + REASON_EXECUTOR_REREGISTRATION_TIMEOUT = 24; REASON_EXECUTOR_TERMINATED = 1; REASON_EXECUTOR_UNREGISTERED = 2; REASON_FRAMEWORK_REMOVED = 3; @@ -1106,7 +1117,6 @@ message TaskStatus { REASON_INVALID_FRAMEWORKID = 5; REASON_INVALID_OFFERS = 6; REASON_MASTER_DISCONNECTED = 7; - REASON_MEMORY_LIMIT = 8; REASON_RECONCILIATION = 9; REASON_RESOURCES_UNKNOWN = 18; REASON_SLAVE_DISCONNECTED = 10; http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/include/mesos/slave/isolator.proto ---------------------------------------------------------------------- diff --git a/include/mesos/slave/isolator.proto b/include/mesos/slave/isolator.proto index 9d38a25..0e71a93 100644 --- a/include/mesos/slave/isolator.proto +++ b/include/mesos/slave/isolator.proto @@ -36,6 +36,12 @@ message ContainerLimitation // Description of the limitation. optional string message = 2; + + // The container will be terminated when a resource limitation is + // reached. This field specifies the 'reason' that will be sent in + // the status update for any remaining non-terminal tasks when the + // container is terminated. + optional TaskStatus.Reason reason = 3; } http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/common/protobuf_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp index c1e8e01..1e795dc 100644 --- a/src/common/protobuf_utils.cpp +++ b/src/common/protobuf_utils.cpp @@ -233,13 +233,15 @@ namespace slave { ContainerLimitation createContainerLimitation( const Resources& resources, - const std::string& message) + const std::string& message, + const TaskStatus::Reason& reason) { ContainerLimitation limitation; foreach (Resource resource, resources) { limitation.add_resources()->CopyFrom(resource); } limitation.set_message(message); + limitation.set_reason(reason); return limitation; } http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/common/protobuf_utils.hpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp index 8793851..44a2b1d 100644 --- a/src/common/protobuf_utils.hpp +++ b/src/common/protobuf_utils.hpp @@ -92,7 +92,8 @@ namespace slave { mesos::slave::ContainerLimitation createContainerLimitation( const Resources& resources, - const std::string& message); + const std::string& message, + const TaskStatus::Reason& reason); mesos::slave::ContainerState createContainerState( http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index 174448c..7022958 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -1365,11 +1365,10 @@ void DockerContainerizerProcess::destroy( // This means we failed to launch the container and we're trying to // cleanup. CHECK_PENDING(container->status.future()); - containerizer::Termination termination; - termination.set_killed(killed); - termination.set_message( - "Failed to launch container: " + container->launch.failure()); - container->termination.set(termination); + + // NOTE: The launch error message will be retrieved by the slave + // and properly set in the corresponding status update. + container->termination.set(containerizer::Termination()); containers_.erase(containerId); delete container; @@ -1409,7 +1408,6 @@ void DockerContainerizerProcess::destroy( fetcher->kill(containerId); containerizer::Termination termination; - termination.set_killed(killed); termination.set_message("Container destroyed while fetching"); container->termination.set(termination); @@ -1429,7 +1427,6 @@ void DockerContainerizerProcess::destroy( container->pull.discard(); containerizer::Termination termination; - termination.set_killed(killed); termination.set_message("Container destroyed while pulling image"); container->termination.set(termination); @@ -1548,7 +1545,6 @@ void DockerContainerizerProcess::___destroy( Container* container = containers_[containerId]; containerizer::Termination termination; - termination.set_killed(killed); if (status.isReady() && status.get().isSome()) { termination.set_status(status.get().get()); http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/external_containerizer.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/external_containerizer.cpp b/src/slave/containerizer/external_containerizer.cpp index 2116492..30f7e8e 100644 --- a/src/slave/containerizer/external_containerizer.cpp +++ b/src/slave/containerizer/external_containerizer.cpp @@ -676,11 +676,6 @@ void ExternalContainerizerProcess::__wait( if (status.isSome()) { VLOG(2) << "Wait got destroyed on '" << containerId << "'"; containerizer::Termination termination; - // 'killed' must only be true when a resource limitation - // had to be enforced through terminating a task. - // TODO(tillt): Consider renaming 'killed' towards 'limited'. - termination.set_killed(false); - termination.set_message(""); termination.set_status(status.get()); actives[containerId]->termination.set(termination); cleanup(containerId); http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/isolators/cgroups/mem.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/isolators/cgroups/mem.cpp b/src/slave/containerizer/isolators/cgroups/mem.cpp index 89c86be..6f49e5a 100644 --- a/src/slave/containerizer/isolators/cgroups/mem.cpp +++ b/src/slave/containerizer/isolators/cgroups/mem.cpp @@ -694,8 +694,11 @@ void CgroupsMemIsolatorProcess::oom(const ContainerID& containerId) stringify(usage.isSome() ? usage.get().megabytes() : 0), "*").get(); - info->limitation.set(protobuf::slave::createContainerLimitation( - mem, message.str())); + info->limitation.set( + protobuf::slave::createContainerLimitation( + mem, + message.str(), + TaskStatus::REASON_CONTAINER_LIMITATION_MEMORY)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/isolators/posix/disk.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/isolators/posix/disk.cpp b/src/slave/containerizer/isolators/posix/disk.cpp index c324c79..73e62a2 100644 --- a/src/slave/containerizer/isolators/posix/disk.cpp +++ b/src/slave/containerizer/isolators/posix/disk.cpp @@ -247,10 +247,12 @@ void PosixDiskIsolatorProcess::_collect( CHECK_SOME(quota); if (future.get() > quota.get()) { - info->limitation.set(protobuf::slave::createContainerLimitation( - Resources(info->paths[path].quota), - "Disk usage (" + stringify(future.get()) + - ") exceeds quota (" + stringify(quota.get()) + ")")); + info->limitation.set( + protobuf::slave::createContainerLimitation( + Resources(info->paths[path].quota), + "Disk usage (" + stringify(future.get()) + + ") exceeds quota (" + stringify(quota.get()) + ")", + TaskStatus::REASON_CONTAINER_LIMITATION_DISK)); } } } http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/mesos/containerizer.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/containerizer.cpp b/src/slave/containerizer/mesos/containerizer.cpp index b904b2d..d1fc5a4 100644 --- a/src/slave/containerizer/mesos/containerizer.cpp +++ b/src/slave/containerizer/mesos/containerizer.cpp @@ -369,8 +369,7 @@ void MesosContainerizer::destroy(const ContainerID& containerId) dispatch( process.get(), &MesosContainerizerProcess::destroy, - containerId, - true); + containerId); } @@ -1079,8 +1078,7 @@ Future<ResourceStatistics> MesosContainerizerProcess::usage( void MesosContainerizerProcess::destroy( - const ContainerID& containerId, - bool killed) + const ContainerID& containerId) { if (!containers_.contains(containerId)) { LOG(WARNING) << "Ignoring destroy of unknown container: " << containerId; @@ -1112,8 +1110,7 @@ void MesosContainerizerProcess::destroy( &Self::___destroy, containerId, status, - "Container destroyed while preparing isolators", - killed)); + "Container destroyed while preparing isolators")); return; } @@ -1131,30 +1128,28 @@ void MesosContainerizerProcess::destroy( // Wait for the isolators to finish isolating before we start // to destroy the container. container->isolation - .onAny(defer(self(), &Self::_destroy, containerId, killed)); + .onAny(defer(self(), &Self::_destroy, containerId)); return; } container->state = DESTROYING; - _destroy(containerId, killed); + _destroy(containerId); } void MesosContainerizerProcess::_destroy( - const ContainerID& containerId, - bool killed) + const ContainerID& containerId) { // Kill all processes then continue destruction. launcher->destroy(containerId) - .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1, killed)); + .onAny(defer(self(), &Self::__destroy, containerId, lambda::_1)); } void MesosContainerizerProcess::__destroy( const ContainerID& containerId, - const Future<Nothing>& future, - bool killed) + const Future<Nothing>& future) { CHECK(containers_.contains(containerId)); @@ -1185,16 +1180,14 @@ void MesosContainerizerProcess::__destroy( &Self::___destroy, containerId, lambda::_1, - None(), - killed)); + None())); } void MesosContainerizerProcess::___destroy( const ContainerID& containerId, const Future<Option<int>>& status, - const Option<string>& message, - bool killed) + const Option<string>& message) { cleanupIsolators(containerId) .onAny(defer(self(), @@ -1202,8 +1195,7 @@ void MesosContainerizerProcess::___destroy( containerId, status, lambda::_1, - message, - killed)); + message)); } @@ -1211,8 +1203,7 @@ void MesosContainerizerProcess::____destroy( const ContainerID& containerId, const Future<Option<int>>& status, const Future<list<Future<Nothing>>>& cleanups, - Option<string> message, - bool killed) + Option<string> message) { // This should not occur because we only use the Future<list> to // facilitate chaining. @@ -1239,36 +1230,39 @@ void MesosContainerizerProcess::____destroy( } } - // If any isolator limited the container then we mark it to killed. - // Note: We may not see a limitation in time for it to be + containerizer::Termination termination; + + if (status.isReady() && status->isSome()) { + termination.set_status(status->get()); + } + + // NOTE: We may not see a limitation in time for it to be // registered. This could occur if the limitation (e.g., an OOM) // killed the executor and we triggered destroy() off the executor // exit. - if (!killed && container->limitations.size() > 0) { - string message_; - foreach (const ContainerLimitation& limitation, container->limitations) { - message_ += limitation.message(); - } - message = strings::trim(message_); - } else if (!killed && message.isNone()) { - message = "Executor terminated"; - } + if (!container->limitations.empty()) { + termination.set_state(TaskState::TASK_FAILED); - containerizer::Termination termination; + // We concatenate the messages if there are multiple limitations. + vector<string> messages; + + foreach (const ContainerLimitation& limitation, container->limitations) { + messages.push_back(limitation.message()); - // Killed means that the container was either asked to be destroyed - // by the slave or was destroyed because an isolator limited the - // container. - termination.set_killed(killed); + if (limitation.has_reason()) { + termination.add_reasons(limitation.reason()); + } + } - if (message.isSome()) { - termination.set_message(message.get()); + message = strings::join("; ", messages); } - if (status.isReady() && status.get().isSome()) { - termination.set_status(status.get().get()); + if (message.isNone()) { + message = "Executor terminated"; } + termination.set_message(message.get()); + container->promise.set(termination); containers_.erase(containerId); @@ -1284,7 +1278,7 @@ void MesosContainerizerProcess::reaped(const ContainerID& containerId) LOG(INFO) << "Executor for container '" << containerId << "' has exited"; // The executor has exited so destroy the container. - destroy(containerId, false); + destroy(containerId); } @@ -1312,7 +1306,7 @@ void MesosContainerizerProcess::limited( } // The container has been affected by the limitation so destroy it. - destroy(containerId, true); + destroy(containerId); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/containerizer/mesos/containerizer.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/mesos/containerizer.hpp b/src/slave/containerizer/mesos/containerizer.hpp index 4c14192..4aad8a3 100644 --- a/src/slave/containerizer/mesos/containerizer.hpp +++ b/src/slave/containerizer/mesos/containerizer.hpp @@ -158,7 +158,7 @@ public: const ContainerID& containerId, int pipeWrite); - virtual void destroy(const ContainerID& containerId, bool killed); + virtual void destroy(const ContainerID& containerId); virtual process::Future<hashset<ContainerID>> containers(); @@ -204,20 +204,18 @@ private: pid_t _pid); // Continues 'destroy()' once isolators has completed. - void _destroy(const ContainerID& containerId, bool killed); + void _destroy(const ContainerID& containerId); // Continues '_destroy()' once all processes have been killed by the launcher. void __destroy( const ContainerID& containerId, - const process::Future<Nothing>& future, - bool killed); + const process::Future<Nothing>& future); // Continues '__destroy()' once we get the exit status of the executor. void ___destroy( const ContainerID& containerId, const process::Future<Option<int>>& status, - const Option<std::string>& message, - bool killed); + const Option<std::string>& message); // Continues '___destroy()' once all isolators have completed // cleanup. @@ -225,8 +223,7 @@ private: const ContainerID& containerId, const process::Future<Option<int>>& status, const process::Future<std::list<process::Future<Nothing>>>& cleanups, - Option<std::string> message, - bool killed); + Option<std::string> message); // Call back for when an isolator limits a container and impacts the // processes. This will trigger container destruction. http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 01c5e42..6b25b49 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1685,6 +1685,21 @@ void Slave::runTasks( << (future.isFailed() ? future.failure() : "discarded"); containerizer->destroy(containerId); + + Executor* executor = getExecutor(frameworkId, executorId); + if (executor != NULL) { + containerizer::Termination termination; + termination.set_state(TASK_LOST); + termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED); + termination.set_message( + "Failed to update resources for container: " + + (future.isFailed() ? future.failure() : "discarded")); + + executor->pendingTermination = termination; + + // TODO(jieyu): Set executor->state to be TERMINATING. + } + return; } @@ -2662,6 +2677,20 @@ void Slave::_reregisterExecutor( << (future.isFailed() ? future.failure() : "discarded"); containerizer->destroy(containerId); + + Executor* executor = getExecutor(frameworkId, executorId); + if (executor != NULL) { + containerizer::Termination termination; + termination.set_state(TASK_LOST); + termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED); + termination.set_message( + "Failed to update resources for container: " + + (future.isFailed() ? future.failure() : "discarded")); + + executor->pendingTermination = termination; + + // TODO(jieyu): Set executor->state to be TERMINATING. + } } } @@ -2683,7 +2712,7 @@ void Slave::reregisterExecutorTimeout() case Executor::TERMINATING: case Executor::TERMINATED: break; - case Executor::REGISTERING: + case Executor::REGISTERING: { // If we are here, the executor must have been hung and not // exited! This is because if the executor properly exited, // it should have already been identified by the isolator @@ -2691,10 +2720,21 @@ void Slave::reregisterExecutorTimeout() LOG(INFO) << "Killing un-reregistered executor '" << executor->id << "' of framework " << framework->id(); + containerizer->destroy(executor->containerId); + executor->state = Executor::TERMINATING; - containerizer->destroy(executor->containerId); + containerizer::Termination termination; + termination.set_state(TASK_LOST); + termination.add_reasons( + TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT); + termination.set_message( + "Executor did not re-register within " + + stringify(EXECUTOR_REREGISTER_TIMEOUT)); + + executor->pendingTermination = termination; break; + } default: LOG(FATAL) << "Executor '" << executor->id << "' of framework " << framework->id() @@ -2914,15 +2954,28 @@ void Slave::_statusUpdate( const ContainerID& containerId, bool checkpoint) { - if (future.isSome() && !future.get().isReady()) { + if (future.isSome() && !future->isReady()) { LOG(ERROR) << "Failed to update resources for container " << containerId << " of executor " << executorId << " running task " << update.status().task_id() << " on status update for terminal task, destroying container: " - << (future.get().isFailed() ? future.get().failure() - : "discarded"); + << (future->isFailed() ? future->failure() : "discarded"); containerizer->destroy(containerId); + + Executor* executor = getExecutor(update.framework_id(), executorId); + if (executor != NULL) { + containerizer::Termination termination; + termination.set_state(TASK_LOST); + termination.add_reasons(TaskStatus::REASON_CONTAINER_UPDATE_FAILED); + termination.set_message( + "Failed to update resources for container: " + + (future->isFailed() ? future->failure() : "discarded")); + + executor->pendingTermination = termination; + + // TODO(jieyu): Set executor->state to be TERMINATING. + } } if (checkpoint) { @@ -3208,6 +3261,19 @@ Framework* Slave::getFramework(const FrameworkID& frameworkId) } +Executor* Slave::getExecutor( + const FrameworkID& frameworkId, + const ExecutorID& executorId) +{ + Framework* framework = getFramework(frameworkId); + if (framework != NULL) { + return framework->getExecutor(executorId); + } + + return NULL; +} + + ExecutorInfo Slave::getExecutorInfo( const FrameworkID& frameworkId, const TaskInfo& task) @@ -3355,6 +3421,21 @@ void Slave::executorLaunched( ++metrics.container_launch_errors; containerizer->destroy(containerId); + + Executor* executor = getExecutor(frameworkId, executorId); + if (executor != NULL) { + containerizer::Termination termination; + termination.set_state(TASK_FAILED); + termination.add_reasons(TaskStatus::REASON_CONTAINER_LAUNCH_FAILED); + termination.set_message( + "Failed to launch container: " + + (future.isFailed() ? future.failure() : "discarded")); + + executor->pendingTermination = termination; + + // TODO(jieyu): Set executor->state to be TERMINATING. + } + return; } else if (!future.get()) { LOG(ERROR) << "Container '" << containerId @@ -3364,7 +3445,7 @@ void Slave::executorLaunched( << flags.containerizers << ") could create a container for the " << "provided TaskInfo/ExecutorInfo message."; - ++metrics.container_launch_errors; + ++metrics.container_launch_errors; return; } @@ -3885,17 +3966,27 @@ void Slave::registerExecutorTimeout( case Executor::TERMINATED: // Ignore the registration timeout. break; - case Executor::REGISTERING: + case Executor::REGISTERING: { LOG(INFO) << "Terminating executor " << executor->id << " of framework " << framework->id() << " because it did not register within " << flags.executor_registration_timeout; + // Immediately kill the executor. + containerizer->destroy(containerId); + executor->state = Executor::TERMINATING; - // Immediately kill the executor. - containerizer->destroy(executor->containerId); + containerizer::Termination termination; + termination.set_state(TASK_FAILED); + termination.add_reasons(TaskStatus::REASON_EXECUTOR_REGISTRATION_TIMEOUT); + termination.set_message( + "Executor did not register within " + + stringify(flags.executor_registration_timeout)); + + executor->pendingTermination = termination; break; + } default: LOG(FATAL) << "Executor '" << executor->id << "' of framework " << framework->id() @@ -4430,6 +4521,8 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future) << "' of framework " << frameworkId << " as QoS correction"; + containerizer->destroy(containerId); + // TODO(nnielsen): We should ensure that we are addressing // the _container_ which the QoS controller intended to // kill. Without this check, we may run into a scenario @@ -4438,8 +4531,13 @@ void Slave::_qosCorrections(const Future<list<QoSCorrection>>& future) // container than the one the QoS controller targeted // (MESOS-2875). executor->state = Executor::TERMINATING; - executor->reason = TaskStatus::REASON_EXECUTOR_PREEMPTED; - containerizer->destroy(containerId); + + containerizer::Termination termination; + termination.set_state(TASK_LOST); + termination.add_reasons(TaskStatus::REASON_CONTAINER_PREEMPTED); + termination.set_message("Container preempted by QoS correction"); + + executor->pendingTermination = termination; ++metrics.executors_preempted; break; @@ -4633,38 +4731,61 @@ void Slave::sendExecutorTerminatedStatusUpdate( const FrameworkID& frameworkId, const Executor* executor) { - mesos::TaskState taskState = TASK_LOST; - TaskStatus::Reason reason = TaskStatus::REASON_EXECUTOR_TERMINATED; - CHECK_NOTNULL(executor); - if (executor->reason.isSome()) { - // TODO(nnielsen): We want to dispatch the task status and reason - // from the termination reason (MESOS-2035) and the executor - // reason based on a specific policy i.e. if the termination - // reason is set, this overrides executor->reason. At the moment, - // we infer the containerizer reason for killing from 'killed' - // field in 'termination' and are explicitly overriding the task - // status and reason. - reason = executor->reason.get(); - } else if (termination.isReady() && termination.get().killed()) { - taskState = TASK_FAILED; - // TODO(dhamon): MESOS-2035: Add 'reason' to containerizer::Termination. - reason = TaskStatus::REASON_MEMORY_LIMIT; - } else if (executor->isCommandExecutor()) { - taskState = TASK_FAILED; - reason = TaskStatus::REASON_COMMAND_EXECUTOR_FAILED; + mesos::TaskState state; + TaskStatus::Reason reason; + string message; + + // Determine the task state for the status update. + if (termination.isReady() && termination->has_state()) { + state = termination->state(); + } else if (executor->pendingTermination.isSome() && + executor->pendingTermination->has_state()) { + state = executor->pendingTermination->state(); + } else { + state = TASK_FAILED; + } + + // Determine the task reason for the status update. + // TODO(jieyu): Handle multiple reasons (MESOS-2657). + if (termination.isReady() && termination->reasons().size() > 0) { + reason = termination->reasons(0); + } else if (executor->pendingTermination.isSome() && + executor->pendingTermination->reasons().size() > 0) { + reason = executor->pendingTermination->reasons(0); + } else { + reason = TaskStatus::REASON_EXECUTOR_TERMINATED; + } + + // Determine the message for the status update. + vector<string> messages; + + if (executor->pendingTermination.isSome() && + executor->pendingTermination->has_message()) { + messages.push_back(executor->pendingTermination->message()); + } + + if (!termination.isReady()) { + messages.push_back("Abnormal executor termination"); + } else if (termination->has_message()) { + messages.push_back(termination->message()); + } + + if (messages.empty()) { + message = "Executor terminated"; + } else { + message = strings::join("; ", messages); } statusUpdate(protobuf::createStatusUpdate( frameworkId, info.id(), taskId, - taskState, + state, TaskStatus::SOURCE_SLAVE, UUID::random(), - termination.isReady() ? termination.get().message() - : "Abnormal executor termination", + message, reason, executor->id), UPID()); http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 18be4f8..d6e417b 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -300,9 +300,13 @@ public: void authenticate(); - // Helper routine to lookup a framework. + // Helper routines to lookup a framework/executor. Framework* getFramework(const FrameworkID& frameworkId); + Executor* getExecutor( + const FrameworkID& frameworkId, + const ExecutorID& executorId); + // Returns an ExecutorInfo for a TaskInfo (possibly // constructing one if the task has a CommandInfo). ExecutorInfo getExecutorInfo( @@ -623,10 +627,12 @@ struct Executor // attempts to do some memset's which are unsafe). boost::circular_buffer<std::shared_ptr<Task>> completedTasks; - // The 'reason' is for the slave to encode the reason behind a - // terminal status update for those pending/unterminated tasks when - // the executor is terminated. - Option<TaskStatus::Reason> reason; + // When the slave initiates a destroy of the container, we expect a + // termination to occur. The 'pendingTermation' indicates why the + // slave initiated the destruction and will influence the + // information sent in the status updates for any remaining + // non-terminal tasks. + Option<containerizer::Termination> pendingTermination; private: Executor(const Executor&); // No copying. http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/containerizer.cpp ---------------------------------------------------------------------- diff --git a/src/tests/containerizer.cpp b/src/tests/containerizer.cpp index 1f74315..451c7be 100644 --- a/src/tests/containerizer.cpp +++ b/src/tests/containerizer.cpp @@ -238,7 +238,6 @@ void TestContainerizer::destroy(const ContainerID& containerId) if (promises.contains(containerId)) { containerizer::Termination termination; - termination.set_killed(false); termination.set_message("Killed executor"); termination.set_status(0); http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/containerizer/docker_containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/containerizer/docker_containerizer_tests.cpp b/src/tests/containerizer/docker_containerizer_tests.cpp index 8771ef6..4bb65af 100644 --- a/src/tests/containerizer/docker_containerizer_tests.cpp +++ b/src/tests/containerizer/docker_containerizer_tests.cpp @@ -2402,9 +2402,9 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed) task.mutable_command()->CopyFrom(command); task.mutable_container()->CopyFrom(containerInfo); - Future<TaskStatus> statusFailed; + Future<TaskStatus> statusLost; EXPECT_CALL(sched, statusUpdate(&driver, _)) - .WillOnce(FutureArg<1>(&statusFailed)); + .WillOnce(FutureArg<1>(&statusLost)); Future<ContainerID> containerId; EXPECT_CALL(dockerContainerizer, launch(_, _, _, _, _, _, _, _)) @@ -2420,9 +2420,10 @@ TEST_F(DockerContainerizerTest, ROOT_DOCKER_ExecutorCleanupWhenLaunchFailed) AWAIT_READY_FOR(containerId, Seconds(60)); - AWAIT_READY(statusFailed); - - EXPECT_EQ(TASK_FAILED, statusFailed.get().state()); + AWAIT_READY(statusLost); + EXPECT_EQ(TASK_LOST, statusLost.get().state()); + EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, + statusLost.get().reason()); driver.stop(); driver.join(); http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/containerizer/mesos_containerizer_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/containerizer/mesos_containerizer_tests.cpp b/src/tests/containerizer/mesos_containerizer_tests.cpp index 873acd3..b48133c 100644 --- a/src/tests/containerizer/mesos_containerizer_tests.cpp +++ b/src/tests/containerizer/mesos_containerizer_tests.cpp @@ -669,7 +669,6 @@ TEST_F(MesosContainerizerDestroyTest, DestroyWhilePreparing) "Container destroyed while preparing isolators", termination.message()); - EXPECT_TRUE(termination.killed()); EXPECT_FALSE(termination.has_status()); } http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/oversubscription_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp index 2cf047e..0d0bf7e 100644 --- a/src/tests/oversubscription_tests.cpp +++ b/src/tests/oversubscription_tests.cpp @@ -872,8 +872,8 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill) // Verify task status is TASK_LOST. AWAIT_READY(status2); - ASSERT_EQ(TASK_LOST, status2.get().state()); - ASSERT_EQ(TaskStatus::REASON_EXECUTOR_PREEMPTED, status2.get().reason()); + ASSERT_EQ(TASK_LOST, status2->state()); + ASSERT_EQ(TaskStatus::REASON_CONTAINER_PREEMPTED, status2->reason()); // Verify that slave incremented counter for preempted executors. snapshot = Metrics(); http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index fd285fb..a50960c 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -506,7 +506,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor) // The slave is stopped before the (command) executor is registered. // When it comes back up with recovery=reconnect, make sure the -// executor is killed and the task is transitioned to FAILED. +// executor is killed and the task is transitioned to LOST. TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor) { Try<PID<Master> > master = this->StartMaster(); @@ -591,9 +591,12 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor) Clock::settle(); } - // Scheduler should receive the TASK_FAILED update. + // Scheduler should receive the TASK_LOST update. AWAIT_READY(status); - ASSERT_EQ(TASK_FAILED, status.get().state()); + ASSERT_EQ(TASK_LOST, status->state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); + EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT, + status->reason()); // Master should subsequently reoffer the same resources. while (offers2.isPending()) { @@ -616,7 +619,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor) // The slave is stopped after a non-terminal update is received. // The command executor terminates when the slave is down. // When it comes back up with recovery=reconnect, make -// sure the task is properly transitioned to FAILED. +// sure the task is properly transitioned to LOST. TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor) { Try<PID<Master> > master = this->StartMaster(); @@ -710,9 +713,12 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor) Clock::settle(); } - // Scheduler should receive the TASK_FAILED update. + // Scheduler should receive the TASK_LOST update. AWAIT_READY(status); - ASSERT_EQ(TASK_FAILED, status.get().state()); + ASSERT_EQ(TASK_LOST, status->state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); + EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT, + status->reason()); while (offers2.isPending()) { Clock::advance(Seconds(1)); @@ -3169,7 +3175,10 @@ TYPED_TEST(SlaveRecoveryTest, RestartBeforeContainerizerLaunch) // Scheduler should receive the TASK_FAILED update. AWAIT_READY(status); - ASSERT_EQ(TASK_FAILED, status.get().state()); + ASSERT_EQ(TASK_FAILED, status->state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); + EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, + status->reason()); driver.stop(); driver.join(); http://git-wip-us.apache.org/repos/asf/mesos/blob/3c96155a/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index dccdbb0..10a4fa7 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -229,8 +229,10 @@ TEST_F(SlaveTest, ShutdownUnregisteredExecutor) } AWAIT_READY(status); - ASSERT_EQ(TASK_FAILED, status.get().state()); - EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source()); + ASSERT_EQ(TASK_FAILED, status->state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); + EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REGISTRATION_TIMEOUT, + status->reason()); Clock::resume(); @@ -298,9 +300,9 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor) containerizer.destroy(offers.get()[0].framework_id(), DEFAULT_EXECUTOR_ID); AWAIT_READY(status); - EXPECT_EQ(TASK_LOST, status.get().state()); - EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source()); - EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason()); + EXPECT_EQ(TASK_FAILED, status->state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); + EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status->reason()); // We use 'gc.schedule' as a signal for the executor being cleaned // up by the slave. @@ -427,7 +429,6 @@ TEST_F(SlaveTest, CommandExecutorWithOverride) AWAIT_READY(wait); containerizer::Termination termination; - termination.set_killed(false); termination.set_message("Killed executor"); termination.set_status(0); promise.set(termination); @@ -1371,9 +1372,9 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails) EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, status3.get().source()); AWAIT_READY(status4); - EXPECT_EQ(TASK_LOST, status4.get().state()); - EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4.get().source()); - EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status4.get().reason()); + EXPECT_EQ(TASK_LOST, status4->state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4->source()); + EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status4->reason()); driver.stop(); driver.join(); @@ -1482,9 +1483,9 @@ TEST_F(SlaveTest, TaskLaunchContainerizerUpdateFails) driver.start(); AWAIT_READY(status); - EXPECT_EQ(TASK_LOST, status.get().state()); - EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status.get().source()); - EXPECT_EQ(TaskStatus::REASON_EXECUTOR_TERMINATED, status.get().reason()); + EXPECT_EQ(TASK_LOST, status->state()); + EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source()); + EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status->reason()); driver.stop(); driver.join();
