Added Decline call support to the master and C++ scheduler library.

Review: https://reviews.apache.org/r/35855


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/fa4a9a8c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/fa4a9a8c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/fa4a9a8c

Branch: refs/heads/master
Commit: fa4a9a8c6ee60fb4b459c45c6fac85dc0ddc2264
Parents: c504c3d
Author: Vinod Kone <[email protected]>
Authored: Wed Jun 24 12:01:22 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Wed Jul 1 17:54:58 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp         | 120 ++++++++++++++++++++++++++++---------
 src/master/master.hpp         |   4 ++
 src/scheduler/scheduler.cpp   |  18 +++---
 src/tests/scheduler_tests.cpp |  79 ++++++++++++++++++++++++
 4 files changed, 181 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/fa4a9a8c/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 9f9f578..78edf6a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1643,49 +1643,70 @@ void Master::receive(
   // framework id is set and non-empty except for SUBSCRIBE call.
 
   switch (call.type()) {
-    case scheduler::Call::REVIVE:
-    case scheduler::Call::DECLINE:
-      drop(from, call, "Unimplemented");
+    case scheduler::Call::TEARDOWN: {
+      removeFramework(framework);
       break;
+    }
 
-    case scheduler::Call::ACCEPT:
+    case scheduler::Call::ACCEPT: {
       if (!call.has_accept()) {
         drop(from, call, "Expecting 'accept' to be present");
         return;
       }
       accept(framework, call.accept());
       break;
+    }
 
-    case scheduler::Call::RECONCILE:
-      if (!call.has_reconcile()) {
-        drop(from, call, "Expecting 'reconcile' to be present");
+    case scheduler::Call::DECLINE: {
+      if (!call.has_decline()) {
+        drop(from, call, "Expecting 'decline' to be present");
         return;
       }
-      reconcile(framework, call.reconcile());
+      decline(framework, call.decline());
       break;
+    }
 
-    case scheduler::Call::SHUTDOWN:
-      if (!call.has_shutdown()) {
-        drop(from, call, "Expecting 'shutdown' to be present");
-      }
-      shutdown(framework, call.shutdown());
+    case scheduler::Call::REVIVE: {
+      drop(from, call, "Unimplemented");
       break;
+    }
 
-    case scheduler::Call::KILL:
+    case scheduler::Call::KILL: {
       if (!call.has_kill()) {
         drop(from, call, "Expecting 'kill' to be present");
+        return;
       }
       kill(framework, call.kill());
       break;
+    }
 
-    case scheduler::Call::ACKNOWLEDGE:
-    case scheduler::Call::MESSAGE:
+    case scheduler::Call::SHUTDOWN: {
+      if (!call.has_shutdown()) {
+        drop(from, call, "Expecting 'shutdown' to be present");
+        return;
+      }
+      shutdown(framework, call.shutdown());
+      break;
+    }
+
+    case scheduler::Call::ACKNOWLEDGE: {
       drop(from, call, "Unimplemented");
       break;
+    }
 
-    case scheduler::Call::TEARDOWN:
-      removeFramework(framework);
+    case scheduler::Call::RECONCILE: {
+      if (!call.has_reconcile()) {
+        drop(from, call, "Expecting 'reconcile' to be present");
+        return;
+      }
+      reconcile(framework, call.reconcile());
+      break;
+    }
+
+    case scheduler::Call::MESSAGE: {
+      drop(from, call, "Unimplemented");
       break;
+    }
 
     default:
       drop(from, call, "Unknown call type");
@@ -2232,21 +2253,34 @@ void Master::launchTasks(
     return;
   }
 
-  scheduler::Call::Accept message;
-  message.mutable_filters()->CopyFrom(filters);
+  // Currently when no tasks are specified in the launchTasks message
+  // it is implicitly considered a decline of the offers.
+  if (!tasks.empty()) {
+    scheduler::Call::Accept message;
+    message.mutable_filters()->CopyFrom(filters);
 
-  Offer::Operation* operation = message.add_operations();
-  operation->set_type(Offer::Operation::LAUNCH);
+    Offer::Operation* operation = message.add_operations();
+    operation->set_type(Offer::Operation::LAUNCH);
 
-  foreach (const TaskInfo& task, tasks) {
-    operation->mutable_launch()->add_task_infos()->CopyFrom(task);
-  }
+    foreach (const TaskInfo& task, tasks) {
+      operation->mutable_launch()->add_task_infos()->CopyFrom(task);
+    }
 
-  foreach (const OfferID& offerId, offerIds) {
-    message.add_offer_ids()->CopyFrom(offerId);
-  }
+    foreach (const OfferID& offerId, offerIds) {
+      message.add_offer_ids()->CopyFrom(offerId);
+    }
+
+    accept(framework, message);
+  } else {
+    scheduler::Call::Decline message;
+    message.mutable_filters()->CopyFrom(filters);
+
+    foreach (const OfferID& offerId, offerIds) {
+      message.add_offer_ids()->CopyFrom(offerId);
+    }
 
-  accept(framework, message);
+    decline(framework, message);
+  }
 }
 
 
@@ -2797,6 +2831,34 @@ void Master::_accept(
 }
 
 
+void Master::decline(
+    Framework* framework,
+    const scheduler::Call::Decline& decline)
+{
+  CHECK_NOTNULL(framework);
+
+  LOG(INFO) << "Processing DECLINE call for offers: " << decline.offer_ids()
+            << " for framework " << *framework;
+
+  //  Return resources to the allocator.
+  foreach (const OfferID& offerId, decline.offer_ids()) {
+    Offer* offer = getOffer(offerId);
+    if (offer != NULL) {
+      allocator->recoverResources(
+          offer->framework_id(),
+          offer->slave_id(),
+          offer->resources(),
+          decline.filters());
+
+      removeOffer(offer);
+    } else {
+      LOG(WARNING) << "Ignoring decline of offer " << offerId
+                   << " since it is no longer valid";
+    }
+  }
+}
+
+
 void Master::reviveOffers(const UPID& from, const FrameworkID& frameworkId)
 {
   ++metrics->messages_revive_offers;

http://git-wip-us.apache.org/repos/asf/mesos/blob/fa4a9a8c/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index af83d3e..46ff822 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1015,6 +1015,10 @@ private:
     const scheduler::Call::Accept& accept,
     const process::Future<std::list<process::Future<bool>>>& authorizations);
 
+  void decline(
+      Framework* framework,
+      const scheduler::Call::Decline& decline);
+
   void reconcile(
       Framework* framework,
       const scheduler::Call::Reconcile& reconcile);

http://git-wip-us.apache.org/repos/asf/mesos/blob/fa4a9a8c/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 38fd286..d9e341f 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -246,22 +246,18 @@ public:
         break;
       }
 
-      case Call::DECLINE: {
-        if (!call.has_decline()) {
-          drop(call, "Expecting 'decline' to be present");
+      case Call::ACCEPT: {
+        if (!call.has_accept()) {
+          drop(call, "Expecting 'accept' to be present");
           return;
         }
-        LaunchTasksMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_id());
-        message.mutable_filters()->CopyFrom(call.decline().filters());
-        message.mutable_offer_ids()->CopyFrom(call.decline().offer_ids());
-        send(master.get(), message);
+        send(master.get(), call);
         break;
       }
 
-      case Call::ACCEPT: {
-        if (!call.has_accept()) {
-          drop(call, "Expecting 'accept' to be present");
+      case Call::DECLINE: {
+        if (!call.has_decline()) {
+          drop(call, "Expecting 'decline' to be present");
           return;
         }
         send(master.get(), call);

http://git-wip-us.apache.org/repos/asf/mesos/blob/fa4a9a8c/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 5a0c645..958ccfc 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -661,6 +661,85 @@ TEST_F(SchedulerTest, Teardown)
 }
 
 
+TEST_F(SchedulerTest, Decline)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->set_force(true);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
+
+  FrameworkID id(event.get().subscribed().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  ASSERT_EQ(1, event.get().offers().offers().size());
+
+  Offer offer = event.get().offers().offers(0);
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::DECLINE);
+
+    Call::Decline* decline = call.mutable_decline();
+    decline->add_offer_ids()->CopyFrom(offer.id());
+
+    // Set 0s filter to immediately get another offer.
+    Filters filters;
+    filters.set_refuse_seconds(0);
+    decline->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  // If the resources were properly declined, the scheduler should
+  // get another offer with same amount of resources.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  ASSERT_EQ(1, event.get().offers().offers().size());
+  ASSERT_EQ(offer.resources(), event.get().offers().offers(0).resources());
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // TODO(benh): Write test for sending Call::Acknowledgement through
 // master to slave when Event::Update was generated locally.
 

Reply via email to