Implemented the OFFERS Event handler in the scheduler driver.

Review: https://reviews.apache.org/r/36498


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6b842c27
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6b842c27
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6b842c27

Branch: refs/heads/master
Commit: 6b842c27b3a3f75a9eea6d5e91cd766e1bf6ac17
Parents: b782c5d
Author: Benjamin Mahler <[email protected]>
Authored: Mon Jul 13 11:24:34 2015 -0700
Committer: Benjamin Mahler <[email protected]>
Committed: Fri Jul 17 13:45:17 2015 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp                      |  24 +++++-
 src/tests/scheduler_event_call_tests.cpp | 104 ++++++++++++++++++++++++++
 2 files changed, 127 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6b842c27/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 3942426..25e2d66 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -480,7 +480,29 @@ protected:
           break;
         }
 
-        drop(event, "Unimplemented");
+        const vector<Offer> offers =
+          google::protobuf::convert(event.offers().offers());
+
+        vector<string> pids;
+
+        foreach (const Offer& offer, offers) {
+          CHECK(offer.has_url())
+            << "Offer.url required for Event support";
+          CHECK(offer.url().has_path())
+            << "Offer.url.path required for Event support";
+
+          string id = offer.url().path();
+          id = strings::trim(id, "/");
+
+          Try<net::IP> ip =
+            net::IP::parse(offer.url().address().ip(), AF_INET);
+
+          CHECK_SOME(ip) << "Failed to parse Offer.url.address.ip";
+
+          pids.push_back(UPID(id, ip.get(), offer.url().address().port()));
+        }
+
+        resourceOffers(from, offers, pids);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6b842c27/src/tests/scheduler_event_call_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_event_call_tests.cpp b/src/tests/scheduler_event_call_tests.cpp
index 7c2ffb7..cf0baf6 100644
--- a/src/tests/scheduler_event_call_tests.cpp
+++ b/src/tests/scheduler_event_call_tests.cpp
@@ -18,6 +18,9 @@
 
 #include <gmock/gmock.h>
 
+#include <string>
+#include <vector>
+
 #include <mesos/scheduler.hpp>
 
 #include <mesos/scheduler/scheduler.hpp>
@@ -38,6 +41,7 @@
 #include "tests/mesos.hpp"
 
 using mesos::internal::master::Master;
+using mesos::internal::slave::Slave;
 
 using mesos::scheduler::Event;
 
@@ -47,7 +51,11 @@ using process::Message;
 using process::PID;
 using process::UPID;
 
+using std::string;
+using std::vector;
+
 using testing::_;
+using testing::AtMost;
 using testing::Eq;
 
 namespace mesos {
@@ -316,6 +324,102 @@ TEST_F(SchedulerDriverEventTest, SubscribedSchedulerFailover)
 }
 
 
+// Ensures that the driver can handle an OFFERS event. This also
+// covers sending framework messages straight to the slave (i.e.
+// bypassing the master), which is checked at the end of the test.
+TEST_F(SchedulerDriverEventTest, Offers)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockScheduler sched;
+  MesosSchedulerDriver schedDriver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&schedDriver, _, _));
+
+  Future<Message> frameworkRegisteredMessage =
+    FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
+
+  schedDriver.start();
+
+  AWAIT_READY(frameworkRegisteredMessage);
+  UPID frameworkPid = frameworkRegisteredMessage.get().to;
+
+  // Start a slave and capture the ResourceOffersMessage that follows.
+  Future<ResourceOffersMessage> resourceOffersMessage =
+    DROP_PROTOBUF(ResourceOffersMessage(), _, _);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(resourceOffersMessage);
+
+  google::protobuf::RepeatedPtrField<Offer> offers =
+    resourceOffersMessage.get().offers();
+
+  ASSERT_EQ(1u, offers.size());
+
+  // Ignore any further ResourceOffersMessages.
+  DROP_PROTOBUFS(ResourceOffersMessage(), _, _);
+
+  // Post the OFFERS event to the driver; expect a 'resourceOffers' call.
+  Event event;
+  event.set_type(Event::OFFERS);
+  event.mutable_offers()->mutable_offers()->CopyFrom(offers);
+
+  Future<Nothing> resourceOffers;
+  EXPECT_CALL(sched, resourceOffers(&schedDriver, _))
+    .WillOnce(FutureSatisfy(&resourceOffers));
+
+  process::post(master.get(), frameworkPid, event);
+
+  AWAIT_READY(resourceOffers);
+
+  // To test that the framework -> executor messages are
+  // sent directly to the slave, launch a task and then
+  // send the executor a framework message.
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&schedDriver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  TaskInfo task = createTask(offers.Get(0), "", DEFAULT_EXECUTOR_ID);
+
+  schedDriver.launchTasks(offers.Get(0).id(), {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // This message should skip the master: expect it framework -> slave.
+  Future<FrameworkToExecutorMessage> frameworkToExecutorMessage =
+    FUTURE_PROTOBUF(FrameworkToExecutorMessage(), frameworkPid, slave.get());
+
+  Future<string> data;
+  EXPECT_CALL(exec, frameworkMessage(_, _))
+    .WillOnce(FutureArg<1>(&data));
+
+  schedDriver.sendFrameworkMessage(
+      DEFAULT_EXECUTOR_ID, offers.Get(0).slave_id(), "hello");
+
+  AWAIT_READY(frameworkToExecutorMessage);
+  AWAIT_EXPECT_EQ("hello", data);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  schedDriver.stop();
+  schedDriver.join();
+
+  Shutdown();
+}
+
+
 // Ensures that the driver can handle the RESCIND event.
 TEST_F(SchedulerDriverEventTest, Rescind)
 {

Reply via email to