Reused the previous task status to generate a new one in the default executor. Sometimes, when a new task status update is generated in the executor, we have to make sure that specific data is copied over from the previous task status, e.g., to avoid this data being shadowed during reconciliation. For instance, when a check status update is sent, it must also include the latest known health information.
This patch also refactors the `update()` routine into two separate calls: `createTaskStatus()`, which is responsible for creating a task status from scratch, and `forward()`, which is responsible for forwarding task status updates to the agent. Review: https://reviews.apache.org/r/56215/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fb86531e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fb86531e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fb86531e Branch: refs/heads/master Commit: fb86531e9ece829bfac994d9d7d7242a16ff8fba Parents: 5567edc Author: Alexander Rukletsov <[email protected]> Authored: Thu Mar 23 17:11:23 2017 +0100 Committer: Alexander Rukletsov <[email protected]> Committed: Fri Mar 24 00:17:27 2017 +0100 ---------------------------------------------------------------------- src/launcher/default_executor.cpp | 67 +++++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/fb86531e/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index f83b189..f80e79e 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -89,6 +89,8 @@ private: TaskInfo taskInfo; TaskGroupInfo taskGroup; // Task group of the child container. + Option<TaskStatus> lastTaskStatus; + // Health checker for the container. Option<Owned<checks::HealthChecker>> healthChecker; @@ -466,6 +468,7 @@ protected: taskGroup, None(), None(), + None(), false, false, false}); @@ -523,7 +526,8 @@ protected: // Send a TASK_RUNNING status update now that the task group has // been successfully launched. 
foreach (const TaskInfo& task, taskGroup.tasks()) { - update(task.task_id(), TASK_RUNNING); + const TaskStatus status = createTaskStatus(task.task_id(), TASK_RUNNING); + forward(status); } auto taskIds = [&taskGroup]() { @@ -726,12 +730,19 @@ protected: message = "Command " + WSTRINGIFY(status.get()); } + TaskStatus taskStatus = createTaskStatus( + taskId, + taskState, + None(), + message); + + // Indicate that a task has been unhealthy upon termination. if (unhealthy) { - update(taskId, taskState, message, false); - } else { - update(taskId, taskState, message, None()); + taskStatus.set_healthy(false); } + forward(taskStatus); + CHECK(containers.contains(taskId)); containers.erase(taskId); @@ -954,8 +965,21 @@ protected: << " '" << healthStatus.task_id() << "', task is " << (healthStatus.healthy() ? "healthy" : "not healthy"); - update( - healthStatus.task_id(), TASK_RUNNING, None(), healthStatus.healthy()); + // Use the previous task status to preserve all attached information. + // We always send a `TASK_RUNNING` right after the task is launched. + CHECK_SOME(containers.at(healthStatus.task_id())->lastTaskStatus); + const TaskStatus status = protobuf::createTaskStatus( + containers.at(healthStatus.task_id())->lastTaskStatus.get(), + UUID::random(), + Clock::now().secs(), + None(), + None(), + None(), + None(), + None(), + healthStatus.healthy()); + + forward(status); if (healthStatus.kill_task()) { unhealthy = true; @@ -964,29 +988,29 @@ protected: } private: - void update( + // Use this helper to create a status update from scratch, i.e., without + // previously attached extra information like `data` or `check_status`. 
+ TaskStatus createTaskStatus( const TaskID& taskId, const TaskState& state, - const Option<string>& message = None(), - const Option<bool>& healthy = None()) + const Option<TaskStatus::Reason>& reason = None(), + const Option<string>& message = None()) { - UUID uuid = UUID::random(); - TaskStatus status = protobuf::createTaskStatus( taskId, state, - uuid, + UUID::random(), Clock::now().secs()); status.mutable_executor_id()->CopyFrom(executorId); status.set_source(TaskStatus::SOURCE_EXECUTOR); - if (message.isSome()) { - status.set_message(message.get()); + if (reason.isSome()) { + status.set_reason(reason.get()); } - if (healthy.isSome()) { - status.set_healthy(healthy.get()); + if (message.isSome()) { + status.set_message(message.get()); } // Fill the container ID associated with this task. @@ -996,6 +1020,11 @@ private: ContainerStatus* containerStatus = status.mutable_container_status(); containerStatus->mutable_container_id()->CopyFrom(container->containerId); + return status; + } + + void forward(const TaskStatus& status) + { Call call; call.set_type(Call::UPDATE); @@ -1005,7 +1034,11 @@ private: call.mutable_update()->mutable_status()->CopyFrom(status); // Capture the status update. - unacknowledgedUpdates[uuid] = call.update(); + unacknowledgedUpdates[UUID::fromBytes(status.uuid()).get()] = call.update(); + + // Overwrite the last task status. + CHECK(containers.contains(status.task_id())); + containers.at(status.task_id())->lastTaskStatus = status; mesos->send(evolve(call)); }
