Re-checkpointed the `Executor`s and `Task`s during agent recovery. Review: https://reviews.apache.org/r/57109
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ed520374 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ed520374 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ed520374 Branch: refs/heads/master Commit: ed52037457e471a26e00888bd11efda12c0593d2 Parents: 4912f34 Author: Michael Park <[email protected]> Authored: Mon Feb 13 14:38:49 2017 -0800 Committer: Michael Park <[email protected]> Committed: Fri Mar 3 03:39:59 2017 -0800 ---------------------------------------------------------------------- src/slave/slave.cpp | 81 +++++++++++++++++++++++++++++++++++------------- src/slave/slave.hpp | 17 ++++++++-- 2 files changed, 73 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ed520374/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 4db367c..775f43b 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -5295,6 +5295,7 @@ Future<Nothing> Slave::recover(const Try<state::State>& state) const FrameworkInfo& frameworkInfo) { set<string> roles = protobuf::framework::getRoles(frameworkInfo); + bool injectedAllocationInfo = false; foreach (Resource& resource, *resources) { if (!resource.has_allocation_info()) { if (roles.size() != 1) { @@ -5304,14 +5305,22 @@ Future<Nothing> Slave::recover(const Try<state::State>& state) } resource.mutable_allocation_info()->set_role(*roles.begin()); + injectedAllocationInfo = true; } } + + return injectedAllocationInfo; }; - // TODO(bmahler): We currently don't allow frameworks to - // change their roles so we do not need to re-persist the - // resources with `AllocationInfo` injected for existing - // tasks and executors. + // In order to allow frameworks to change their role(s), we need to keep + // track of the fact that the resources used to be implicitly allocated to + // `FrameworkInfo.role` before the agent upgrade. To this end, we inject + // the `AllocationInfo` to the resources in `ExecutorState` and `TaskState`, + // and re-checkpoint them if necessary. + + hashset<ExecutorID> injectedExecutors; + hashmap<ExecutorID, hashset<TaskID>> injectedTasks; + if (slaveState.isSome()) { foreachvalue (FrameworkState& frameworkState, slaveState->frameworks) { if (!frameworkState.info.isSome()) { @@ -5323,9 +5332,11 @@ Future<Nothing> Slave::recover(const Try<state::State>& state) continue; } - injectAllocationInfo( - executorState.info->mutable_resources(), - frameworkState.info.get()); + if (injectAllocationInfo( + executorState.info->mutable_resources(), + frameworkState.info.get())) { + injectedExecutors.insert(executorState.id); + } foreachvalue (RunState& runState, executorState.runs) { foreachvalue (TaskState& taskState, runState.tasks) { @@ -5333,9 +5344,11 @@ Future<Nothing> Slave::recover(const Try<state::State>& state) continue; } - injectAllocationInfo( - taskState.info->mutable_resources(), - frameworkState.info.get()); + if (injectAllocationInfo( + taskState.info->mutable_resources(), + frameworkState.info.get())) { + injectedTasks[executorState.id].insert(taskState.id); + } } } } @@ -5452,7 +5465,7 @@ Future<Nothing> Slave::recover(const Try<state::State>& state) // Recover the frameworks. foreachvalue (const FrameworkState& frameworkState, slaveState.get().frameworks) { - recoverFramework(frameworkState); + recoverFramework(frameworkState, injectedExecutors, injectedTasks); } } @@ -5638,7 +5651,10 @@ void Slave::__recover(const Future<Nothing>& future) } -void Slave::recoverFramework(const FrameworkState& state) +void Slave::recoverFramework( + const FrameworkState& state, + const hashset<ExecutorID>& executorsToRecheckpoint, + const hashmap<ExecutorID, hashset<TaskID>>& tasksToRecheckpoint) { LOG(INFO) << "Recovering framework " << state.id; @@ -5692,7 +5708,12 @@ void Slave::recoverFramework(const FrameworkState& state) // Now recover the executors for this framework. foreachvalue (const ExecutorState& executorState, state.executors) { - framework->recoverExecutor(executorState); + framework->recoverExecutor( + executorState, + executorsToRecheckpoint.contains(executorState.id), + tasksToRecheckpoint.contains(executorState.id) + ? tasksToRecheckpoint.at(executorState.id) + : hashset<TaskID>{}); } // Remove the framework in case we didn't recover any executors. @@ -6642,7 +6663,10 @@ Executor* Slave::getExecutor(const ContainerID& containerId) const } -void Framework::recoverExecutor(const ExecutorState& state) +void Framework::recoverExecutor( + const ExecutorState& state, + bool recheckpointExecutor, + const hashset<TaskID>& tasksToRecheckpoint) { LOG(INFO) << "Recovering executor '" << state.id << "' of framework " << id(); @@ -6738,7 +6762,9 @@ void Framework::recoverExecutor(const ExecutorState& state) // And finally recover all the executor's tasks. foreachvalue (const TaskState& taskState, run.get().tasks) { - executor->recoverTask(taskState); + executor->recoverTask( + taskState, + tasksToRecheckpoint.contains(taskState.id)); } ExecutorID executorId = state.id; @@ -6762,6 +6788,9 @@ void Framework::recoverExecutor(const ExecutorState& state) // Add the executor to the framework. executors[executor->id] = executor; + if (recheckpointExecutor) { + executor->checkpointExecutor(); + } // If the latest run of the executor was completed (i.e., terminated // and all updates are acknowledged) in the previous run, we @@ -6890,8 +6919,6 @@ void Executor::checkpointExecutor() { CHECK(checkpoint); - CHECK_NE(slave->state, slave->RECOVERING); - // Checkpoint the executor info. const string path = paths::getExecutorInfoPath( slave->metaDir, slave->info.id(), frameworkId, id); @@ -6908,23 +6935,28 @@ void Executor::checkpointExecutor() void Executor::checkpointTask(const TaskInfo& task) { + checkpointTask(protobuf::createTask(task, TASK_STAGING, frameworkId)); +} + + +void Executor::checkpointTask(const Task& task) +{ CHECK(checkpoint); - const Task t = protobuf::createTask(task, TASK_STAGING, frameworkId); const string path = paths::getTaskInfoPath( slave->metaDir, slave->info.id(), frameworkId, id, containerId, - t.task_id()); + task.task_id()); VLOG(1) << "Checkpointing TaskInfo to '" << path << "'"; - CHECK_SOME(state::checkpoint(path, t)); + CHECK_SOME(state::checkpoint(path, task)); } -void Executor::recoverTask(const TaskState& state) +void Executor::recoverTask(const TaskState& state, bool recheckpointTask) { if (state.info.isNone()) { LOG(WARNING) << "Skipping recovery of task " << state.id @@ -6938,7 +6970,12 @@ void Executor::recoverTask(const TaskState& state) CHECK(resource.has_allocation_info()); } - launchedTasks[state.id] = new Task(state.info.get()); + Task* task = new Task(state.info.get()); + if (recheckpointTask) { + checkpointTask(*task); + } + + launchedTasks[state.id] = task; // NOTE: Since some tasks might have been terminated when the // slave was down, the executor resources we capture here is an http://git-wip-us.apache.org/repos/asf/mesos/blob/ed520374/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index e6fac20..857338c 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -411,7 +411,10 @@ public: virtual void __recover(const process::Future<Nothing>& future); // Helper to recover a framework from the specified state. - void recoverFramework(const state::FrameworkState& state); + void recoverFramework( + const state::FrameworkState& state, + const hashset<ExecutorID>& executorsToRecheckpoint, + const hashmap<ExecutorID, hashset<TaskID>>& tasksToRecheckpoint); // Removes and garbage collects the executor. void removeExecutor(Framework* framework, Executor* executor); @@ -890,7 +893,10 @@ struct Executor void completeTask(const TaskID& taskId); void checkpointExecutor(); void checkpointTask(const TaskInfo& task); - void recoverTask(const state::TaskState& state); + void checkpointTask(const Task& task); + + void recoverTask(const state::TaskState& state, bool recheckpointTask); + Try<Nothing> updateTaskState(const TaskStatus& status); // Returns true if there are any queued/launched/terminated tasks. @@ -1050,7 +1056,12 @@ struct Framework void destroyExecutor(const ExecutorID& executorId); Executor* getExecutor(const ExecutorID& executorId) const; Executor* getExecutor(const TaskID& taskId) const; - void recoverExecutor(const state::ExecutorState& state); + + void recoverExecutor( + const state::ExecutorState& state, + bool recheckpointExecutor, + const hashset<TaskID>& tasksToRecheckpoint); + void checkpointFramework() const; const FrameworkID id() const { return info.id(); }
