Kept TaskInfo beyond first scheduler ack in default executor. Instead of maintaining a separate collection for unacknowledged tasks, we augment the internal `Container` struct with the corresponding `TaskInfo` and an `acknowledged` flag. This way we are still able to find all unacknowledged tasks (slightly less efficiently than before, since we now have to iterate through all tasks), but we also keep each `TaskInfo` beyond receipt of the first acknowledgement.
Review: https://reviews.apache.org/r/57695/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5567edc9 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5567edc9 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5567edc9 Branch: refs/heads/master Commit: 5567edc9df7cab25f3b1a834777c6cae414a104e Parents: 8093c86 Author: Alexander Rukletsov <[email protected]> Authored: Thu Mar 23 17:11:18 2017 +0100 Committer: Alexander Rukletsov <[email protected]> Committed: Fri Mar 24 00:17:27 2017 +0100 ---------------------------------------------------------------------- src/launcher/default_executor.cpp | 48 +++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/5567edc9/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index 5d99def..f83b189 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -86,7 +86,7 @@ private: struct Container { ContainerID containerId; - TaskID taskId; + TaskInfo taskInfo; TaskGroupInfo taskGroup; // Task group of the child container. // Health checker for the container. @@ -97,6 +97,10 @@ private: // `WAIT_NESTED_CONTAINER` call has not been established yet. Option<Connection> waiting; + // Indicates whether a status update acknowledgement + // has been received for any status update. + bool acknowledged; + // Set to true if the child container is in the process of being killed. bool killing; @@ -207,8 +211,14 @@ public: // Remove the corresponding update. unacknowledgedUpdates.erase(uuid); - // Remove the corresponding task. - unacknowledgedTasks.erase(event.acknowledged().task_id()); + // Mark the corresponding task as acknowledged. 
An acknowledgement + // may be received after the task has already been removed from + // `containers`. + const TaskID taskId = event.acknowledged().task_id(); + if (containers.contains(taskId)) { + containers.at(taskId)->acknowledged = true; + } + break; } @@ -269,9 +279,14 @@ protected: subscribe->add_unacknowledged_updates()->MergeFrom(update); } - // Send the unacknowledged tasks. - foreachvalue (const TaskInfo& task, unacknowledgedTasks) { - subscribe->add_unacknowledged_tasks()->MergeFrom(task); + // Send all unacknowledged tasks. We don't send unacknowledged terminated + // (and hence already removed from `containers`) tasks, because for such + // tasks `WAIT_NESTED_CONTAINER` call has already succeeded, meaning the + // agent knows about the tasks and corresponding containers. + foreachvalue (const Owned<Container>& container, containers) { + if (!container->acknowledged) { + subscribe->add_unacknowledged_tasks()->MergeFrom(container->taskInfo); + } } mesos->send(evolve(call)); @@ -445,9 +460,15 @@ protected: const TaskInfo& task = taskGroup.tasks().Get(index++); const TaskID& taskId = task.task_id(); - unacknowledgedTasks[taskId] = task; - containers[taskId] = Owned<Container>(new Container - {containerId, taskId, taskGroup, None(), None(), false, false}); + containers[taskId] = Owned<Container>(new Container{ + containerId, + task, + taskGroup, + None(), + None(), + false, + false, + false}); if (task.has_health_check()) { // TODO(anand): Add support for command health checks. @@ -632,7 +653,7 @@ protected: auto retry_ = [this, container]() mutable { container->waiting->disconnect(); container->waiting = None(); - retry(connectionId.get(), container->taskId); + retry(connectionId.get(), container->taskInfo.task_id()); }; // It is possible that the response failed due to a network blip @@ -759,7 +780,8 @@ protected: // Ignore if it's the same task that triggered this callback or // if the task is no longer active. 
- if (taskId == container->taskId || !containers.contains(taskId)) { + if (taskId == container->taskInfo.task_id() || + !containers.contains(taskId)) { continue; } @@ -1097,10 +1119,6 @@ private: LinkedHashMap<UUID, Call::Update> unacknowledgedUpdates; - // A task is considered unacknowledged if no status update - // acknowledgements have been received for it yet. - LinkedHashMap<TaskID, TaskInfo> unacknowledgedTasks; - // Active child containers. LinkedHashMap<TaskID, Owned<Container>> containers;
