Revert "Deleted old style message handling from the scheduler library."

This reverts commit 138ca6903a3adfcf21783ae613e120b785245304.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2926208c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2926208c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2926208c

Branch: refs/heads/master
Commit: 2926208c4108a8467cba00d6d49b549e7286f5a1
Parents: ddbd429
Author: Vinod Kone <[email protected]>
Authored: Thu Aug 13 13:36:09 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Thu Aug 13 13:36:09 2015 -0700

----------------------------------------------------------------------
 src/scheduler/scheduler.cpp | 89 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 89 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2926208c/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 36d7052..3fbe383 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -195,6 +195,16 @@ public:
 protected:
   virtual void initialize()
   {
+    install<FrameworkRegisteredMessage>(&MesosProcess::receive);
+    install<FrameworkReregisteredMessage>(&MesosProcess::receive);
+    install<ResourceOffersMessage>(&MesosProcess::receive);
+    install<RescindResourceOfferMessage>(&MesosProcess::receive);
+    install<StatusUpdateMessage>(&MesosProcess::receive);
+    install<LostSlaveMessage>(&MesosProcess::receive);
+    install<ExitedExecutorMessage>(&MesosProcess::receive);
+    install<ExecutorToFrameworkMessage>(&MesosProcess::receive);
+    install<FrameworkErrorMessage>(&MesosProcess::receive);
+
     // Start detecting masters.
     detector->detect()
       .onAny(defer(self(), &MesosProcess::detected, lambda::_1));
@@ -242,6 +252,40 @@ protected:
     return async(connected);
   }
 
+  // NOTE: A None 'from' is possible when an event is injected locally.
+  void receive(const Option<UPID>& from, const Event& event)
+  {
+    // Check if we're disconnected but received an event.
+    if (from.isSome() && master.isNone()) {
+      VLOG(1) << "Ignoring " << stringify(event.type())
+              << " event because we're disconnected";
+      return;
+    } else if (from.isSome() && master != from) {
+      VLOG(1)
+        << "Ignoring " << stringify(event.type())
+        << " event because it was sent from '" << from.get()
+        << "' instead of the leading master '" << master.get() << "'";
+      return;
+    }
+
+    // Note that if 'from' is None we're locally injecting this event
+    // so we always want to enqueue it even if we're not connected!
+
+    VLOG(1) << "Enqueuing event " << stringify(event.type()) << " from "
+            << (from.isNone() ? "(locally injected)" : from.get());
+
+    // Queue up the event and invoke the 'received' callback if this
+    // is the first event (between now and when the 'received'
+    // callback actually gets invoked more events might get queued).
+    events.push(event);
+
+    if (events.size() == 1) {
+      mutex.lock()
+        .then(defer(self(), &Self::_receive))
+        .onAny(lambda::bind(&Mutex::unlock, mutex));
+    }
+  }
+
   Future<Nothing> _receive()
   {
     Future<Nothing> future = async(received, events);
@@ -249,6 +293,51 @@ protected:
     return future;
   }
 
+  void receive(const UPID& from, const FrameworkRegisteredMessage& message)
+  {
+    receive(from, evolve(message));
+  }
+
+  void receive(const UPID& from, const FrameworkReregisteredMessage& message)
+  {
+    receive(from, evolve(message));
+  }
+
+  void receive(const UPID& from, const ResourceOffersMessage& message)
+  {
+    receive(from, evolve(message));
+  }
+
+  void receive(const UPID& from, const RescindResourceOfferMessage& message)
+  {
+    receive(from, evolve(message));
+  }
+
+  void receive(const UPID& from, const StatusUpdateMessage& message)
+  {
+    receive(from, evolve(message));
+  }
+
+  void receive(const UPID& from, const LostSlaveMessage& message)
+  {
+    receive(from, evolve(message));
+  }
+
+  void receive(const UPID& from, const ExitedExecutorMessage& message)
+  {
+    receive(from, evolve(message));
+  }
+
+  void receive(const UPID& from, const ExecutorToFrameworkMessage& message)
+  {
+    receive(from, evolve(message));
+  }
+
+  void receive(const UPID& from, const FrameworkErrorMessage& message)
+  {
+    receive(from, evolve(message));
+  }
+
   // Helper for injecting an ERROR event.
   void error(const string& message)
   {

Reply via email to