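Added a master handler for ExecutorToFrameworkMessage. This lets the slave forward executor messages to frameworks through the master, which is needed for HTTP schedulers because they do not have a pid the slave can send to directly.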
Review: https://reviews.apache.org/r/36759 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac70a594 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac70a594 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac70a594 Branch: refs/heads/master Commit: ac70a594e9b9700c4cd12eb95a26f8470785a7a9 Parents: a9312c2 Author: Benjamin Mahler <[email protected]> Authored: Wed Jul 22 17:51:27 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Fri Jul 24 16:25:44 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 96 +++++++++++++++++++++++++++++++++++++++------ src/master/master.hpp | 7 ++++ src/master/metrics.cpp | 14 +++++++ src/master/metrics.hpp | 3 ++ src/slave/slave.cpp | 1 - 5 files changed, 109 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 7796630..6d64bfc 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -713,6 +713,16 @@ void Master::initialize() &StatusUpdateMessage::update, &StatusUpdateMessage::pid); + // Added in 0.24.0 to support HTTP schedulers. Since + // these do not have a pid, the slave must forward + // messages through the master. 
+ install<ExecutorToFrameworkMessage>( + &Master::executorMessage, + &ExecutorToFrameworkMessage::slave_id, + &ExecutorToFrameworkMessage::framework_id, + &ExecutorToFrameworkMessage::executor_id, + &ExecutorToFrameworkMessage::data); + install<ReconcileTasksMessage>( &Master::reconcileTasks, &ReconcileTasksMessage::framework_id, @@ -3199,24 +3209,24 @@ void Master::schedulerMessage( const ExecutorID& executorId, const string& data) { - ++metrics->messages_framework_to_executor; + metrics->messages_framework_to_executor++; Framework* framework = getFramework(frameworkId); if (framework == NULL) { - LOG(WARNING) - << "Ignoring framework message for executor " << executorId - << " of framework " << frameworkId - << " because the framework cannot be found"; + LOG(WARNING) << "Ignoring framework message" + << " for executor '" << executorId << "'" + << " of framework " << frameworkId + << " because the framework cannot be found"; metrics->invalid_framework_to_executor_messages++; return; } if (from != framework->pid) { - LOG(WARNING) - << "Ignoring framework message for executor " << executorId - << " of framework " << *framework - << " because it is not expected from " << from; + LOG(WARNING) << "Ignoring framework message" + << " for executor '" << executorId << "'" + << " of framework " << *framework + << " because it is not expected from " << from; metrics->invalid_framework_to_executor_messages++; return; } @@ -3230,6 +3240,69 @@ void Master::schedulerMessage( } +void Master::executorMessage( + const UPID& from, + const SlaveID& slaveId, + const FrameworkID& frameworkId, + const ExecutorID& executorId, + const string& data) +{ + metrics->messages_executor_to_framework++; + + if (slaves.removed.get(slaveId).isSome()) { + // If the slave is removed, we have already informed + // frameworks that its tasks were LOST, so the slave + // should shut down. 
+ LOG(WARNING) << "Ignoring executor message" + << " from executor" << " '" << executorId << "'" + << " of framework " << frameworkId + << " on removed slave " << slaveId + << " ; asking slave to shutdown"; + + ShutdownMessage message; + message.set_message("Executor message from unknown slave"); + reply(message); + metrics->invalid_executor_to_framework_messages++; + return; + } + + // The slave should (re-)register with the master before + // forwarding executor messages. + if (!slaves.registered.contains(slaveId)) { + LOG(WARNING) << "Ignoring executor message" + << " from executor '" << executorId << "'" + << " of framework " << frameworkId + << " on unknown slave " << slaveId; + metrics->invalid_executor_to_framework_messages++; + return; + } + + Slave* slave = slaves.registered.get(slaveId); + CHECK_NOTNULL(slave); + + Framework* framework = getFramework(frameworkId); + + if (framework == NULL) { + LOG(WARNING) << "Not forwarding executor message" + << " for executor '" << executorId << "'" + << " of framework " << frameworkId + << " on slave " << *slave + << " because the framework is unknown"; + metrics->invalid_executor_to_framework_messages++; + return; + } + + ExecutorToFrameworkMessage message; + message.mutable_slave_id()->MergeFrom(slaveId); + message.mutable_framework_id()->MergeFrom(frameworkId); + message.mutable_executor_id()->MergeFrom(executorId); + message.set_data(data); + send(framework->pid, message); + + metrics->valid_executor_to_framework_messages++; +} + + void Master::message( Framework* framework, const scheduler::Call::Message& message) @@ -3850,8 +3923,6 @@ void Master::exitedExecutor( return; } - // Only update master's internal data structures here for proper - // accounting. The TASK_LOST updates are handled by the slave. 
if (!slaves.registered.contains(slaveId)) { LOG(WARNING) << "Ignoring exited executor '" << executorId << "' of framework " << frameworkId @@ -3859,6 +3930,9 @@ void Master::exitedExecutor( return; } + // Only update master's internal data structures here for proper + // accounting. The TASK_LOST updates are handled by the slave. + Slave* slave = slaves.registered.get(slaveId); CHECK_NOTNULL(slave); http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 29113cb..827d0d5 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -716,6 +716,13 @@ public: const ExecutorID& executorId, const std::string& data); + void executorMessage( + const process::UPID& from, + const SlaveID& slaveId, + const FrameworkID& frameworkId, + const ExecutorID& executorId, + const std::string& data); + void registerSlave( const process::UPID& from, const SlaveInfo& slaveInfo, http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/metrics.cpp ---------------------------------------------------------------------- diff --git a/src/master/metrics.cpp b/src/master/metrics.cpp index 10e2937..d79206f 100644 --- a/src/master/metrics.cpp +++ b/src/master/metrics.cpp @@ -118,6 +118,8 @@ Metrics::Metrics(const Master& master) "master/messages_reconcile_tasks"), messages_framework_to_executor( "master/messages_framework_to_executor"), + messages_executor_to_framework( + "master/messages_executor_to_framework"), messages_register_slave( "master/messages_register_slave"), messages_reregister_slave( @@ -136,6 +138,10 @@ Metrics::Metrics(const Master& master) "master/valid_framework_to_executor_messages"), invalid_framework_to_executor_messages( "master/invalid_framework_to_executor_messages"), + valid_executor_to_framework_messages( + "master/valid_executor_to_framework_messages"), + 
invalid_executor_to_framework_messages( + "master/invalid_executor_to_framework_messages"), valid_status_updates( "master/valid_status_updates"), invalid_status_updates( @@ -214,6 +220,7 @@ Metrics::Metrics(const Master& master) process::metrics::add(messages_revive_offers); process::metrics::add(messages_reconcile_tasks); process::metrics::add(messages_framework_to_executor); + process::metrics::add(messages_executor_to_framework); // Messages from slaves. process::metrics::add(messages_register_slave); @@ -229,6 +236,9 @@ Metrics::Metrics(const Master& master) process::metrics::add(valid_framework_to_executor_messages); process::metrics::add(invalid_framework_to_executor_messages); + process::metrics::add(valid_executor_to_framework_messages); + process::metrics::add(invalid_executor_to_framework_messages); + process::metrics::add(valid_status_updates); process::metrics::add(invalid_status_updates); @@ -345,6 +355,7 @@ Metrics::~Metrics() process::metrics::remove(messages_revive_offers); process::metrics::remove(messages_reconcile_tasks); process::metrics::remove(messages_framework_to_executor); + process::metrics::remove(messages_executor_to_framework); // Messages from slaves. 
process::metrics::remove(messages_register_slave); @@ -360,6 +371,9 @@ Metrics::~Metrics() process::metrics::remove(valid_framework_to_executor_messages); process::metrics::remove(invalid_framework_to_executor_messages); + process::metrics::remove(valid_executor_to_framework_messages); + process::metrics::remove(invalid_executor_to_framework_messages); + process::metrics::remove(valid_status_updates); process::metrics::remove(invalid_status_updates); http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/master/metrics.hpp ---------------------------------------------------------------------- diff --git a/src/master/metrics.hpp b/src/master/metrics.hpp index d37d74a..5e96a5f 100644 --- a/src/master/metrics.hpp +++ b/src/master/metrics.hpp @@ -131,6 +131,7 @@ struct Metrics process::metrics::Counter messages_revive_offers; process::metrics::Counter messages_reconcile_tasks; process::metrics::Counter messages_framework_to_executor; + process::metrics::Counter messages_executor_to_framework; // Messages from slaves. process::metrics::Counter messages_register_slave; @@ -145,6 +146,8 @@ struct Metrics process::metrics::Counter valid_framework_to_executor_messages; process::metrics::Counter invalid_framework_to_executor_messages; + process::metrics::Counter valid_executor_to_framework_messages; + process::metrics::Counter invalid_executor_to_framework_messages; process::metrics::Counter valid_status_updates; process::metrics::Counter invalid_status_updates; http://git-wip-us.apache.org/repos/asf/mesos/blob/ac70a594/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index dc12c45..784fdc8 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -2989,7 +2989,6 @@ void Slave::executorMessage( return; } - LOG(INFO) << "Sending message for framework " << frameworkId << " to " << framework->pid;
