Revert "Deleted old style message handling from the scheduler library."
This reverts commit 138ca6903a3adfcf21783ae613e120b785245304. Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2926208c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2926208c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2926208c Branch: refs/heads/master Commit: 2926208c4108a8467cba00d6d49b549e7286f5a1 Parents: ddbd429 Author: Vinod Kone <[email protected]> Authored: Thu Aug 13 13:36:09 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu Aug 13 13:36:09 2015 -0700 ---------------------------------------------------------------------- src/scheduler/scheduler.cpp | 89 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/2926208c/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index 36d7052..3fbe383 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -195,6 +195,16 @@ public: protected: virtual void initialize() { + install<FrameworkRegisteredMessage>(&MesosProcess::receive); + install<FrameworkReregisteredMessage>(&MesosProcess::receive); + install<ResourceOffersMessage>(&MesosProcess::receive); + install<RescindResourceOfferMessage>(&MesosProcess::receive); + install<StatusUpdateMessage>(&MesosProcess::receive); + install<LostSlaveMessage>(&MesosProcess::receive); + install<ExitedExecutorMessage>(&MesosProcess::receive); + install<ExecutorToFrameworkMessage>(&MesosProcess::receive); + install<FrameworkErrorMessage>(&MesosProcess::receive); + // Start detecting masters. detector->detect() .onAny(defer(self(), &MesosProcess::detected, lambda::_1)); @@ -242,6 +252,40 @@ protected: return async(connected); } + // NOTE: A None 'from' is possible when an event is injected locally. + void receive(const Option<UPID>& from, const Event& event) + { + // Check if we're disconnected but received an event. + if (from.isSome() && master.isNone()) { + VLOG(1) << "Ignoring " << stringify(event.type()) + << " event because we're disconnected"; + return; + } else if (from.isSome() && master != from) { + VLOG(1) + << "Ignoring " << stringify(event.type()) + << " event because it was sent from '" << from.get() + << "' instead of the leading master '" << master.get() << "'"; + return; + } + + // Note that if 'from' is None we're locally injecting this event + // so we always want to enqueue it even if we're not connected! + + VLOG(1) << "Enqueuing event " << stringify(event.type()) << " from " + << (from.isNone() ? "(locally injected)" : from.get()); + + // Queue up the event and invoke the 'received' callback if this + // is the first event (between now and when the 'received' + // callback actually gets invoked more events might get queued). + events.push(event); + + if (events.size() == 1) { + mutex.lock() + .then(defer(self(), &Self::_receive)) + .onAny(lambda::bind(&Mutex::unlock, mutex)); + } + } + Future<Nothing> _receive() { Future<Nothing> future = async(received, events); @@ -249,6 +293,51 @@ protected: return future; } + void receive(const UPID& from, const FrameworkRegisteredMessage& message) + { + receive(from, evolve(message)); + } + + void receive(const UPID& from, const FrameworkReregisteredMessage& message) + { + receive(from, evolve(message)); + } + + void receive(const UPID& from, const ResourceOffersMessage& message) + { + receive(from, evolve(message)); + } + + void receive(const UPID& from, const RescindResourceOfferMessage& message) + { + receive(from, evolve(message)); + } + + void receive(const UPID& from, const StatusUpdateMessage& message) + { + receive(from, evolve(message)); + } + + void receive(const UPID& from, const LostSlaveMessage& message) + { + receive(from, evolve(message)); + } + + void receive(const UPID& from, const ExitedExecutorMessage& message) + { + receive(from, evolve(message)); + } + + void receive(const UPID& from, const ExecutorToFrameworkMessage& message) + { + receive(from, evolve(message)); + } + + void receive(const UPID& from, const FrameworkErrorMessage& message) + { + receive(from, evolve(message)); + } + // Helper for injecting an ERROR event. void error(const string& message) {
