Implemented the OFFERS Event handler in the scheduler driver. Review: https://reviews.apache.org/r/36498
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6b842c27 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6b842c27 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6b842c27 Branch: refs/heads/master Commit: 6b842c27b3a3f75a9eea6d5e91cd766e1bf6ac17 Parents: b782c5d Author: Benjamin Mahler <[email protected]> Authored: Mon Jul 13 11:24:34 2015 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Fri Jul 17 13:45:17 2015 -0700 ---------------------------------------------------------------------- src/sched/sched.cpp | 24 +++++- src/tests/scheduler_event_call_tests.cpp | 104 ++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/6b842c27/src/sched/sched.cpp ---------------------------------------------------------------------- diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp index 3942426..25e2d66 100644 --- a/src/sched/sched.cpp +++ b/src/sched/sched.cpp @@ -480,7 +480,29 @@ protected: break; } - drop(event, "Unimplemented"); + const vector<Offer> offers = + google::protobuf::convert(event.offers().offers()); + + vector<string> pids; + + foreach (const Offer& offer, offers) { + CHECK(offer.has_url()) + << "Offer.url required for Event support"; + CHECK(offer.url().has_path()) + << "Offer.url.path required for Event support"; + + string id = offer.url().path(); + id = strings::trim(id, "/"); + + Try<net::IP> ip = + net::IP::parse(offer.url().address().ip(), AF_INET); + + CHECK_SOME(ip) << "Failed to parse Offer.url.address.ip"; + + pids.push_back(UPID(id, ip.get(), offer.url().address().port())); + } + + resourceOffers(from, offers, pids); break; } http://git-wip-us.apache.org/repos/asf/mesos/blob/6b842c27/src/tests/scheduler_event_call_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_event_call_tests.cpp b/src/tests/scheduler_event_call_tests.cpp index 7c2ffb7..cf0baf6 100644 --- a/src/tests/scheduler_event_call_tests.cpp +++ b/src/tests/scheduler_event_call_tests.cpp @@ -18,6 +18,9 @@ #include <gmock/gmock.h> +#include <string> +#include <vector> + #include <mesos/scheduler.hpp> #include <mesos/scheduler/scheduler.hpp> @@ -38,6 +41,7 @@ #include "tests/mesos.hpp" using mesos::internal::master::Master; +using mesos::internal::slave::Slave; using mesos::scheduler::Event; @@ -47,7 +51,11 @@ using process::Message; using process::PID; using process::UPID; +using std::string; +using std::vector; + using testing::_; +using testing::AtMost; using testing::Eq; namespace mesos { @@ -316,6 +324,102 @@ TEST_F(SchedulerDriverEventTest, SubscribedSchedulerFailover) } +// Ensures that the driver can handle an OFFERS event. +// Note that this includes the ability to bypass the +// master when sending framework messages. +TEST_F(SchedulerDriverEventTest, Offers) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + MockScheduler sched; + MesosSchedulerDriver schedDriver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&schedDriver, _, _)); + + Future<Message> frameworkRegisteredMessage = + FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _); + + schedDriver.start(); + + AWAIT_READY(frameworkRegisteredMessage); + UPID frameworkPid = frameworkRegisteredMessage.get().to; + + // Start a slave and capture the offers. + Future<ResourceOffersMessage> resourceOffersMessage = + DROP_PROTOBUF(ResourceOffersMessage(), _, _); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + Try<PID<Slave>> slave = StartSlave(&exec); + ASSERT_SOME(slave); + + AWAIT_READY(resourceOffersMessage); + + google::protobuf::RepeatedPtrField<Offer> offers = + resourceOffersMessage.get().offers(); + + ASSERT_EQ(1u, offers.size()); + + // Ignore future offer messages. + DROP_PROTOBUFS(ResourceOffersMessage(), _, _); + + // Send the offers event and expect a 'resourceOffers' call. + Event event; + event.set_type(Event::OFFERS); + event.mutable_offers()->mutable_offers()->CopyFrom(offers); + + Future<Nothing> resourceOffers; + EXPECT_CALL(sched, resourceOffers(&schedDriver, _)) + .WillOnce(FutureSatisfy(&resourceOffers)); + + process::post(master.get(), frameworkPid, event); + + AWAIT_READY(resourceOffers); + + // To test that the framework -> executor messages are + // sent directly to the slave, launch a task and send + // the executor a message. + EXPECT_CALL(exec, registered(_, _, _, _)); + + EXPECT_CALL(exec, launchTask(_, _)) + .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING)); + + Future<TaskStatus> status; + EXPECT_CALL(sched, statusUpdate(&schedDriver, _)) + .WillOnce(FutureArg<1>(&status)); + + TaskInfo task = createTask(offers.Get(0), "", DEFAULT_EXECUTOR_ID); + + schedDriver.launchTasks(offers.Get(0).id(), {task}); + + AWAIT_READY(status); + EXPECT_EQ(TASK_RUNNING, status.get().state()); + + // This message should skip the master! + Future<FrameworkToExecutorMessage> frameworkToExecutorMessage = + FUTURE_PROTOBUF(FrameworkToExecutorMessage(), frameworkPid, slave.get()); + + Future<string> data; + EXPECT_CALL(exec, frameworkMessage(_, _)) + .WillOnce(FutureArg<1>(&data)); + + schedDriver.sendFrameworkMessage( + DEFAULT_EXECUTOR_ID, offers.Get(0).slave_id(), "hello"); + + AWAIT_READY(frameworkToExecutorMessage); + AWAIT_EXPECT_EQ("hello", data); + + EXPECT_CALL(exec, shutdown(_)) + .Times(AtMost(1)); + + schedDriver.stop(); + schedDriver.join(); + + Shutdown(); +} + + // Ensures that the driver can handle the RESCIND event. TEST_F(SchedulerDriverEventTest, Rescind) {
