Added an ExecutorToFramework message handler on the master.

This enables the slave to forward executor-to-framework messages through the master, which is required for HTTP schedulers because they do not have a pid.

Review: https://reviews.apache.org/r/36759


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac70a594
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac70a594
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac70a594

Branch: refs/heads/master
Commit: ac70a594e9b9700c4cd12eb95a26f8470785a7a9
Parents: a9312c2
Author: Benjamin Mahler <[email protected]>
Authored: Wed Jul 22 17:51:27 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Fri Jul 24 16:25:44 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp  | 96 +++++++++++++++++++++++++++++++++++++++------
 src/master/master.hpp  |  7 ++++
 src/master/metrics.cpp | 14 +++++++
 src/master/metrics.hpp |  3 ++
 src/slave/slave.cpp    |  1 -
 5 files changed, 109 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7796630..6d64bfc 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -713,6 +713,16 @@ void Master::initialize()
       &StatusUpdateMessage::update,
       &StatusUpdateMessage::pid);
 
+  // Added in 0.24.0 to support HTTP schedulers. Since
+  // these do not have a pid, the slave must forward
+  // messages through the master.
+  install<ExecutorToFrameworkMessage>(
+      &Master::executorMessage,
+      &ExecutorToFrameworkMessage::slave_id,
+      &ExecutorToFrameworkMessage::framework_id,
+      &ExecutorToFrameworkMessage::executor_id,
+      &ExecutorToFrameworkMessage::data);
+
   install<ReconcileTasksMessage>(
       &Master::reconcileTasks,
       &ReconcileTasksMessage::framework_id,
@@ -3199,24 +3209,24 @@ void Master::schedulerMessage(
     const ExecutorID& executorId,
     const string& data)
 {
-  ++metrics->messages_framework_to_executor;
+  metrics->messages_framework_to_executor++;
 
   Framework* framework = getFramework(frameworkId);
 
   if (framework == NULL) {
-    LOG(WARNING)
-      << "Ignoring framework message for executor " << executorId
-      << " of framework " << frameworkId
-      << " because the framework cannot be found";
+    LOG(WARNING) << "Ignoring framework message"
+                 << " for executor '" << executorId << "'"
+                 << " of framework " << frameworkId
+                 << " because the framework cannot be found";
     metrics->invalid_framework_to_executor_messages++;
     return;
   }
 
   if (from != framework->pid) {
-    LOG(WARNING)
-      << "Ignoring framework message for executor " << executorId
-      << " of framework " << *framework
-      << " because it is not expected from " << from;
+    LOG(WARNING) << "Ignoring framework message"
+                 << " for executor '" << executorId << "'"
+                 << " of framework " << *framework
+                 << " because it is not expected from " << from;
     metrics->invalid_framework_to_executor_messages++;
     return;
   }
@@ -3230,6 +3240,69 @@ void Master::schedulerMessage(
 }
 
 
+void Master::executorMessage(
+    const UPID& from,
+    const SlaveID& slaveId,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId,
+    const string& data)
+{
+  metrics->messages_executor_to_framework++;
+
+  if (slaves.removed.get(slaveId).isSome()) {
+    // If the slave is removed, we have already informed
+    // frameworks that its tasks were LOST, so the slave
+    // should shut down.
+    LOG(WARNING) << "Ignoring executor message"
+                 << " from executor" << " '" << executorId << "'"
+                 << " of framework " << frameworkId
+                 << " on removed slave " << slaveId
+                 << " ; asking slave to shutdown";
+
+    ShutdownMessage message;
+    message.set_message("Executor message from unknown slave");
+    reply(message);
+    metrics->invalid_executor_to_framework_messages++;
+    return;
+  }
+
+  // The slave should (re-)register with the master before
+  // forwarding executor messages.
+  if (!slaves.registered.contains(slaveId)) {
+    LOG(WARNING) << "Ignoring executor message"
+                 << " from executor '" << executorId << "'"
+                 << " of framework " << frameworkId
+                 << " on unknown slave " << slaveId;
+    metrics->invalid_executor_to_framework_messages++;
+    return;
+  }
+
+  Slave* slave = slaves.registered.get(slaveId);
+  CHECK_NOTNULL(slave);
+
+  Framework* framework = getFramework(frameworkId);
+
+  if (framework == NULL) {
+    LOG(WARNING) << "Not forwarding executor message"
+                 << " for executor '" << executorId << "'"
+                 << " of framework " << frameworkId
+                 << " on slave " << *slave
+                 << " because the framework is unknown";
+    metrics->invalid_executor_to_framework_messages++;
+    return;
+  }
+
+  ExecutorToFrameworkMessage message;
+  message.mutable_slave_id()->MergeFrom(slaveId);
+  message.mutable_framework_id()->MergeFrom(frameworkId);
+  message.mutable_executor_id()->MergeFrom(executorId);
+  message.set_data(data);
+  send(framework->pid, message);
+
+  metrics->valid_executor_to_framework_messages++;
+}
+
+
 void Master::message(
     Framework* framework,
     const scheduler::Call::Message& message)
@@ -3850,8 +3923,6 @@ void Master::exitedExecutor(
     return;
   }
 
-  // Only update master's internal data structures here for proper
-  // accounting. The TASK_LOST updates are handled by the slave.
   if (!slaves.registered.contains(slaveId)) {
     LOG(WARNING) << "Ignoring exited executor '" << executorId
                  << "' of framework " << frameworkId
@@ -3859,6 +3930,9 @@ void Master::exitedExecutor(
     return;
   }
 
+  // Only update master's internal data structures here for proper
+  // accounting. The TASK_LOST updates are handled by the slave.
+
   Slave* slave = slaves.registered.get(slaveId);
   CHECK_NOTNULL(slave);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 29113cb..827d0d5 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -716,6 +716,13 @@ public:
       const ExecutorID& executorId,
       const std::string& data);
 
+  void executorMessage(
+      const process::UPID& from,
+      const SlaveID& slaveId,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId,
+      const std::string& data);
+
   void registerSlave(
       const process::UPID& from,
       const SlaveInfo& slaveInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/metrics.cpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp
index 10e2937..d79206f 100644
--- a/src/master/metrics.cpp
+++ b/src/master/metrics.cpp
@@ -118,6 +118,8 @@ Metrics::Metrics(const Master& master)
         "master/messages_reconcile_tasks"),
     messages_framework_to_executor(
         "master/messages_framework_to_executor"),
+    messages_executor_to_framework(
+        "master/messages_executor_to_framework"),
     messages_register_slave(
         "master/messages_register_slave"),
     messages_reregister_slave(
@@ -136,6 +138,10 @@ Metrics::Metrics(const Master& master)
         "master/valid_framework_to_executor_messages"),
     invalid_framework_to_executor_messages(
         "master/invalid_framework_to_executor_messages"),
+    valid_executor_to_framework_messages(
+        "master/valid_executor_to_framework_messages"),
+    invalid_executor_to_framework_messages(
+        "master/invalid_executor_to_framework_messages"),
     valid_status_updates(
         "master/valid_status_updates"),
     invalid_status_updates(
@@ -214,6 +220,7 @@ Metrics::Metrics(const Master& master)
   process::metrics::add(messages_revive_offers);
   process::metrics::add(messages_reconcile_tasks);
   process::metrics::add(messages_framework_to_executor);
+  process::metrics::add(messages_executor_to_framework);
 
   // Messages from slaves.
   process::metrics::add(messages_register_slave);
@@ -229,6 +236,9 @@ Metrics::Metrics(const Master& master)
   process::metrics::add(valid_framework_to_executor_messages);
   process::metrics::add(invalid_framework_to_executor_messages);
 
+  process::metrics::add(valid_executor_to_framework_messages);
+  process::metrics::add(invalid_executor_to_framework_messages);
+
   process::metrics::add(valid_status_updates);
   process::metrics::add(invalid_status_updates);
 
@@ -345,6 +355,7 @@ Metrics::~Metrics()
   process::metrics::remove(messages_revive_offers);
   process::metrics::remove(messages_reconcile_tasks);
   process::metrics::remove(messages_framework_to_executor);
+  process::metrics::remove(messages_executor_to_framework);
 
   // Messages from slaves.
   process::metrics::remove(messages_register_slave);
@@ -360,6 +371,9 @@ Metrics::~Metrics()
   process::metrics::remove(valid_framework_to_executor_messages);
   process::metrics::remove(invalid_framework_to_executor_messages);
 
+  process::metrics::remove(valid_executor_to_framework_messages);
+  process::metrics::remove(invalid_executor_to_framework_messages);
+
   process::metrics::remove(valid_status_updates);
   process::metrics::remove(invalid_status_updates);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/metrics.hpp
----------------------------------------------------------------------
diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp
index d37d74a..5e96a5f 100644
--- a/src/master/metrics.hpp
+++ b/src/master/metrics.hpp
@@ -131,6 +131,7 @@ struct Metrics
   process::metrics::Counter messages_revive_offers;
   process::metrics::Counter messages_reconcile_tasks;
   process::metrics::Counter messages_framework_to_executor;
+  process::metrics::Counter messages_executor_to_framework;
 
   // Messages from slaves.
   process::metrics::Counter messages_register_slave;
@@ -145,6 +146,8 @@ struct Metrics
 
   process::metrics::Counter valid_framework_to_executor_messages;
   process::metrics::Counter invalid_framework_to_executor_messages;
+  process::metrics::Counter valid_executor_to_framework_messages;
+  process::metrics::Counter invalid_executor_to_framework_messages;
 
   process::metrics::Counter valid_status_updates;
   process::metrics::Counter invalid_status_updates;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index dc12c45..784fdc8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2989,7 +2989,6 @@ void Slave::executorMessage(
     return;
   }
 
-
   LOG(INFO) << "Sending message for framework " << frameworkId
             << " to " << framework->pid;
 

Reply via email to