Added Revive call support to the master and C++ scheduler library.

Review: https://reviews.apache.org/r/35856


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/49a0d2b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/49a0d2b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/49a0d2b4

Branch: refs/heads/master
Commit: 49a0d2b443636653f773188af2875106c5b3583e
Parents: fa4a9a8
Author: Vinod Kone <[email protected]>
Authored: Wed Jun 24 15:34:20 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Wed Jul 1 17:54:58 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp         | 12 ++++-
 src/master/master.hpp         |  2 +
 src/scheduler/scheduler.cpp   |  4 +-
 src/tests/scheduler_tests.cpp | 95 ++++++++++++++++++++++++++++++++++++++
 4 files changed, 108 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/49a0d2b4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 78edf6a..255eaae 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1667,7 +1667,7 @@ void Master::receive(
     }
 
     case scheduler::Call::REVIVE: {
-      drop(from, call, "Unimplemented");
+      revive(framework);
       break;
     }
 
@@ -2879,7 +2879,15 @@ void Master::reviveOffers(const UPID& from, const 
FrameworkID& frameworkId)
     return;
   }
 
-  LOG(INFO) << "Reviving offers for framework " << *framework;
+  revive(framework);
+}
+
+
+void Master::revive(Framework* framework)
+{
+  CHECK_NOTNULL(framework);
+
+  LOG(INFO) << "Processing REVIVE call for framework " << *framework;
   allocator->reviveOffers(framework->id());
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/49a0d2b4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 46ff822..2d25af4 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1019,6 +1019,8 @@ private:
       Framework* framework,
       const scheduler::Call::Decline& decline);
 
+  void revive(Framework* framework);
+
   void reconcile(
       Framework* framework,
       const scheduler::Call::Reconcile& reconcile);

http://git-wip-us.apache.org/repos/asf/mesos/blob/49a0d2b4/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index d9e341f..740c088 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -265,9 +265,7 @@ public:
       }
 
       case Call::REVIVE: {
-        ReviveOffersMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_id());
-        send(master.get(), message);
+        send(master.get(), call);
         break;
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/49a0d2b4/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 958ccfc..5e2b906 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -740,6 +740,101 @@ TEST_F(SchedulerTest, Decline)
 }
 
 
+TEST_F(SchedulerTest, Revive)
+{
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Try<PID<Slave>> slave = StartSlave();
+  ASSERT_SOME(slave);
+
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::Mesos mesos(
+      master.get(),
+      DEFAULT_CREDENTIAL,
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_FRAMEWORK_INFO);
+    subscribe->set_force(true);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
+
+  FrameworkID id(event.get().subscribed().framework_id());
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+
+  Offer offer = event.get().offers().offers(0);
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::DECLINE);
+
+    Call::Decline* decline = call.mutable_decline();
+    decline->add_offer_ids()->CopyFrom(offer.id());
+
+    // Set 1hr filter to not immediately get another offer.
+    Filters filters;
+    filters.set_refuse_seconds(Hours(1).secs());
+    decline->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  // No offers should be sent within 30 mins because we set a filter
+  // for 1 hr.
+  Clock::pause();
+  Clock::advance(Minutes(30));
+  Clock::settle();
+
+  event = events.get();
+  ASSERT_TRUE(event.isPending());
+
+  // On revival the filters should be cleared and the scheduler should
+  // get another offer with same amount of resources.
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::REVIVE);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+  ASSERT_EQ(offer.resources(), event.get().offers().offers(0).resources());
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
+
 // TODO(benh): Write test for sending Call::Acknowledgement through
 // master to slave when Event::Update was generated locally.
 

Reply via email to