Repository: mesos
Updated Branches:
  refs/heads/master 07bc6734c -> 2d78336ea
Moved executor checkpointing code from the Executor constructor. Review: https://reviews.apache.org/r/26525 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d78336e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d78336e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d78336e Branch: refs/heads/master Commit: 2d78336ea7e484d1bf45679a31de9ee263332b9d Parents: 07bc673 Author: Jie Yu <[email protected]> Authored: Thu Oct 9 14:14:15 2014 -0700 Committer: Jie Yu <[email protected]> Committed: Fri Oct 10 13:49:26 2014 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 76 +++++++++++++++++++++++++++--------------------- src/slave/slave.hpp | 1 + 2 files changed, 44 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/2d78336e/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index f677adf..2b55050 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -1292,9 +1292,10 @@ void Slave::_runTask( break; } case Executor::REGISTERING: - // Checkpoint the task before we do anything else (this is a no-op - // if the framework doesn't have checkpointing enabled). - executor->checkpointTask(task); + // Checkpoint the task before we do anything else. + if (executor->checkpoint) { + executor->checkpointTask(task); + } stats.tasks[TASK_STAGING]++; @@ -1306,9 +1307,10 @@ void Slave::_runTask( executor->queuedTasks[task.task_id()] = task; break; case Executor::RUNNING: { - // Checkpoint the task before we do anything else (this is a no-op - // if the framework doesn't have checkpointing enabled). - executor->checkpointTask(task); + // Checkpoint the task before we do anything else. 
+ if (executor->checkpoint) { + executor->checkpointTask(task); + } stats.tasks[TASK_STAGING]++; @@ -3719,6 +3721,10 @@ Executor* Framework::launchExecutor( Executor* executor = new Executor( slave, id, executorInfo, containerId, directory, info.checkpoint()); + if (executor->checkpoint) { + executor->checkpointExecutor(); + } + CHECK(!executors.contains(executorInfo.executor_id())) << "Unknown executor " << executorInfo.executor_id(); @@ -3994,20 +4000,6 @@ Executor::Executor( commandExecutor = strings::contains(info.command().value(), executorPath.get()); } - - if (checkpoint && slave->state != slave->RECOVERING) { - // Checkpoint the executor info. - const string& path = paths::getExecutorInfoPath( - slave->metaDir, slave->info.id(), frameworkId, id); - - VLOG(1) << "Checkpointing ExecutorInfo to '" << path << "'"; - CHECK_SOME(state::checkpoint(path, info)); - - // Create the meta executor directory. - // NOTE: This creates the 'latest' symlink in the meta directory. - paths::createExecutorDirectory( - slave->metaDir, slave->info.id(), frameworkId, id, containerId); - } } @@ -4096,23 +4088,41 @@ void Executor::completeTask(const TaskID& taskId) } +void Executor::checkpointExecutor() +{ + CHECK(checkpoint); + + CHECK_NE(slave->state, slave->RECOVERING); + + // Checkpoint the executor info. + const string& path = paths::getExecutorInfoPath( + slave->metaDir, slave->info.id(), frameworkId, id); + + VLOG(1) << "Checkpointing ExecutorInfo to '" << path << "'"; + CHECK_SOME(state::checkpoint(path, info)); + + // Create the meta executor directory. + // NOTE: This creates the 'latest' symlink in the meta directory. 
+ paths::createExecutorDirectory( + slave->metaDir, slave->info.id(), frameworkId, id, containerId); +} + + void Executor::checkpointTask(const TaskInfo& task) { - if (checkpoint) { - CHECK_NOTNULL(slave); + CHECK(checkpoint); - const Task& t = protobuf::createTask(task, TASK_STAGING, frameworkId); - const string& path = paths::getTaskInfoPath( - slave->metaDir, - slave->info.id(), - frameworkId, - id, - containerId, - t.task_id()); + const Task& t = protobuf::createTask(task, TASK_STAGING, frameworkId); + const string& path = paths::getTaskInfoPath( + slave->metaDir, + slave->info.id(), + frameworkId, + id, + containerId, + t.task_id()); - VLOG(1) << "Checkpointing TaskInfo to '" << path << "'"; - CHECK_SOME(state::checkpoint(path, t)); - } + VLOG(1) << "Checkpointing TaskInfo to '" << path << "'"; + CHECK_SOME(state::checkpoint(path, t)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/2d78336e/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index cb879cb..efd309e 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -500,6 +500,7 @@ struct Executor Task* addTask(const TaskInfo& task); void terminateTask(const TaskID& taskId, const mesos::TaskState& state); void completeTask(const TaskID& taskId); + void checkpointExecutor(); void checkpointTask(const TaskInfo& task); void recoverTask(const state::TaskState& state); void updateTaskState(const TaskStatus& status);
