Stopped shutting down the whole default executor on task launch failure. Previously, the default executor would be completely shut down on a `LAUNCH_NESTED_CONTAINER` failure.
This patch makes it kill the affected task group instead of shutting down and killing all task groups. Review: https://reviews.apache.org/r/65551/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c3f3542e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c3f3542e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c3f3542e Branch: refs/heads/master Commit: c3f3542e7ecce82cad8b75fdc2db14fe8c43a5da Parents: 5c8852b Author: Gaston Kleiman <gas...@mesosphere.io> Authored: Wed Feb 14 14:35:11 2018 +0800 Committer: Qian Zhang <zhq527...@gmail.com> Committed: Wed Feb 14 20:37:31 2018 +0800 ---------------------------------------------------------------------- src/launcher/default_executor.cpp | 165 ++++++++++++++++++++------------- 1 file changed, 103 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/c3f3542e/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index 2f6d1f8..16977b5 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -108,6 +108,9 @@ private: // `WAIT_NESTED_CONTAINER` call has not been established yet. Option<Connection> waiting; + // Error returned by the agent while trying to launch the container. + Option<string> launchError; + // TODO(bennoe): Create a real state machine instead of adding // more and more ad-hoc boolean values. @@ -420,6 +423,7 @@ protected: None(), None(), None(), + None(), false, false, false, @@ -522,28 +526,32 @@ protected: return; } - // Check if we received a 200 OK response for all the - // `LAUNCH_NESTED_CONTAINER` calls. Shutdown the executor - // if this is not the case. 
- foreach (const Response& response, responses.get()) { - if (response.code != process::http::Status::OK) { - LOG(ERROR) << "Received '" << response.status << "' (" - << response.body << ") while launching child container"; - _shutdown(); - return; - } - } - CHECK(launched); CHECK_EQ(containerIds.size(), (size_t) taskGroup.tasks().size()); + CHECK_EQ(containerIds.size(), responses->size()); int index = 0; + auto responseIterator = responses->begin(); foreach (const ContainerID& containerId, containerIds) { const TaskInfo& task = taskGroup.tasks().Get(index++); const TaskID& taskId = task.task_id(); + const Response& response = *(responseIterator++); CHECK(containers.contains(taskId)); - containers.at(taskId)->launched = true; + Container* container = containers.at(taskId).get(); + + // Check if we received a 200 OK response for the + // `LAUNCH_NESTED_CONTAINER` call. Skip the rest of the container + // initialization if this is not the case. + if (response.code != process::http::Status::OK) { + LOG(ERROR) << "Received '" << response.status << "' (" << response.body + << ") while launching child container " << containerId + << " of task '" << taskId << "'"; + container->launchError = response.body; + continue; + } + + container->launched = true; if (task.has_check()) { Try<Owned<checks::Checker>> checker = @@ -563,7 +571,7 @@ protected: return; } - containers.at(taskId)->checker = checker.get(); + container->checker = checker.get(); } if (task.has_health_check()) { @@ -585,7 +593,7 @@ protected: return; } - containers.at(taskId)->healthChecker = healthChecker.get(); + container->healthChecker = healthChecker.get(); } // Currently, the Mesos agent does not expose the mapping from @@ -611,13 +619,8 @@ protected: << containerId << " of task '" << taskId << "' due to: " << symlink.error(); } - } - // Send a TASK_RUNNING status update now that the task group has - // been successfully launched. 
- foreach (const TaskInfo& task, taskGroup.tasks()) { - const TaskStatus status = createTaskStatus(task.task_id(), TASK_RUNNING); - forward(status); + forward(createTaskStatus(task.task_id(), TASK_RUNNING)); } auto taskIds = [&taskGroup]() { @@ -629,7 +632,7 @@ protected: }; LOG(INFO) - << "Successfully launched tasks " + << "Finished launching tasks " << stringify(taskIds()) << " in child containers " << stringify(containerIds); @@ -786,9 +789,12 @@ protected: return; } - // Check if we receive a 200 OK response for the `WAIT_NESTED_CONTAINER` - // calls. Shutdown the executor otherwise. - if (response->code != process::http::Status::OK) { + // Shutdown the executor if the agent responded to the + // `WAIT_NESTED_CONTAINER` call with an error. Note that several race + // conditions can cause a 404 NOT FOUND response, which shouldn't be + // treated as an error. + if (response->code != process::http::Status::NOT_FOUND && + response->code != process::http::Status::OK) { LOG(ERROR) << "Received '" << response->status << "' (" << response->body << ") waiting on child container " << container->containerId << " of task '" << taskId << "'"; @@ -796,10 +802,6 @@ protected: return; } - Try<agent::Response> waitResponse = - deserialize<agent::Response>(contentType, response->body); - CHECK_SOME(waitResponse); - // If the task is checked, pause the associated checker to avoid // sending check updates after a terminal status update. 
if (container->checker.isSome()) { @@ -821,52 +823,82 @@ protected: Option<TaskStatus::Reason> reason; Option<TaskResourceLimitation> limitation; - if (!waitResponse->wait_nested_container().has_exit_status()) { - taskState = TASK_FAILED; - message = "Command terminated with unknown status"; - } else { - int status = waitResponse->wait_nested_container().exit_status(); - - CHECK(WIFEXITED(status) || WIFSIGNALED(status)) - << "Unexpected wait status " << status; + if (response->code == process::http::Status::NOT_FOUND) { + // The agent can respond with 404 NOT FOUND due to a failed container + // launch or due to a race condition. if (container->killing) { // Send TASK_KILLED if the task was killed as a result of // `killTask()` or `shutdown()`. taskState = TASK_KILLED; - } else if (WSUCCEEDED(status)) { - taskState = TASK_FINISHED; + } else if (container->launchError.isSome()) { + // Send TASK_FAILED if we know that `LAUNCH_NESTED_CONTAINER` returned + // an error. + taskState = TASK_FAILED; + message = container->launchError; } else { + // We don't know exactly why `WAIT_NESTED_CONTAINER` returned 404 NOT + // FOUND, so we'll assume that the task failed. taskState = TASK_FAILED; + message = "Unable to retrieve command's termination information"; } + } else { + Try<agent::Response> waitResponse = + deserialize<agent::Response>(contentType, response->body); + CHECK_SOME(waitResponse); - message = "Command " + WSTRINGIFY(status); - } + if (!waitResponse->wait_nested_container().has_exit_status()) { + taskState = TASK_FAILED; - // Note that we always prefer the task state and reason from the - // agent response over what we can determine ourselves because - // in general, the agent has more specific information about why - // the container exited (e.g. this might be a container resource - // limitation). 
- if (waitResponse->wait_nested_container().has_state()) { - taskState = waitResponse->wait_nested_container().state(); - } + if (container->launchError.isSome()) { + message = container->launchError; + } else { + message = "Command terminated with unknown status"; + } + } else { + int status = waitResponse->wait_nested_container().exit_status(); + + CHECK(WIFEXITED(status) || WIFSIGNALED(status)) + << "Unexpected wait status " << status; + + if (container->killing) { + // Send TASK_KILLED if the task was killed as a result of + // `killTask()` or `shutdown()`. + taskState = TASK_KILLED; + } else if (WSUCCEEDED(status)) { + taskState = TASK_FINISHED; + } else { + taskState = TASK_FAILED; + } - if (waitResponse->wait_nested_container().has_reason()) { - reason = waitResponse->wait_nested_container().reason(); - } + message = "Command " + WSTRINGIFY(status); + } - if (waitResponse->wait_nested_container().has_message()) { - if (message.isSome()) { - message->append( - ": " + waitResponse->wait_nested_container().message()); - } else { - message = waitResponse->wait_nested_container().message(); + // Note that we always prefer the task state and reason from the + // agent response over what we can determine ourselves because + // in general, the agent has more specific information about why + // the container exited (e.g. this might be a container resource + // limitation). 
+ if (waitResponse->wait_nested_container().has_state()) { + taskState = waitResponse->wait_nested_container().state(); } - } - if (waitResponse->wait_nested_container().has_limitation()) { - limitation = waitResponse->wait_nested_container().limitation(); + if (waitResponse->wait_nested_container().has_reason()) { + reason = waitResponse->wait_nested_container().reason(); + } + + if (waitResponse->wait_nested_container().has_message()) { + if (message.isSome()) { + message->append( + ": " + waitResponse->wait_nested_container().message()); + } else { + message = waitResponse->wait_nested_container().message(); + } + } + + if (waitResponse->wait_nested_container().has_limitation()) { + limitation = waitResponse->wait_nested_container().limitation(); + } } TaskStatus taskStatus = createTaskStatus( @@ -877,6 +909,9 @@ protected: limitation); // Indicate that a task has been unhealthy upon termination. + // + // TODO(gkleiman): We should do this if this task or another task that + // belongs to the same task group is unhealthy. See MESOS-8543. if (unhealthy) { // TODO(abudnik): Consider specifying appropriate status update reason, // saying that the task was killed due to a failing health check. @@ -1032,6 +1067,12 @@ protected: { CHECK_EQ(SUBSCRIBED, state); + if (!container->launched) { + // We can get here if we're killing a task group for which multiple + // containers failed to launch. + return Nothing(); + } + CHECK(!container->killing); container->killing = true; @@ -1438,7 +1479,7 @@ private: CHECK_EQ(SUBSCRIBED, state); CHECK_SOME(connectionId); - CHECK(containers.contains(taskId) && containers.at(taskId)->launched); + CHECK(containers.contains(taskId)); const Owned<Container>& container = containers.at(taskId);