Added Subscribe call support to the master and the C++ scheduler library. Review: https://reviews.apache.org/r/36079
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b090b0b4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b090b0b4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b090b0b4 Branch: refs/heads/master Commit: b090b0b4bdd670d8e585f8b39d1b2f3e766903b4 Parents: f587897 Author: Vinod Kone <[email protected]> Authored: Sat Jun 27 14:36:32 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Wed Jul 1 17:54:59 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 48 ++++++++++++++++++++++++++++++---------- src/master/master.hpp | 4 ++++ src/scheduler/scheduler.cpp | 16 ++------------ 3 files changed, 42 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b090b0b4/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index db59831..c5a4875 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1611,17 +1611,29 @@ void Master::receive( const scheduler::Call& call) { // TODO(vinod): Add metrics for calls. - // TODO(vinod): Implement the unimplemented calls. - // For SUBSCRIBE call, no need to look up the framework. Therefore, - // we handle them first and separately from other types of calls. 
- switch (call.type()) { - case scheduler::Call::SUBSCRIBE: - drop(from, call, "Unimplemented"); + if (call.type() == scheduler::Call::SUBSCRIBE) { + if (!call.has_subscribe()) { + drop(from, call, "Expecting 'subscribe' to be present"); return; + } - default: - break; + if (!(call.subscribe().framework_info().id() == call.framework_id())) { + drop(from, + call, + "Framework id in the call doesn't match the framework id" + " in the 'subscribe' message"); + return; + } + + subscribe(from, call.subscribe()); + return; + } + + // All calls except SUBSCRIBE should have framework id set. + if (!call.has_framework_id()) { + drop(from, call, "Expecting framework id to be present"); + return; } // We consolidate the framework lookup and pid validation logic here @@ -1638,10 +1650,6 @@ void Master::receive( return; } - // TODO(jieyu): Validate frameworkInfo to make sure it's the same as - // the one that the framework used during registration and that the - // framework id is set and non-empty except for SUBSCRIBE call. - switch (call.type()) { case scheduler::Call::TEARDOWN: { removeFramework(framework); @@ -2076,6 +2084,22 @@ void Master::_reregisterFramework( } +void Master::subscribe( + const UPID& from, + const scheduler::Call::Subscribe& subscribe) +{ + const FrameworkInfo& frameworkInfo = subscribe.framework_info(); + + // TODO(vinod): Instead of calling '(re-)registerFramework()' from + // here refactor those methods to call 'subscribe()'. 
+ if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { + registerFramework(from, frameworkInfo); + } else { + reregisterFramework(from, frameworkInfo, subscribe.force()); + } +} + + void Master::unregisterFramework( const UPID& from, const FrameworkID& frameworkId) http://git-wip-us.apache.org/repos/asf/mesos/blob/b090b0b4/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 5561396..fb4d6fa 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1004,6 +1004,10 @@ private: const process::UPID& from, const scheduler::Call& call); + void subscribe( + const process::UPID& from, + const scheduler::Call::Subscribe& subscribe); + void accept( Framework* framework, const scheduler::Call::Accept& accept); http://git-wip-us.apache.org/repos/asf/mesos/blob/b090b0b4/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index 34fa78e..d5ac04c 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -220,24 +220,13 @@ public: return; } - const FrameworkInfo frameworkInfo = call.subscribe().framework_info(); - - if (!(frameworkInfo.id() == call.framework_id())) { + if (!(call.subscribe().framework_info().id() == call.framework_id())) { drop(call, "Framework id in the call doesn't match the framework id" " in the 'subscribe' message"); return; } - if (!frameworkInfo.has_id() || frameworkInfo.id() == "") { - RegisterFrameworkMessage message; - message.mutable_framework()->CopyFrom(frameworkInfo); - send(master.get(), message); - } else { - ReregisterFrameworkMessage message; - message.mutable_framework()->CopyFrom(frameworkInfo); - message.set_failover(failover); - send(master.get(), message); - } + send(master.get(), call); break; } @@ -301,7 +290,6 @@ public: drop(call, "Expecting 'reconcile' to be present"); return; } - 
send(master.get(), call); break; }
