Send pending tasks during re-registration.

Review: https://reviews.apache.org/r/25371


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00024c3a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00024c3a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00024c3a

Branch: refs/heads/master
Commit: 00024c3a95d771b6e03f06de2e5e76b1f8754b02
Parents: ab1cf84
Author: Benjamin Mahler <[email protected]>
Authored: Tue Aug 19 17:08:40 2014 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Wed Sep 10 11:05:35 2014 -0700

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp      | 12 ++++++------
 src/common/protobuf_utils.hpp      |  8 ++++----
 src/slave/slave.cpp                | 33 +++++++++++++++++++++++++--------
 src/slave/slave.hpp                |  4 ++--
 src/tests/common/http_tests.cpp    |  4 ++--
 src/tests/slave_recovery_tests.cpp |  2 +-
 6 files changed, 40 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 1714ef7..a9b65e3 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -79,10 +79,10 @@ StatusUpdate createStatusUpdate(
 }
 
 
-Task createTask(const TaskInfo& task,
-                       const TaskState& state,
-                       const ExecutorID& executorId,
-                       const FrameworkID& frameworkId)
+Task createTask(
+    const TaskInfo& task,
+    const TaskState& state,
+    const FrameworkID& frameworkId)
 {
   Task t;
   t.mutable_framework_id()->MergeFrom(frameworkId);
@@ -92,8 +92,8 @@ Task createTask(const TaskInfo& task,
   t.mutable_slave_id()->MergeFrom(task.slave_id());
   t.mutable_resources()->MergeFrom(task.resources());
 
-  if (!task.has_command()) {
-    t.mutable_executor_id()->MergeFrom(executorId);
+  if (task.has_executor()) {
+    t.mutable_executor_id()->CopyFrom(task.executor().executor_id());
   }
 
   return t;

http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/common/protobuf_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.hpp b/src/common/protobuf_utils.hpp
index 809e4b2..212d512 100644
--- a/src/common/protobuf_utils.hpp
+++ b/src/common/protobuf_utils.hpp
@@ -44,10 +44,10 @@ StatusUpdate createStatusUpdate(
     const std::string& message = "",
     const Option<ExecutorID>& executorId = None());
 
-Task createTask(const TaskInfo& task,
-                const TaskState& state,
-                const ExecutorID& executorId,
-                const FrameworkID& frameworkId);
+Task createTask(
+    const TaskInfo& task,
+    const TaskState& state,
+    const FrameworkID& frameworkId);
 
 // Helper function that creates a MasterInfo from UPID.
 MasterInfo createMasterInfo(const process::UPID& pid);

http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c9ea070..9536a3b 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -892,6 +892,18 @@ void Slave::doReliableRegistration(const Duration& duration)
     message.mutable_slave()->CopyFrom(info);
 
     foreachvalue (Framework* framework, frameworks) {
+      // TODO(bmahler): We need to send the executors for these
+      // pending tasks, and we need to send exited events if they
+      // cannot be launched: MESOS-1715 MESOS-1720.
+
+      typedef hashmap<TaskID, TaskInfo> TaskMap;
+      foreachvalue (const TaskMap& tasks, framework->pending) {
+        foreachvalue (const TaskInfo& task, tasks) {
+          message.add_tasks()->CopyFrom(protobuf::createTask(
+              task, TASK_STAGING, framework->id));
+        }
+      }
+
       foreachvalue (Executor* executor, framework->executors) {
         // Add launched, terminated, and queued tasks.
         // Note that terminated executors will only have terminated
@@ -904,7 +916,7 @@ void Slave::doReliableRegistration(const Duration& duration)
         }
         foreach (const TaskInfo& task, executor->queuedTasks.values()) {
           message.add_tasks()->CopyFrom(protobuf::createTask(
-              task, TASK_STAGING, executor->id, framework->id));
+              task, TASK_STAGING, framework->id));
         }
 
         // Do not re-register with Command Executors because the
@@ -1076,7 +1088,7 @@ void Slave::runTask(
   // removed and the framework and top level executor directories
   // are not scheduled for deletion before '_runTask()' is called.
   CHECK_NOTNULL(framework);
-  framework->pending.put(executorId, task.task_id());
+  framework->pending[executorId][task.task_id()] = task;
 
   // If we are about to create a new executor, unschedule the top
   // level work and meta directories from getting gc'ed.
@@ -1128,7 +1140,10 @@ void Slave::_runTask(
   Framework* framework = getFramework(frameworkId);
   CHECK_NOTNULL(framework);
 
-  framework->pending.remove(executorId, task.task_id());
+  framework->pending[executorId].erase(task.task_id());
+  if (framework->pending[executorId].empty()) {
+    framework->pending.erase(executorId);
+  }
 
   // We don't send a status update here because a terminating
   // framework cannot send acknowledgements.
@@ -3329,7 +3344,10 @@ double Slave::_tasks_staging()
 {
   double count = 0.0;
   foreachvalue (Framework* framework, frameworks) {
-    count += framework->pending.size();
+    typedef hashmap<TaskID, TaskInfo> TaskMap;
+    foreachvalue (const TaskMap& tasks, framework->pending) {
+      count += tasks.size();
+    }
 
     foreachvalue (Executor* executor, framework->executors) {
       count += executor->queuedTasks.size();
@@ -3895,8 +3913,7 @@ Task* Executor::addTask(const TaskInfo& task)
   CHECK(!launchedTasks.contains(task.task_id()))
     << "Duplicate task " << task.task_id();
 
-  Task* t = new Task(
-      protobuf::createTask(task, TASK_STAGING, id, frameworkId));
+  Task* t = new Task(protobuf::createTask(task, TASK_STAGING, frameworkId));
 
   launchedTasks[task.task_id()] = t;
 
@@ -3916,7 +3933,7 @@ void Executor::terminateTask(
   // Remove the task if it's queued.
   if (queuedTasks.contains(taskId)) {
     task = new Task(
-        protobuf::createTask(queuedTasks[taskId], state, id, frameworkId));
+        protobuf::createTask(queuedTasks[taskId], state, frameworkId));
     queuedTasks.erase(taskId);
   } else if (launchedTasks.contains(taskId)) {
     // Update the resources if it's been launched.
@@ -3965,7 +3982,7 @@ void Executor::checkpointTask(const TaskInfo& task)
   if (checkpoint) {
     CHECK_NOTNULL(slave);
 
-    const Task& t = protobuf::createTask(task, TASK_STAGING, id, frameworkId);
+    const Task& t = protobuf::createTask(task, TASK_STAGING, frameworkId);
     const string& path = paths::getTaskInfoPath(
         slave->metaDir,
         slave->info.id(),

http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 062e961..d8c7ee4 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -42,7 +42,6 @@
 #include <stout/linkedhashmap.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
-#include <stout/multihashmap.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
@@ -586,7 +585,8 @@ struct Framework
 
   UPID pid;
 
-  multihashmap<ExecutorID, TaskID> pending; // Executors with pending tasks.
+  // Executors with pending tasks.
+  hashmap<ExecutorID, hashmap<TaskID, TaskInfo> > pending;
 
   // Current running executors.
   hashmap<ExecutorID, Executor*> executors;

http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/tests/common/http_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/common/http_tests.cpp b/src/tests/common/http_tests.cpp
index 5fa51bf..912653b 100644
--- a/src/tests/common/http_tests.cpp
+++ b/src/tests/common/http_tests.cpp
@@ -50,7 +50,7 @@ TEST(HTTP, ModelTask)
   slaveId.set_value("s");
 
   ExecutorID executorId;
-  executorId.set_value("e");
+  executorId.set_value("t");
 
   FrameworkID frameworkId;
   frameworkId.set_value("f");
@@ -74,7 +74,7 @@ TEST(HTTP, ModelTask)
   task.mutable_slave_id()->CopyFrom(slaveId);
   task.mutable_command()->set_value("echo hello");
 
-  Task task_ = protobuf::createTask(task, state, executorId, frameworkId);
+  Task task_ = protobuf::createTask(task, state, frameworkId);
   task_.add_statuses()->CopyFrom(statuses[0]);
 
   JSON::Value object = model(task, frameworkId, state, statuses);

http://git-wip-us.apache.org/repos/asf/mesos/blob/00024c3a/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 5818e0f..c7c30d6 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -262,7 +262,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
                 .tasks.contains(task.task_id()));
 
   const Task& t = mesos::internal::protobuf::createTask(
-      task, TASK_STAGING, executorId, frameworkId);
+      task, TASK_STAGING, frameworkId);
 
   ASSERT_SOME_EQ(
       t,

Reply via email to