Send pending tasks during re-registration. Review: https://reviews.apache.org/r/25371
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00024c3a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00024c3a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00024c3a Branch: refs/heads/master Commit: 00024c3a95d771b6e03f06de2e5e76b1f8754b02 Parents: ab1cf84 Author: Benjamin Mahler <[email protected]> Authored: Tue Aug 19 17:08:40 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Wed Sep 10 11:05:35 2014 -0700 ---------------------------------------------------------------------- src/common/protobuf_utils.cpp | 12 ++++++------ src/common/protobuf_utils.hpp | 8 ++++---- src/slave/slave.cpp | 33 +++++++++++++++++++++++++-------- src/slave/slave.hpp | 4 ++-- src/tests/common/http_tests.cpp | 4 ++-- src/tests/slave_recovery_tests.cpp | 2 +- 6 files changed, 40 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/common/protobuf_utils.cpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp index 1714ef7..a9b65e3 100644 --- a/src/common/protobuf_utils.cpp +++ b/src/common/protobuf_utils.cpp @@ -79,10 +79,10 @@ StatusUpdate createStatusUpdate( } -Task createTask(const TaskInfo& task, - const TaskState& state, - const ExecutorID& executorId, - const FrameworkID& frameworkId) +Task createTask( + const TaskInfo& task, + const TaskState& state, + const FrameworkID& frameworkId) { Task t; t.mutable_framework_id()->MergeFrom(frameworkId); @@ -92,8 +92,8 @@ Task createTask(const TaskInfo& task, t.mutable_slave_id()->MergeFrom(task.slave_id()); t.mutable_resources()->MergeFrom(task.resources()); - if (!task.has_command()) { - t.mutable_executor_id()->MergeFrom(executorId); + if (task.has_executor()) { + t.mutable_executor_id()->CopyFrom(task.executor().executor_id()); } return t; http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/common/protobuf_utils.hpp ---------------------------------------------------------------------- diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp index 809e4b2..212d512 100644 --- a/src/common/protobuf_utils.hpp +++ b/src/common/protobuf_utils.hpp @@ -44,10 +44,10 @@ StatusUpdate createStatusUpdate( const std::string& message = "", const Option<ExecutorID>& executorId = None()); -Task createTask(const TaskInfo& task, - const TaskState& state, - const ExecutorID& executorId, - const FrameworkID& frameworkId); +Task createTask( + const TaskInfo& task, + const TaskState& state, + const FrameworkID& frameworkId); // Helper function that creates a MasterInfo from UPID. MasterInfo createMasterInfo(const process::UPID& pid); http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index c9ea070..9536a3b 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -892,6 +892,18 @@ void Slave::doReliableRegistration(const Duration& duration) message.mutable_slave()->CopyFrom(info); foreachvalue (Framework* framework, frameworks) { + // TODO(bmahler): We need to send the executors for these + // pending tasks, and we need to send exited events if they + // cannot be launched: MESOS-1715 MESOS-1720. + + typedef hashmap<TaskID, TaskInfo> TaskMap; + foreachvalue (const TaskMap& tasks, framework->pending) { + foreachvalue (const TaskInfo& task, tasks) { + message.add_tasks()->CopyFrom(protobuf::createTask( + task, TASK_STAGING, framework->id)); + } + } + foreachvalue (Executor* executor, framework->executors) { // Add launched, terminated, and queued tasks. // Note that terminated executors will only have terminated @@ -904,7 +916,7 @@ void Slave::doReliableRegistration(const Duration& duration) } foreach (const TaskInfo& task, executor->queuedTasks.values()) { message.add_tasks()->CopyFrom(protobuf::createTask( - task, TASK_STAGING, executor->id, framework->id)); + task, TASK_STAGING, framework->id)); } // Do not re-register with Command Executors because the @@ -1076,7 +1088,7 @@ void Slave::runTask( // removed and the framework and top level executor directories // are not scheduled for deletion before '_runTask()' is called. CHECK_NOTNULL(framework); - framework->pending.put(executorId, task.task_id()); + framework->pending[executorId][task.task_id()] = task; // If we are about to create a new executor, unschedule the top // level work and meta directories from getting gc'ed. @@ -1128,7 +1140,10 @@ void Slave::_runTask( Framework* framework = getFramework(frameworkId); CHECK_NOTNULL(framework); - framework->pending.remove(executorId, task.task_id()); + framework->pending[executorId].erase(task.task_id()); + if (framework->pending[executorId].empty()) { + framework->pending.erase(executorId); + } // We don't send a status update here because a terminating // framework cannot send acknowledgements. @@ -3329,7 +3344,10 @@ double Slave::_tasks_staging() { double count = 0.0; foreachvalue (Framework* framework, frameworks) { - count += framework->pending.size(); + typedef hashmap<TaskID, TaskInfo> TaskMap; + foreachvalue (const TaskMap& tasks, framework->pending) { + count += tasks.size(); + } foreachvalue (Executor* executor, framework->executors) { count += executor->queuedTasks.size(); @@ -3895,8 +3913,7 @@ Task* Executor::addTask(const TaskInfo& task) CHECK(!launchedTasks.contains(task.task_id())) << "Duplicate task " << task.task_id(); - Task* t = new Task( - protobuf::createTask(task, TASK_STAGING, id, frameworkId)); + Task* t = new Task(protobuf::createTask(task, TASK_STAGING, frameworkId)); launchedTasks[task.task_id()] = t; @@ -3916,7 +3933,7 @@ void Executor::terminateTask( // Remove the task if it's queued. if (queuedTasks.contains(taskId)) { task = new Task( - protobuf::createTask(queuedTasks[taskId], state, id, frameworkId)); + protobuf::createTask(queuedTasks[taskId], state, frameworkId)); queuedTasks.erase(taskId); } else if (launchedTasks.contains(taskId)) { // Update the resources if it's been launched. @@ -3965,7 +3982,7 @@ void Executor::checkpointTask(const TaskInfo& task) if (checkpoint) { CHECK_NOTNULL(slave); - const Task& t = protobuf::createTask(task, TASK_STAGING, id, frameworkId); + const Task& t = protobuf::createTask(task, TASK_STAGING, frameworkId); const string& path = paths::getTaskInfoPath( slave->metaDir, slave->info.id(), http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 062e961..d8c7ee4 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -42,7 +42,6 @@ #include <stout/linkedhashmap.hpp> #include <stout/hashmap.hpp> #include <stout/hashset.hpp> -#include <stout/multihashmap.hpp> #include <stout/option.hpp> #include <stout/os.hpp> #include <stout/path.hpp> @@ -586,7 +585,8 @@ struct Framework UPID pid; - multihashmap<ExecutorID, TaskID> pending; // Executors with pending tasks. + // Executors with pending tasks. + hashmap<ExecutorID, hashmap<TaskID, TaskInfo> > pending; // Current running executors. hashmap<ExecutorID, Executor*> executors; http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/tests/common/http_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/common/http_tests.cpp b/src/tests/common/http_tests.cpp index 5fa51bf..912653b 100644 --- a/src/tests/common/http_tests.cpp +++ b/src/tests/common/http_tests.cpp @@ -50,7 +50,7 @@ TEST(HTTP, ModelTask) slaveId.set_value("s"); ExecutorID executorId; - executorId.set_value("e"); + executorId.set_value("t"); FrameworkID frameworkId; frameworkId.set_value("f"); @@ -74,7 +74,7 @@ TEST(HTTP, ModelTask) task.mutable_slave_id()->CopyFrom(slaveId); task.mutable_command()->set_value("echo hello"); - Task task_ = protobuf::createTask(task, state, executorId, frameworkId); + Task task_ = protobuf::createTask(task, state, frameworkId); task_.add_statuses()->CopyFrom(statuses[0]); JSON::Value object = model(task, frameworkId, state, statuses); http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/tests/slave_recovery_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp index 5818e0f..c7c30d6 100644 --- a/src/tests/slave_recovery_tests.cpp +++ b/src/tests/slave_recovery_tests.cpp @@ -262,7 +262,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState) .tasks.contains(task.task_id())); const Task& t = mesos::internal::protobuf::createTask( - task, TASK_STAGING, executorId, frameworkId); + task, TASK_STAGING, frameworkId); ASSERT_SOME_EQ( t,
