Updated scheduler library to send REQUEST call. Review: https://reviews.apache.org/r/36698
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/92250130 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/92250130 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/92250130 Branch: refs/heads/master Commit: 922501309e41dcbe141fb33c08e7f9600ea0d6ff Parents: f0ee4aa Author: Vinod Kone <[email protected]> Authored: Mon Jul 20 17:19:20 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Thu Jul 23 12:32:56 2015 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 2 - include/mesos/scheduler/scheduler.proto | 12 +++++ src/master/master.cpp | 31 ++++++++++++- src/master/master.hpp | 4 ++ src/scheduler/scheduler.cpp | 11 ++++- src/tests/scheduler_tests.cpp | 66 ++++++++++++++++++++++++++++ 6 files changed, 121 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/92250130/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index bcb38d9..e015c81 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -781,8 +781,6 @@ message PerfStatistics { * to proactively influence the allocator. If 'slave_id' is provided * then this request is assumed to only apply to resources on that * slave. - * - * TODO(vinod): Remove this once the old driver is removed. */ message Request { optional SlaveID slave_id = 1; http://git-wip-us.apache.org/repos/asf/mesos/blob/92250130/include/mesos/scheduler/scheduler.proto ---------------------------------------------------------------------- diff --git a/include/mesos/scheduler/scheduler.proto b/include/mesos/scheduler/scheduler.proto index e2ca8e5..5219b76 100644 --- a/include/mesos/scheduler/scheduler.proto +++ b/include/mesos/scheduler/scheduler.proto @@ -154,6 +154,7 @@ message Call { ACKNOWLEDGE = 8; // See 'Acknowledge' below. RECONCILE = 9; // See 'Reconcile' below. MESSAGE = 10; // See 'Message' below. + REQUEST = 11; // See 'Request' below. // TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for // already subscribed frameworks as a way of stopping offers from @@ -284,6 +285,16 @@ message Call { required bytes data = 3; } + // Requests a specific set of resources from Mesos's allocator. If + // the allocator has support for this, corresponding offers will be + // sent asynchronously via the OFFERS event(s). + // + // NOTE: The built-in hierarchical allocator doesn't have support + // for this call and hence simply ignores it. + message Request { + repeated mesos.Request requests = 1; + } + // Identifies who generated this call. Master assigns a framework id // when a new scheduler subscribes for the first time. Once assigned, // the scheduler must set the 'framework_id' here and within its @@ -304,4 +315,5 @@ message Call { optional Acknowledge acknowledge = 8; optional Reconcile reconcile = 9; optional Message message = 10; + optional Request request = 11; } http://git-wip-us.apache.org/repos/asf/mesos/blob/92250130/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 2f00f24..bab04fe 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1729,6 +1729,15 @@ void Master::receive( break; } + case scheduler::Call::REQUEST: { + if (!call.has_request()) { + drop(from, call, "Expecting 'request' to be present"); + return; + } + request(framework, call.request()); + break; + } + default: drop(from, call, "Unknown call type"); break; @@ -2263,8 +2272,26 @@ void Master::resourceRequest( return; } - LOG(INFO) << "Requesting resources for framework " << *framework; - allocator->requestResources(frameworkId, requests); + scheduler::Call::Request call; + foreach (const Request& request, requests) { + call.add_requests()->CopyFrom(request); + } + + request(framework, call); +} + + +void Master::request( + Framework* framework, + const scheduler::Call::Request& request) +{ + CHECK_NOTNULL(framework); + + LOG(INFO) << "Processing REQUEST call for framework " << *framework; + + allocator->requestResources( + framework->id(), + google::protobuf::convert(request.requests())); } http://git-wip-us.apache.org/repos/asf/mesos/blob/92250130/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 2343a68..bf61bb2 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1045,6 +1045,10 @@ private: Framework* framework, const scheduler::Call::Message& message); + void request( + Framework* framework, + const scheduler::Call::Request& request); + bool elected() const { return leader.isSome() && leader.get() == info_; http://git-wip-us.apache.org/repos/asf/mesos/blob/92250130/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index ac23585..badc107 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -306,8 +306,17 @@ public: break; } + case Call::REQUEST: { + if (!call.has_request()) { + drop(call, "Expecting 'request' to be present"); + return; + } + send(master.get(), call); + break; + } + default: - VLOG(1) << "Unexpected call " << stringify(call.type()); + LOG(ERROR) << "Unexpected call " << stringify(call.type()); break; } } http://git-wip-us.apache.org/repos/asf/mesos/blob/92250130/src/tests/scheduler_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp index 13fecb2..98fc70b 100644 --- a/src/tests/scheduler_tests.cpp +++ b/src/tests/scheduler_tests.cpp @@ -45,11 +45,15 @@ #include <stout/try.hpp> #include <stout/uuid.hpp> +#include "master/allocator/mesos/allocator.hpp" + #include "master/master.hpp" #include "tests/containerizer.hpp" #include "tests/mesos.hpp" +using mesos::internal::master::allocator::MesosAllocatorProcess; + using mesos::internal::master::Master; using mesos::internal::slave::Containerizer; @@ -1002,6 +1006,68 @@ TEST_F(SchedulerTest, Message) } +TEST_F(SchedulerTest, Request) +{ + Try<PID<Master>> master = StartMaster(); + ASSERT_SOME(master); + + Callbacks callbacks; + + Future<Nothing> connected; + EXPECT_CALL(callbacks, connected()) + .WillOnce(FutureSatisfy(&connected)); + + scheduler::Mesos mesos( + master.get(), + DEFAULT_CREDENTIAL, + lambda::bind(&Callbacks::connected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)), + lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1)); + + AWAIT_READY(connected); + + Queue<Event> events; + + EXPECT_CALL(callbacks, received(_)) + .WillRepeatedly(Enqueue(&events)); + + { + Call call; + call.set_type(Call::SUBSCRIBE); + + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO); + + mesos.send(call); + } + + Future<Event> event = events.get(); + AWAIT_READY(event); + EXPECT_EQ(Event::SUBSCRIBED, event.get().type()); + + FrameworkID id(event.get().subscribed().framework_id()); + + Future<Nothing> requestResources = + FUTURE_DISPATCH(_, &MesosAllocatorProcess::requestResources); + + { + Call call; + call.mutable_framework_id()->CopyFrom(id); + call.set_type(Call::REQUEST); + + // Create a dummy request. + Call::Request* request = call.mutable_request(); + request->add_requests(); + + mesos.send(call); + } + + AWAIT_READY(requestResources); + + Shutdown(); +} + + // TODO(benh): Write test for sending Call::Acknowledgement through // master to slave when Event::Update was generated locally.
