Maintenance Primitives: Implemented Master::inverseOffer.

Review: https://reviews.apache.org/r/37180


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e6375f31
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e6375f31
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e6375f31

Branch: refs/heads/master
Commit: e6375f319914741c652bca7c9b97049e81828f5e
Parents: 42f9ce5
Author: Joris Van Remoortere <[email protected]>
Authored: Sun Aug 30 14:24:03 2015 -0400
Committer: Joris Van Remoortere <[email protected]>
Committed: Mon Sep 14 13:58:37 2015 -0400

----------------------------------------------------------------------
 src/master/master.cpp | 225 ++++++++++++++++++++++++++++++++++++++++++++-
 src/master/master.hpp |  46 +++++++++
 2 files changed, 268 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e6375f31/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 8471735..8ab5a03 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -953,6 +953,13 @@ void Master::finalize()
       removeOffer(offer);
     }
 
+    // Remove inverse offers.
+    foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
+      // We don't need to update the allocator because the slave has already
+      // been removed.
+      removeInverseOffer(inverseOffer);
+    }
+
     // Terminate the slave observer.
     terminate(slave->observer);
     wait(slave->observer);
@@ -978,12 +985,14 @@ void Master::finalize()
     CHECK(framework->tasks.empty());
     CHECK(framework->executors.empty());
     CHECK(framework->offers.empty());
+    CHECK(framework->inverseOffers.empty());
 
     delete framework;
   }
   frameworks.registered.clear();
 
   CHECK(offers.empty());
+  CHECK(inverseOffers.empty());
 
   foreachvalue (Future<Option<string>> future, authenticating) {
     // NOTE: This is necessary during tests because a copy of
@@ -2323,6 +2332,17 @@ void Master::_subscribe(
         removeOffer(offer, true); // Rescind.
       }
 
+      // Also remove inverse offers.
+      foreach (InverseOffer* inverseOffer,
+               utils::copy(framework->inverseOffers)) {
+        allocator->updateInverseOffer(
+            inverseOffer->slave_id(),
+            inverseOffer->framework_id(),
+            None());
+
+        removeInverseOffer(inverseOffer, true); // Rescind.
+      }
+
       // TODO(bmahler): Shouldn't this re-link with the scheduler?
       framework->connected = true;
 
@@ -2474,8 +2494,19 @@ void Master::deactivate(Framework* framework)
   foreach (Offer* offer, utils::copy(framework->offers)) {
     allocator->recoverResources(
         offer->framework_id(), offer->slave_id(), offer->resources(), None());
+
     removeOffer(offer, true); // Rescind.
   }
+
+  // Remove the framework's inverse offers.
+  foreach (InverseOffer* inverseOffer, utils::copy(framework->inverseOffers)) {
+    allocator->updateInverseOffer(
+        inverseOffer->slave_id(),
+        inverseOffer->framework_id(),
+        None());
+
+    removeInverseOffer(inverseOffer, true); // Rescind.
+  }
 }
 
 
@@ -2515,6 +2546,16 @@ void Master::deactivate(Slave* slave)
 
     removeOffer(offer, true); // Rescind!
   }
+
+  // Remove and rescind inverse offers.
+  foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
+    allocator->updateInverseOffer(
+        slave->id,
+        inverseOffer->framework_id(),
+        None());
+
+    removeInverseOffer(inverseOffer, true); // Rescind!
+  }
 }
 
 
@@ -4100,8 +4141,7 @@ void Master::updateSlave(
             << " oversubscribed resources " <<  oversubscribedResources;
 
   // First, rescind any outstanding offers with revocable resources.
-  // NOTE: Need a copy of offers because the offers are removed inside
-  // the loop.
+  // NOTE: Need a copy of offers because the offers are removed inside the loop.
   foreach (Offer* offer, utils::copy(slave->offers)) {
     const Resources offered = offer->resources();
     if (!offered.revocable().empty()) {
@@ -4116,6 +4156,9 @@ void Master::updateSlave(
     }
   }
 
+  // NOTE: We don't need to rescind inverse offers here as they are unrelated to
+  // oversubscription.
+
   slave->totalResources -= slave->totalResources.revocable();
   slave->totalResources += oversubscribedResources.revocable();
 
@@ -4166,6 +4209,17 @@ void Master::updateUnavailability(
         removeOffer(offer, true); // Rescind!
       }
 
+      // Remove and rescind inverse offers since the allocator will send new
+      // inverse offers for the updated unavailability.
+      foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
+        allocator->updateInverseOffer(
+            slave->id,
+            inverseOffer->framework_id(),
+            None());
+
+        removeInverseOffer(inverseOffer, true); // Rescind!
+      }
+
       // We remove / resind all the offers first so that any calls to the
       // allocator to modify its internal state are queued before the update of
       // the unavailability in the allocator. We do this so that the allocator's
@@ -4786,7 +4840,80 @@ void Master::inverseOffer(
     const FrameworkID& frameworkId,
     const hashmap<SlaveID, UnavailableResources>& resources)
 {
-  // TODO(jmlvanre): Implement this function.
+  if (!frameworks.registered.contains(frameworkId) ||
+      !frameworks.registered[frameworkId]->active) {
+    LOG(INFO) << "Master ignoring inverse offers to framework " << frameworkId
+              << " because the framework has terminated or is inactive";
+    return;
+  }
+
+  // Create an inverse offer for each slave and add it to the message.
+  ResourceOffersMessage message;
+
+  Framework* framework = CHECK_NOTNULL(frameworks.registered[frameworkId]);
+  foreachpair (const SlaveID& slaveId,
+               const UnavailableResources& unavailableResources,
+               resources) {
+    if (!slaves.registered.contains(slaveId)) {
+      LOG(INFO)
+        << "Master ignoring inverse offers to framework " << *framework
+        << " because slave " << slaveId << " is not valid";
+      continue;
+    }
+
+    Slave* slave = slaves.registered.get(slaveId);
+    CHECK_NOTNULL(slave);
+
+    // This could happen if the allocator dispatched 'Master::inverseOffer'
+    // before the slave was deactivated in the allocator.
+    if (!slave->active) {
+      LOG(INFO)
+        << "Master ignoring inverse offers because slave " << *slave
+        << " is " << (slave->connected ? "deactivated" : "disconnected");
+
+      continue;
+    }
+
+    InverseOffer* inverseOffer = new InverseOffer();
+
+    // We use the same id generator as regular offers so that we can have unique
+    // ids accross both. This way we can re-use some of the `OfferID` only
+    // messages.
+    inverseOffer->mutable_id()->CopyFrom(newOfferId());
+    inverseOffer->mutable_framework_id()->CopyFrom(framework->id());
+    inverseOffer->mutable_slave_id()->CopyFrom(slave->id);
+    inverseOffer->mutable_unavailability()->CopyFrom(
+        unavailableResources.unavailability);
+
+    inverseOffers[inverseOffer->id()] = inverseOffer;
+
+    framework->addInverseOffer(inverseOffer);
+    slave->addInverseOffer(inverseOffer);
+
+    // TODO(jmlvanre): Do we want a separate flag for inverse offer
+    // timeout?
+    if (flags.offer_timeout.isSome()) {
+      // Rescind the inverse offer after the timeout elapses.
+      inverseOfferTimers[inverseOffer->id()] =
+        delay(flags.offer_timeout.get(),
+              self(),
+              &Self::inverseOfferTimeout,
+              inverseOffer->id());
+    }
+
+    // Add the inverse offer *AND* the corresponding slave's PID.
+    message.add_inverse_offers()->CopyFrom(*inverseOffer);
+    message.add_pids(slave->pid);
+  }
+
+  if (message.inverse_offers().size() == 0) {
+    return;
+  }
+
+  LOG(INFO) << "Sending " << message.inverse_offers().size()
+            << " inverse offers to framework " << *framework;
+
+  framework->send(message);
 }
 
 
@@ -5244,12 +5371,24 @@ void Master::_failoverFramework(Framework* framework)
   foreach (Offer* offer, utils::copy(framework->offers)) {
     allocator->recoverResources(
         offer->framework_id(), offer->slave_id(), offer->resources(), None());
+
     removeOffer(offer);
   }
 
+  // Also remove the inverse offers.
+  foreach (InverseOffer* inverseOffer, utils::copy(framework->inverseOffers)) {
+    allocator->updateInverseOffer(
+        inverseOffer->slave_id(),
+        inverseOffer->framework_id(),
+        None());
+
+    removeInverseOffer(inverseOffer);
+  }
+
   // Reconnect and reactivate the framework.
   framework->connected = true;
 
+  // Reactivate the framework.
   // NOTE: We do this after recovering resources (above) so that
   // the allocator has the correct view of the framework's share.
   if (!framework->active) {
@@ -5344,9 +5483,20 @@ void Master::removeFramework(Framework* framework)
         offer->slave_id(),
         offer->resources(),
         None());
+
     removeOffer(offer);
   }
 
+  // Also remove the inverse offers.
+  foreach (InverseOffer* inverseOffer, utils::copy(framework->inverseOffers)) {
+    allocator->updateInverseOffer(
+        inverseOffer->slave_id(),
+        inverseOffer->framework_id(),
+        None());
+
+    removeInverseOffer(inverseOffer);
+  }
+
   // Remove the framework's executors for correct resource accounting.
   foreachkey (const SlaveID& slaveId, utils::copy(framework->executors)) {
     Slave* slave = slaves.registered.get(slaveId);
@@ -5610,6 +5760,15 @@ void Master::removeSlave(
     removeOffer(offer, true); // Rescind!
   }
 
+  // Remove inverse offers because sending them for a slave that is
+  // gone doesn't make sense.
+  foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
+    // We don't need to update the allocator because we've already called
+    // `RemoveSlave()`.
+    // Remove and rescind inverse offers.
+    removeInverseOffer(inverseOffer, true); // Rescind!
+  }
+
   // Mark the slave as being removed.
   slaves.removing.insert(slave->id);
   slaves.registered.remove(slave);
@@ -5967,6 +6126,58 @@ void Master::removeOffer(Offer* offer, bool rescind)
 }
 
 
+void Master::inverseOfferTimeout(const OfferID& inverseOfferId)
+{
+  InverseOffer* inverseOffer = getInverseOffer(inverseOfferId);
+  if (inverseOffer != NULL) {
+    allocator->updateInverseOffer(
+        inverseOffer->slave_id(),
+        inverseOffer->framework_id(),
+        None());
+
+    removeInverseOffer(inverseOffer, true);
+  }
+}
+
+
+void Master::removeInverseOffer(InverseOffer* inverseOffer, bool rescind)
+{
+  // Remove from framework.
+  Framework* framework = getFramework(inverseOffer->framework_id());
+  CHECK(framework != NULL)
+    << "Unknown framework " << inverseOffer->framework_id()
+    << " in the inverse offer " << inverseOffer->id();
+
+  framework->removeInverseOffer(inverseOffer);
+
+  // Remove from slave.
+  Slave* slave = slaves.registered.get(inverseOffer->slave_id());
+
+  CHECK(slave != NULL)
+    << "Unknown slave " << inverseOffer->slave_id()
+    << " in the inverse offer " << inverseOffer->id();
+
+  slave->removeInverseOffer(inverseOffer);
+
+  if (rescind) {
+    RescindResourceOfferMessage message;
+    message.mutable_offer_id()->CopyFrom(inverseOffer->id());
+    framework->send(message);
+  }
+
+  // Remove and cancel inverse offer removal timers. Canceling the Timers is
+  // only done to avoid having too many active Timers in libprocess.
+  if (inverseOfferTimers.contains(inverseOffer->id())) {
+    Clock::cancel(inverseOfferTimers[inverseOffer->id()]);
+    inverseOfferTimers.erase(inverseOffer->id());
+  }
+
+  // Delete it.
+  inverseOffers.erase(inverseOffer->id());
+  delete inverseOffer;
+}
+
+
 // TODO(bmahler): Consider killing this.
 Framework* Master::getFramework(const FrameworkID& frameworkId)
 {
@@ -5983,6 +6194,14 @@ Offer* Master::getOffer(const OfferID& offerId)
 }
 
 
+// TODO(bmahler): Consider killing this.
+InverseOffer* Master::getInverseOffer(const OfferID& inverseOfferId)
+{
+  return inverseOffers.contains(inverseOfferId) ?
+    inverseOffers[inverseOfferId] : NULL;
+}
+
+
 // Create a new framework ID. We format the ID as MASTERID-FWID, where
 // MASTERID is the ID of the master (launch date plus fault tolerant ID)
 // and FWID is an increasing integer.

http://git-wip-us.apache.org/repos/asf/mesos/blob/e6375f31/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 1ba0837..d48ef7c 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -236,6 +236,22 @@ struct Slave
     offers.erase(offer);
   }
 
+  void addInverseOffer(InverseOffer* inverseOffer)
+  {
+    CHECK(!inverseOffers.contains(inverseOffer))
+      << "Duplicate inverse offer " << inverseOffer->id();
+
+    inverseOffers.insert(inverseOffer);
+  }
+
+  void removeInverseOffer(InverseOffer* inverseOffer)
+  {
+    CHECK(inverseOffers.contains(inverseOffer))
+      << "Unknown inverse offer " << inverseOffer->id();
+
+    inverseOffers.erase(inverseOffer);
+  }
+
   bool hasExecutor(const FrameworkID& frameworkId,
                    const ExecutorID& executorId) const
   {
@@ -319,6 +335,9 @@ struct Slave
   // Active offers on this slave.
   hashset<Offer*> offers;
 
+  // Active inverse offers on this slave.
+  hashset<InverseOffer*> inverseOffers;
+
   hashmap<FrameworkID, Resources> usedResources;  // Active task / executors.
   Resources offeredResources; // Offers.
 
@@ -715,8 +734,15 @@ protected:
   // Remove an offer and optionally rescind the offer as well.
   void removeOffer(Offer* offer, bool rescind = false);
 
+  // Remove an inverse offer after specified timeout
+  void inverseOfferTimeout(const OfferID& inverseOfferId);
+
+  // Remove an inverse offer and optionally rescind it as well.
+  void removeInverseOffer(InverseOffer* inverseOffer, bool rescind = false);
+
   Framework* getFramework(const FrameworkID& frameworkId);
   Offer* getOffer(const OfferID& offerId);
+  InverseOffer* getInverseOffer(const OfferID& inverseOfferId);
 
   FrameworkID newFrameworkId();
   OfferID newOfferId();
@@ -1134,6 +1160,9 @@ private:
   hashmap<OfferID, Offer*> offers;
   hashmap<OfferID, process::Timer> offerTimers;
 
+  hashmap<OfferID, InverseOffer*> inverseOffers;
+  hashmap<OfferID, process::Timer> inverseOfferTimers;
+
   hashmap<std::string, Role*> roles;
 
   // Authenticator names as supplied via flags.
@@ -1558,6 +1587,21 @@ struct Framework
     offers.erase(offer);
   }
 
+  void addInverseOffer(InverseOffer* inverseOffer)
+  {
+    CHECK(!inverseOffers.contains(inverseOffer))
+      << "Duplicate inverse offer " << inverseOffer->id();
+    inverseOffers.insert(inverseOffer);
+  }
+
+  void removeInverseOffer(InverseOffer* inverseOffer)
+  {
+    CHECK(inverseOffers.contains(inverseOffer))
+      << "Unknown inverse offer " << inverseOffer->id();
+
+    inverseOffers.erase(inverseOffer);
+  }
+
   bool hasExecutor(const SlaveID& slaveId,
                    const ExecutorID& executorId)
   {
@@ -1767,6 +1811,8 @@ struct Framework
 
   hashset<Offer*> offers; // Active offers for framework.
 
+  hashset<InverseOffer*> inverseOffers; // Active inverse offers for framework.
+
   hashmap<SlaveID, hashmap<ExecutorID, ExecutorInfo>> executors;
 
   // NOTE: For the used and offered resources below, we keep the

Reply via email to