Implemented the SUBSCRIBE Event handler in the scheduler driver.

Review: https://reviews.apache.org/r/36497


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b782c5db
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b782c5db
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b782c5db

Branch: refs/heads/master
Commit: b782c5db48b97dc13afbf500d2dc821c33a66004
Parents: e0ed711
Author: Benjamin Mahler <[email protected]>
Authored: Thu Jul 16 15:23:50 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Fri Jul 17 13:45:16 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                      |  24 ++-
 src/tests/scheduler_event_call_tests.cpp | 258 ++++++++++++++++++++++++++
 2 files changed, 281 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b782c5db/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index fc33d24..3942426 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -448,7 +448,29 @@ protected:
           break;
         }
 
-        drop(event, "Unimplemented");
+        // The scheduler API requires a MasterInfo be passed during
+        // (re-)registration, so we rely on the MasterInfo provided
+        // by the detector. If it's None, the driver would have
+        // dropped the message.
+        if (master.isNone()) {
+          drop(event, "No master detected");
+          break;
+        }
+
+        const FrameworkID& frameworkId = event.subscribed().framework_id();
+
+        // We match the existing registration semantics of the
+        // driver, except for the 3rd case in MESOS-786 (since
+        // it requires non-local knowledge and schedulers could
+        // not have possibly relied on this case).
+        if (!framework.has_id() || framework.id().value().empty()) {
+          registered(from, frameworkId, master.get());
+        } else if (failover) {
+          registered(from, frameworkId, master.get());
+        } else {
+          reregistered(from, frameworkId, master.get());
+        }
+
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b782c5db/src/tests/scheduler_event_call_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_event_call_tests.cpp 
b/src/tests/scheduler_event_call_tests.cpp
index fe15f05..7c2ffb7 100644
--- a/src/tests/scheduler_event_call_tests.cpp
+++ b/src/tests/scheduler_event_call_tests.cpp
@@ -22,6 +22,7 @@
 
 #include <mesos/scheduler/scheduler.hpp>
 
+#include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
@@ -40,6 +41,7 @@ using mesos::internal::master::Master;
 
 using mesos::scheduler::Event;
 
+using process::Clock;
 using process::Future;
 using process::Message;
 using process::PID;
@@ -58,6 +60,262 @@ namespace tests {
 class SchedulerDriverEventTest : public MesosTest {};
 
 
+// Ensures that the driver can handle the SUBSCRIBED event.
+TEST_F(SchedulerDriverEventTest, Subscribed)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+  // Make sure the initial registration calls 'registered'.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  // Intercept the registration message, send a SUBSCRIBED instead.
+  Future<Message> frameworkRegisteredMessage =
+    DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+  // Ensure that there will be no (re-)registration retries
+  // from the scheduler driver.
+  Clock::pause();
+
+  driver.start();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+  UPID frameworkPid = frameworkRegisteredMessage.get().to;
+
+  FrameworkRegisteredMessage message;
+  ASSERT_TRUE(message.ParseFromString(frameworkRegisteredMessage.get().body));
+
+  FrameworkID frameworkId = message.framework_id();
+  frameworkInfo.mutable_id()->CopyFrom(frameworkId);
+
+  Event event;
+  event.set_type(Event::SUBSCRIBED);
+  event.mutable_subscribed()->mutable_framework_id()->CopyFrom(frameworkId);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, frameworkId, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  process::post(master.get(), frameworkPid, event);
+
+  AWAIT_READY(registered);
+}
+
+
+// Ensures that the driver can handle the SUBSCRIBED event
+// after a disconnection with the master.
+TEST_F(SchedulerDriverEventTest, SubscribedDisconnection)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+  // Make sure the initial registration calls 'registered'.
+  MockScheduler sched;
+  StandaloneMasterDetector detector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo);
+
+  // Intercept the registration message, send a SUBSCRIBED instead.
+  Future<Message> frameworkRegisteredMessage =
+    DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+  // Ensure that there will be no (re-)registration retries
+  // from the scheduler driver.
+  Clock::pause();
+
+  driver.start();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+  UPID frameworkPid = frameworkRegisteredMessage.get().to;
+
+  FrameworkRegisteredMessage message;
+  ASSERT_TRUE(message.ParseFromString(frameworkRegisteredMessage.get().body));
+
+  FrameworkID frameworkId = message.framework_id();
+  frameworkInfo.mutable_id()->CopyFrom(frameworkId);
+
+  Event event;
+  event.set_type(Event::SUBSCRIBED);
+  event.mutable_subscribed()->mutable_framework_id()->CopyFrom(frameworkId);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, frameworkId, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  process::post(master.get(), frameworkPid, event);
+
+  AWAIT_READY(registered);
+
+  // Simulate a disconnection and expect a 'reregistered' call.
+  EXPECT_CALL(sched, disconnected(&driver));
+
+  Future<Message> frameworkReregisteredMessage =
+    DROP_MESSAGE(Eq(FrameworkReregisteredMessage().GetTypeName()), _, _);
+
+  detector.appoint(master.get());
+
+  AWAIT_READY(frameworkReregisteredMessage);
+
+  Future<Nothing> reregistered;
+  EXPECT_CALL(sched, reregistered(&driver, _))
+    .WillOnce(FutureSatisfy(&reregistered));
+
+  process::post(master.get(), frameworkPid, event);
+
+  AWAIT_READY(reregistered);
+}
+
+
+// Ensures that the driver can handle the SUBSCRIBED event
+// after a master failover.
+TEST_F(SchedulerDriverEventTest, SubscribedMasterFailover)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+  // Make sure the initial registration calls 'registered'.
+  MockScheduler sched;
+  StandaloneMasterDetector detector(master.get());
+  TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo);
+
+  // Intercept the registration message, send a SUBSCRIBED instead.
+  Future<Message> frameworkRegisteredMessage =
+    DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+  // Ensure that there will be no (re-)registration retries
+  // from the scheduler driver.
+  Clock::pause();
+
+  driver.start();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+  UPID frameworkPid = frameworkRegisteredMessage.get().to;
+
+  FrameworkRegisteredMessage message;
+  ASSERT_TRUE(message.ParseFromString(frameworkRegisteredMessage.get().body));
+
+  FrameworkID frameworkId = message.framework_id();
+  frameworkInfo.mutable_id()->CopyFrom(frameworkId);
+
+  Event event;
+  event.set_type(Event::SUBSCRIBED);
+  event.mutable_subscribed()->mutable_framework_id()->CopyFrom(frameworkId);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, frameworkId, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  process::post(master.get(), frameworkPid, event);
+
+  AWAIT_READY(registered);
+
+  // Fail over the master and expect a 'reregistered' call.
+  // Note that the master sends a registered message for
+  // this case (see MESOS-786).
+  Stop(master.get());
+  master = StartMaster();
+  ASSERT_SOME(master);
+
+  EXPECT_CALL(sched, disconnected(&driver));
+
+  frameworkRegisteredMessage =
+    DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+  detector.appoint(master.get());
+
+  AWAIT_READY(frameworkRegisteredMessage);
+
+  Future<Nothing> reregistered;
+  EXPECT_CALL(sched, reregistered(&driver, _))
+    .WillOnce(FutureSatisfy(&reregistered));
+
+  process::post(master.get(), frameworkPid, event);
+
+  AWAIT_READY(reregistered);
+}
+
+
+// Ensures that the driver can handle the SUBSCRIBED event
+// after a scheduler failover.
+TEST_F(SchedulerDriverEventTest, SubscribedSchedulerFailover)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_failover_timeout(Weeks(2).secs());
+
+  // Make sure the initial registration calls 'registered'.
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  // Intercept the registration message, send a SUBSCRIBED instead.
+  Future<Message> frameworkRegisteredMessage =
+    DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+  // Ensure that there will be no (re-)registration retries
+  // from the scheduler driver.
+  Clock::pause();
+
+  driver.start();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+  UPID frameworkPid = frameworkRegisteredMessage.get().to;
+
+  FrameworkRegisteredMessage message;
+  ASSERT_TRUE(message.ParseFromString(frameworkRegisteredMessage.get().body));
+
+  FrameworkID frameworkId = message.framework_id();
+  frameworkInfo.mutable_id()->CopyFrom(frameworkId);
+
+  Event event;
+  event.set_type(Event::SUBSCRIBED);
+  event.mutable_subscribed()->mutable_framework_id()->CopyFrom(frameworkId);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, frameworkId, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  process::post(master.get(), frameworkPid, event);
+
+  AWAIT_READY(registered);
+
+  // Fail over the scheduler and expect a 'registered' call.
+  driver.stop(true);
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  frameworkRegisteredMessage =
+    DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+  driver2.start();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+  UPID frameworkPid2 = frameworkRegisteredMessage.get().to;
+
+  process::post(master.get(), frameworkPid2, event);
+
+  Future<Nothing> registered2;
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId, _))
+    .WillOnce(FutureSatisfy(&registered2));
+
+  AWAIT_READY(registered2);
+}
+
+
 // Ensures that the driver can handle the RESCIND event.
 TEST_F(SchedulerDriverEventTest, Rescind)
 {

Reply via email to