Implemented the SUBSCRIBE Event handler in the scheduler driver. Review: https://reviews.apache.org/r/36497
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b782c5db Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b782c5db Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b782c5db Branch: refs/heads/master Commit: b782c5db48b97dc13afbf500d2dc821c33a66004 Parents: e0ed711 Author: Benjamin Mahler <[email protected]> Authored: Thu Jul 16 15:23:50 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Fri Jul 17 13:45:16 2015 -0700 ---------------------------------------------------------------------- src/sched/sched.cpp | 24 ++- src/tests/scheduler_event_call_tests.cpp | 258 ++++++++++++++++++++++++++ 2 files changed, 281 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b782c5db/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index fc33d24..3942426 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -448,7 +448,29 @@ protected: break; } - drop(event, "Unimplemented"); + // The scheduler API requires a MasterInfo be passed during + // (re-)registration, so we rely on the MasterInfo provided + // by the detector. If it's None, the driver would have + // dropped the message. + if (master.isNone()) { + drop(event, "No master detected"); + break; + } + + const FrameworkID& frameworkId = event.subscribed().framework_id(); + + // We match the existing registration semantics of the + // driver, except for the 3rd case in MESOS-786 (since + // it requires non-local knowledge and schedulers could + // not have possibly relied on this case). 
+ if (!framework.has_id() || framework.id().value().empty()) { + registered(from, frameworkId, master.get()); + } else if (failover) { + registered(from, frameworkId, master.get()); + } else { + reregistered(from, frameworkId, master.get()); + } + break; } http://git-wip-us.apache.org/repos/asf/mesos/blob/b782c5db/src/tests/scheduler_event_call_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_event_call_tests.cpp b/src/tests/scheduler_event_call_tests.cpp index fe15f05..7c2ffb7 100644 --- a/src/tests/scheduler_event_call_tests.cpp +++ b/src/tests/scheduler_event_call_tests.cpp @@ -22,6 +22,7 @@ #include <mesos/scheduler/scheduler.hpp> +#include <process/clock.hpp> #include <process/future.hpp> #include <process/gmock.hpp> #include <process/gtest.hpp> @@ -40,6 +41,7 @@ using mesos::internal::master::Master; using mesos::scheduler::Event; +using process::Clock; using process::Future; using process::Message; using process::PID; @@ -58,6 +60,262 @@ namespace tests { class SchedulerDriverEventTest : public MesosTest {}; +// Ensures that the driver can handle the SUBSCRIBED event. +TEST_F(SchedulerDriverEventTest, Subscribed) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_failover_timeout(Weeks(2).secs()); + + // Make sure the initial registration calls 'registered'. + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + // Intercept the registration message, send a SUBSCRIBED instead. + Future<Message> frameworkRegisteredMessage = + DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); + + // Ensure that there will be no (re-)registration retries + // from the scheduler driver. 
+ Clock::pause(); + + driver.start(); + + AWAIT_READY(frameworkRegisteredMessage); + UPID frameworkPid = frameworkRegisteredMessage.get().to; + + FrameworkRegisteredMessage message; + ASSERT_TRUE(message.ParseFromString(frameworkRegisteredMessage.get().body)); + + FrameworkID frameworkId = message.framework_id(); + frameworkInfo.mutable_id()->CopyFrom(frameworkId); + + Event event; + event.set_type(Event::SUBSCRIBED); + event.mutable_subscribed()->mutable_framework_id()->CopyFrom(frameworkId); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, frameworkId, _)) + .WillOnce(FutureSatisfy(&registered)); + + process::post(master.get(), frameworkPid, event); + + AWAIT_READY(registered); +} + + +// Ensures that the driver can handle the SUBSCRIBED event +// after a disconnection with the master. +TEST_F(SchedulerDriverEventTest, SubscribedDisconnection) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_failover_timeout(Weeks(2).secs()); + + // Make sure the initial registration calls 'registered'. + MockScheduler sched; + StandaloneMasterDetector detector(master.get()); + TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo); + + // Intercept the registration message, send a SUBSCRIBED instead. + Future<Message> frameworkRegisteredMessage = + DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); + + // Ensure that there will be no (re-)registration retries + // from the scheduler driver. 
+ Clock::pause(); + + driver.start(); + + AWAIT_READY(frameworkRegisteredMessage); + UPID frameworkPid = frameworkRegisteredMessage.get().to; + + FrameworkRegisteredMessage message; + ASSERT_TRUE(message.ParseFromString(frameworkRegisteredMessage.get().body)); + + FrameworkID frameworkId = message.framework_id(); + frameworkInfo.mutable_id()->CopyFrom(frameworkId); + + Event event; + event.set_type(Event::SUBSCRIBED); + event.mutable_subscribed()->mutable_framework_id()->CopyFrom(frameworkId); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, frameworkId, _)) + .WillOnce(FutureSatisfy(&registered)); + + process::post(master.get(), frameworkPid, event); + + AWAIT_READY(registered); + + // Simulate a disconnection and expect a 'reregistered' call. + EXPECT_CALL(sched, disconnected(&driver)); + + Future<Message> frameworkReregisteredMessage = + DROP_MESSAGE(Eq(FrameworkReregisteredMessage().GetTypeName()), _, _); + + detector.appoint(master.get()); + + AWAIT_READY(frameworkReregisteredMessage); + + Future<Nothing> reregistered; + EXPECT_CALL(sched, reregistered(&driver, _)) + .WillOnce(FutureSatisfy(&reregistered)); + + process::post(master.get(), frameworkPid, event); + + AWAIT_READY(reregistered); +} + + +// Ensures that the driver can handle the SUBSCRIBED event +// after a master failover. +TEST_F(SchedulerDriverEventTest, SubscribedMasterFailover) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_failover_timeout(Weeks(2).secs()); + + // Make sure the initial registration calls 'registered'. + MockScheduler sched; + StandaloneMasterDetector detector(master.get()); + TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo); + + // Intercept the registration message, send a SUBSCRIBED instead. 
+ Future<Message> frameworkRegisteredMessage = + DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); + + // Ensure that there will be no (re-)registration retries + // from the scheduler driver. + Clock::pause(); + + driver.start(); + + AWAIT_READY(frameworkRegisteredMessage); + UPID frameworkPid = frameworkRegisteredMessage.get().to; + + FrameworkRegisteredMessage message; + ASSERT_TRUE(message.ParseFromString(frameworkRegisteredMessage.get().body)); + + FrameworkID frameworkId = message.framework_id(); + frameworkInfo.mutable_id()->CopyFrom(frameworkId); + + Event event; + event.set_type(Event::SUBSCRIBED); + event.mutable_subscribed()->mutable_framework_id()->CopyFrom(frameworkId); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, frameworkId, _)) + .WillOnce(FutureSatisfy(&registered)); + + process::post(master.get(), frameworkPid, event); + + AWAIT_READY(registered); + + // Fail over the master and expect a 'reregistered' call. + // Note that the master sends a registered message for + // this case (see MESOS-786). + Stop(master.get()); + master = StartMaster(); + ASSERT_SOME(master); + + EXPECT_CALL(sched, disconnected(&driver)); + + frameworkRegisteredMessage = + DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); + + detector.appoint(master.get()); + + AWAIT_READY(frameworkRegisteredMessage); + + Future<Nothing> reregistered; + EXPECT_CALL(sched, reregistered(&driver, _)) + .WillOnce(FutureSatisfy(&reregistered)); + + process::post(master.get(), frameworkPid, event); + + AWAIT_READY(reregistered); +} + + +// Ensures that the driver can handle the SUBSCRIBED event +// after a scheduler failover. +TEST_F(SchedulerDriverEventTest, SubscribedSchedulerFailover) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_failover_timeout(Weeks(2).secs()); + + // Make sure the initial registration calls 'registered'. 
+ MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + // Intercept the registration message, send a SUBSCRIBED instead. + Future<Message> frameworkRegisteredMessage = + DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); + + // Ensure that there will be no (re-)registration retries + // from the scheduler driver. + Clock::pause(); + + driver.start(); + + AWAIT_READY(frameworkRegisteredMessage); + UPID frameworkPid = frameworkRegisteredMessage.get().to; + + FrameworkRegisteredMessage message; + ASSERT_TRUE(message.ParseFromString(frameworkRegisteredMessage.get().body)); + + FrameworkID frameworkId = message.framework_id(); + frameworkInfo.mutable_id()->CopyFrom(frameworkId); + + Event event; + event.set_type(Event::SUBSCRIBED); + event.mutable_subscribed()->mutable_framework_id()->CopyFrom(frameworkId); + + Future<Nothing> registered; + EXPECT_CALL(sched, registered(&driver, frameworkId, _)) + .WillOnce(FutureSatisfy(&registered)); + + process::post(master.get(), frameworkPid, event); + + AWAIT_READY(registered); + + // Fail over the scheduler and expect a 'registered' call. + driver.stop(true); + + MockScheduler sched2; + MesosSchedulerDriver driver2( + &sched2, frameworkInfo, master.get(), DEFAULT_CREDENTIAL); + + frameworkRegisteredMessage = + DROP_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); + + driver2.start(); + + AWAIT_READY(frameworkRegisteredMessage); + UPID frameworkPid2 = frameworkRegisteredMessage.get().to; + + process::post(master.get(), frameworkPid2, event); + + Future<Nothing> registered2; + EXPECT_CALL(sched2, registered(&driver2, frameworkId, _)) + .WillOnce(FutureSatisfy(&registered2)); + + AWAIT_READY(registered2); +} + + // Ensures that the driver can handle the RESCIND event. TEST_F(SchedulerDriverEventTest, Rescind) {
