Handle scheduler pid as optional in the slave.

This is in anticipation of HTTP scheduler support in 0.24.0.
Note that 'pid' is still set for driver-based schedulers. The
corresponding master changes, which will leave 'pid' unset for
HTTP schedulers, have not been made yet.

Review: https://reviews.apache.org/r/36760


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9172a5f5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9172a5f5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9172a5f5

Branch: refs/heads/master
Commit: 9172a5f50bc26c2bd88ff7382a0b5f0ccaf73b14
Parents: ac70a59
Author: Benjamin Mahler <[email protected]>
Authored: Thu Jul 23 15:17:22 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Fri Jul 24 16:25:44 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp       |  5 +-
 src/messages/messages.proto | 13 +++++-
 src/slave/slave.cpp         | 99 ++++++++++++++++++++++++++++++----------
 src/slave/slave.hpp         | 16 +++++--
 src/slave/state.hpp         |  4 ++
 src/tests/mesos.cpp         |  7 ++-
 src/tests/mesos.hpp         |  8 ++--
 src/tests/slave_tests.cpp   |  5 +-
 8 files changed, 111 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6d64bfc..613a011 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5042,8 +5042,9 @@ void Master::addSlave(
   // TODO(vinod): Reconcile the notion of a completed framework across the
   // master and slave.
   foreach (const Archive::Framework& completedFramework, completedFrameworks) {
-    const FrameworkID& frameworkId = completedFramework.framework_info().id();
-    Framework* framework = getFramework(frameworkId);
+    Framework* framework = getFramework(
+        completedFramework.framework_info().id());
+
     foreach (const Task& task, completedFramework.tasks()) {
       if (framework != NULL) {
         VLOG(2) << "Re-adding completed task " << task.task_id()

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 165a16d..8977d8e 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -193,8 +193,15 @@ message RunTaskMessage {
   // TODO(karya): Remove framework_id after MESOS-2559 has shipped.
   optional FrameworkID framework_id = 1 [deprecated = true];
   required FrameworkInfo framework = 2;
-  required string pid = 3;
   required TaskInfo task = 4;
+
+  // The pid of the framework. This was moved to 'optional' in
+  // 0.24.0 to support schedulers using the HTTP API. For now, we
+  // continue to always set pid since it was required in 0.23.x.
+  // When 'pid' is unset, or set to empty string, the slave will
+  // forward executor messages through the master. For schedulers
+  // still using the driver, this will remain set.
+  optional string pid = 3;
 }
 
 
@@ -335,7 +342,9 @@ message ShutdownExecutorMessage {
 
 message UpdateFrameworkMessage {
   required FrameworkID framework_id = 1;
-  required string pid = 2;
+
+  // See the comment on RunTaskMessage.pid.
+  optional string pid = 2;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 784fdc8..4ba95f9 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1165,13 +1165,16 @@ void Slave::doReliableRegistration(Duration maxBackoff)
     foreach (const Owned<Framework>& completedFramework, completedFrameworks) {
       VLOG(1) << "Reregistering completed framework "
                 << completedFramework->id();
+
       Archive::Framework* completedFramework_ =
         message.add_completed_frameworks();
-      FrameworkInfo* frameworkInfo =
-        completedFramework_->mutable_framework_info();
-      frameworkInfo->CopyFrom(completedFramework->info);
 
-      completedFramework_->set_pid(completedFramework->pid);
+      completedFramework_->mutable_framework_info()->CopyFrom(
+          completedFramework->info);
+
+      if (completedFramework->pid.isSome()) {
+        completedFramework_->set_pid(completedFramework->pid.get());
+      }
 
       foreach (const Owned<Executor>& executor,
                completedFramework->completedExecutors) {
@@ -1179,10 +1182,12 @@ void Slave::doReliableRegistration(Duration maxBackoff)
                 << " with " << executor->terminatedTasks.size()
                 << " terminated tasks, " << executor->completedTasks.size()
                 << " completed tasks";
+
         foreach (const Task* task, executor->terminatedTasks.values()) {
           VLOG(2) << "Reregistering terminated task " << task->task_id();
           completedFramework_->add_tasks()->CopyFrom(*task);
         }
+
         foreach (const std::shared_ptr<Task>& task, executor->completedTasks) {
           VLOG(2) << "Reregistering completed task " << task->task_id();
           completedFramework_->add_tasks()->CopyFrom(*task);
@@ -1222,7 +1227,7 @@ void Slave::runTask(
     const UPID& from,
     const FrameworkInfo& frameworkInfo_,
     const FrameworkID& frameworkId_,
-    const string& pid,
+    const UPID& pid,
     TaskInfo task)
 {
   if (master != from) {
@@ -1291,7 +1296,13 @@ void Slave::runTask(
       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
     }
 
-    framework = new Framework(this, frameworkInfo, pid);
+    Option<UPID> frameworkPid = None();
+
+    if (pid != UPID()) {
+      frameworkPid = pid;
+    }
+
+    framework = new Framework(this, frameworkInfo, frameworkPid);
     frameworks[frameworkId] = framework;
 
     // Is this same framework in completedFrameworks? If so, move the completed
@@ -1340,14 +1351,13 @@ void Slave::runTask(
 
   // Run the task after the unschedules are done.
   unschedule.onAny(
-      defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, pid, task));
+      defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, task));
 }
 
 
 void Slave::_runTask(
     const Future<bool>& future,
     const FrameworkInfo& frameworkInfo,
-    const string& pid,
     const TaskInfo& task)
 {
   const FrameworkID frameworkId = frameworkInfo.id();
@@ -1733,8 +1743,12 @@ void Slave::runTasks(
     RunTaskMessage message;
     message.mutable_framework_id()->MergeFrom(framework->id());
     message.mutable_framework()->MergeFrom(framework->info);
-    message.set_pid(framework->pid);
     message.mutable_task()->MergeFrom(task);
+
+    // Note that 0.23.x executors require the 'pid' to be set
+    // to decode the message, but do not use the field.
+    message.set_pid(framework->pid.getOrElse(UPID()));
+
     send(executor->pid, message);
   }
 }
@@ -2087,7 +2101,9 @@ void Slave::schedulerMessage(
 }
 
 
-void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
+void Slave::updateFramework(
+    const FrameworkID& frameworkId,
+    const UPID& pid)
 {
   CHECK(state == RECOVERING || state == DISCONNECTED ||
         state == RUNNING || state == TERMINATING)
@@ -2115,15 +2131,25 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
     case Framework::RUNNING: {
       LOG(INFO) << "Updating framework " << frameworkId << " pid to " << pid;
 
-      framework->pid = pid;
+      if (pid == UPID()) {
+        framework->pid = None();
+      } else {
+        framework->pid = pid;
+      }
+
       if (framework->info.checkpoint()) {
-        // Checkpoint the framework pid.
+        // Checkpoint the framework pid, note that when the 'pid'
+        // is None, we checkpoint a default UPID() because
+        // 0.23.x slaves consider a missing pid file to be an
+        // error.
         const string path = paths::getFrameworkPidPath(
             metaDir, info.id(), frameworkId);
 
-        VLOG(1) << "Checkpointing framework pid '"
-                << framework->pid << "' to '" << path << "'";
-        CHECK_SOME(state::checkpoint(path, framework->pid));
+        VLOG(1) << "Checkpointing framework pid"
+                << " '" << framework->pid.getOrElse(UPID()) << "'"
+                << " to '" << path << "'";
+
+        CHECK_SOME(state::checkpoint(path, framework->pid.getOrElse(UPID())));
       }
 
       // Inform status update manager to immediately resend any pending
@@ -2989,15 +3015,23 @@ void Slave::executorMessage(
     return;
   }
 
-  LOG(INFO) << "Sending message for framework " << frameworkId
-            << " to " << framework->pid;
-
   ExecutorToFrameworkMessage message;
   message.mutable_slave_id()->MergeFrom(slaveId);
   message.mutable_framework_id()->MergeFrom(frameworkId);
   message.mutable_executor_id()->MergeFrom(executorId);
   message.set_data(data);
-  send(framework->pid, message);
+
+  CHECK_SOME(master);
+
+  if (framework->pid.isSome()) {
+    LOG(INFO) << "Sending message for framework " << frameworkId
+              << " to " << framework->pid.get();
+    send(framework->pid.get(), message);
+  } else {
+    LOG(INFO) << "Sending message for framework " << frameworkId
+              << " through the master " << master.get();
+    send(master.get(), message);
+  }
 
   metrics.valid_framework_messages++;
 }
@@ -4142,8 +4176,17 @@ void Slave::recoverFramework(const FrameworkState& state)
     CHECK_EQ(frameworkInfo.id(), state.id);
   }
 
+  // In 0.24.0, HTTP schedulers are supported and these do not
+  // have a 'pid'. In this case, the slave will checkpoint UPID().
   CHECK_SOME(state.pid);
-  Framework* framework = new Framework(this, frameworkInfo, state.pid.get());
+
+  Option<UPID> pid = state.pid.get();
+
+  if (pid.get() == UPID()) {
+    pid = None();
+  }
+
+  Framework* framework = new Framework(this, frameworkInfo, pid);
   frameworks[framework->id()] = framework;
 
   // Now recover the executors for this framework.
@@ -4662,7 +4705,7 @@ double Slave::_resources_revocable_percent(const string& name)
 Framework::Framework(
     Slave* _slave,
     const FrameworkInfo& _info,
-    const UPID& _pid)
+    const Option<UPID>& _pid)
   : state(RUNNING),
     slave(_slave),
     info(_info),
@@ -4675,15 +4718,21 @@ Framework::Framework(
         slave->metaDir, slave->info.id(), id());
 
     VLOG(1) << "Checkpointing FrameworkInfo to '" << path << "'";
+
     CHECK_SOME(state::checkpoint(path, info));
 
-    // Checkpoint the framework pid.
+    // Checkpoint the framework pid, note that we checkpoint a
+    // UPID() when it is None (for HTTP schedulers) because
+    // 0.23.x slaves consider a missing pid file to be an
+    // error.
     path = paths::getFrameworkPidPath(
         slave->metaDir, slave->info.id(), id());
 
-    VLOG(1) << "Checkpointing framework pid '"
-            << pid << "' to '" << path << "'";
-    CHECK_SOME(state::checkpoint(path, pid));
+    VLOG(1) << "Checkpointing framework pid"
+            << " '" << pid.getOrElse(UPID()) << "'"
+            << " to '" << path << "'";
+
+    CHECK_SOME(state::checkpoint(path, pid.getOrElse(UPID())));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index dec4ca8..41d0949 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -117,14 +117,13 @@ public:
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
-      const std::string& pid,
+      const process::UPID& pid,
       TaskInfo task);
 
   // Made 'virtual' for Slave mocking.
   virtual void _runTask(
       const process::Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task);
 
   process::Future<bool> unschedule(const std::string& path);
@@ -150,7 +149,9 @@ public:
       const ExecutorID& executorId,
       const std::string& data);
 
-  void updateFramework(const FrameworkID& frameworkId, const std::string& pid);
+  void updateFramework(
+      const FrameworkID& frameworkId,
+      const process::UPID& pid);
 
   void checkpointResources(const std::vector<Resource>& checkpointedResources);
 
@@ -634,7 +635,7 @@ struct Framework
   Framework(
       Slave* slave,
       const FrameworkInfo& info,
-      const process::UPID& pid);
+      const Option<process::UPID>& pid);
 
   ~Framework();
 
@@ -660,7 +661,12 @@ struct Framework
 
   const FrameworkInfo info;
 
-  UPID pid;
+  // Frameworks using the scheduler driver will have a 'pid',
+  // which allows us to send executor messages directly to the
+  // driver. Frameworks using the HTTP API (in 0.24.0) will
+  // not have a 'pid', in which case executor messages are
+  // sent through the master.
+  Option<UPID> pid;
 
   // Executors with pending tasks.
   hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pending;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/slave/state.hpp
----------------------------------------------------------------------
diff --git a/src/slave/state.hpp b/src/slave/state.hpp
index 4e00468..cecf200 100644
--- a/src/slave/state.hpp
+++ b/src/slave/state.hpp
@@ -248,7 +248,11 @@ struct FrameworkState
 
   FrameworkID id;
   Option<FrameworkInfo> info;
+
+  // Note that HTTP frameworks (supported in 0.24.0) do not have a
+  // PID, in which case 'pid' is Some(UPID()) rather than None().
   Option<process::UPID> pid;
+
   hashmap<ExecutorID, ExecutorState> executors;
   unsigned int errors;
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.cpp b/src/tests/mesos.cpp
index f09ef0f..f3b7315 100644
--- a/src/tests/mesos.cpp
+++ b/src/tests/mesos.cpp
@@ -462,7 +462,7 @@ MockSlave::MockSlave(const slave::Flags& flags,
   // Set up default behaviors, calling the original methods.
   EXPECT_CALL(*this, runTask(_, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
-  EXPECT_CALL(*this, _runTask(_, _, _, _))
+  EXPECT_CALL(*this, _runTask(_, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked__runTask));
   EXPECT_CALL(*this, killTask(_, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_killTask));
@@ -485,7 +485,7 @@ void MockSlave::unmocked_runTask(
     const UPID& from,
     const FrameworkInfo& frameworkInfo,
     const FrameworkID& frameworkId,
-    const std::string& pid,
+    const UPID& pid,
     TaskInfo task)
 {
   slave::Slave::runTask(from, frameworkInfo, frameworkId, pid, task);
@@ -495,10 +495,9 @@ void MockSlave::unmocked_runTask(
 void MockSlave::unmocked__runTask(
       const Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task)
 {
-  slave::Slave::_runTask(future, frameworkInfo, pid, task);
+  slave::Slave::_runTask(future, frameworkInfo, task);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 8a76b4f..1759d7e 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -811,26 +811,24 @@ public:
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
-      const std::string& pid,
+      const process::UPID& pid,
       TaskInfo task));
 
   void unmocked_runTask(
       const process::UPID& from,
       const FrameworkInfo& frameworkInfo,
       const FrameworkID& frameworkId,
-      const std::string& pid,
+      const process::UPID& pid,
       TaskInfo task);
 
-  MOCK_METHOD4(_runTask, void(
+  MOCK_METHOD3(_runTask, void(
       const process::Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task));
 
   void unmocked__runTask(
       const process::Future<bool>& future,
       const FrameworkInfo& frameworkInfo,
-      const std::string& pid,
       const TaskInfo& task);
 
   MOCK_METHOD3(killTask, void(

http://git-wip-us.apache.org/repos/asf/mesos/blob/9172a5f5/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index b145d76..64cef6e 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1793,7 +1793,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   // later, tie reaching the critical moment when to kill the task to
   // a future.
   Future<Nothing> _runTask;
-  EXPECT_CALL(slave, _runTask(_, _, _, _))
+  EXPECT_CALL(slave, _runTask(_, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_runTask),
                     SaveArg<0>(&future),
                     SaveArg<1>(&frameworkInfo)));
@@ -1818,8 +1818,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   driver.killTask(task.task_id());
 
   AWAIT_READY(killTask);
-  slave.unmocked__runTask(
-      future, frameworkInfo, master.get(), task);
+  slave.unmocked__runTask(future, frameworkInfo, task);
 
   AWAIT_READY(removeFramework);
 

Reply via email to