Repository: mesos
Updated Branches:
  refs/heads/master 3da05548f -> df5b2cfc1


Added support for CREATE operation in master.

Review: https://reviews.apache.org/r/30386


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/df5b2cfc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/df5b2cfc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/df5b2cfc

Branch: refs/heads/master
Commit: df5b2cfc184039adb9c32ee54ca5c9bcc8245de9
Parents: 3da0554
Author: Jie Yu <[email protected]>
Authored: Wed Jan 28 16:11:12 2015 -0800
Committer: Jie Yu <[email protected]>
Committed: Wed Feb 4 14:01:29 2015 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 113 +++++++++++++++++++++++++++++++++++++++++----
 src/master/master.hpp |  13 ++++++
 2 files changed, 117 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/df5b2cfc/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 69b945d..f4b6463 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1339,6 +1339,23 @@ void Master::drop(
 }
 
 
+void Master::drop(
+    Framework* framework,
+    const Offer::Operation& operation,
+    const string& message)
+{
+  // TODO(jieyu): Increment a metric.
+
+  // NOTE: This only logs the drop; nothing is sent to the framework,
+  // so it gets no direct feedback. The framework will find out that
+  // the operation was dropped through subsequent offers.
+
+  LOG(ERROR) << "Dropping " << Offer::Operation::Type_Name(operation.type())
+             << " offer operation from framework " << *framework
+             << ": " << message;
+}
+
+
 void Master::receive(
     const UPID& from,
     const scheduler::Call& call)
@@ -2237,29 +2254,46 @@ void Master::_accept(
     switch (operation.type()) {
       case Offer::Operation::RESERVE: {
         // TODO(jieyu): Provide implementation for RESERVE.
-        LOG(ERROR) << "Unsupported offer operation: "
-                   << Offer::Operation::Type_Name(operation.type());
+        drop(framework, operation, "Unimplemented");
         break;
       }
 
       case Offer::Operation::UNRESERVE: {
         // TODO(jieyu): Provide implementation for UNRESERVE.
-        LOG(ERROR) << "Unsupported offer operation: "
-                   << Offer::Operation::Type_Name(operation.type());
+        drop(framework, operation, "Unimplemented");
         break;
       }
 
       case Offer::Operation::CREATE: {
-        // TODO(jieyu): Provide implementation for CREATE.
-        LOG(ERROR) << "Unsupported offer operation: "
-                   << Offer::Operation::Type_Name(operation.type());
+        Option<Error> error = validation::operation::validate(
+            operation.create(),
+            slave->checkpointedResources);
+
+        if (error.isSome()) {
+          drop(framework, operation, error.get().message);
+          continue;
+        }
+
+        Try<Resources> resources = _offeredResources.apply(operation);
+        if (resources.isError()) {
+          drop(framework, operation, resources.error());
+          continue;
+        }
+
+        _offeredResources = resources.get();
+
+        allocator->updateAllocation(
+            frameworkId,
+            slaveId,
+            {operation});
+
+        updateCheckpointedResources(slave, operation);
         break;
       }
 
       case Offer::Operation::DESTROY: {
         // TODO(jieyu): Provide implementation for DESTROY.
-        LOG(ERROR) << "Unsupported offer operation: "
-                   << Offer::Operation::Type_Name(operation.type());
+        drop(framework, operation, "Unimplemented");
         break;
       }
 
@@ -4568,6 +4602,67 @@ void Master::removeOffer(Offer* offer, bool rescind)
 }
 
 
+void Master::updateCheckpointedResources(
+    Slave* slave,
+    const Offer::Operation& operation)
+{
+  CHECK_NOTNULL(slave);
+
+  switch (operation.type()) {
+    case Offer::Operation::RESERVE: {
+      // TODO(jieyu): Implement. Unchanged resources are still sent below.
+      LOG(ERROR) << "Failed to update checkpointed resources for slave "
+                 << *slave << ": Unimplemented RESERVE operation";
+      break;
+    }
+
+    case Offer::Operation::UNRESERVE: {
+      // TODO(jieyu): Implement. Unchanged resources are still sent below.
+      LOG(ERROR) << "Failed to update checkpointed resources for slave "
+                 << *slave << ": Unimplemented UNRESERVE operation";
+      break;
+    }
+
+    case Offer::Operation::CREATE: {
+      // Try applying the CREATE to the checkpointed resources. If
+      // that succeeds, the volumes are created from resources that
+      // were already checkpointed (i.e., dynamically reserved), so
+      // take the updated result. If it fails, the volumes are created
+      // from regular resources, so add them to the checkpointed
+      // resources instead.
+      Try<Resources> resources = slave->checkpointedResources.apply(operation);
+      if (resources.isError()) {
+        slave->checkpointedResources += operation.create().volumes();
+      } else {
+        slave->checkpointedResources = resources.get();
+      }
+      break;
+    }
+
+    case Offer::Operation::DESTROY: {
+      // TODO(jieyu): Implement. Unchanged resources are still sent below.
+      LOG(ERROR) << "Failed to update checkpointed resources for slave "
+                 << *slave << ": Unimplemented DESTROY operation";
+      break;
+    }
+
+    default:
+      LOG(FATAL) << "Not expecting operation " << operation.type()
+                 << " when updating slave checkpointed resources";
+      break;
+  }
+
+  LOG(INFO) << "Sending checkpointed resources "
+            << slave->checkpointedResources
+            << " to slave " << *slave;
+
+  CheckpointResourcesMessage message;
+  message.mutable_resources()->CopyFrom(slave->checkpointedResources);
+
+  send(slave->pid, message);
+}
+
+
 // TODO(bmahler): Consider killing this.
 Framework* Master::getFramework(const FrameworkID& frameworkId)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/df5b2cfc/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index cd37ee9..9d8c508 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -390,6 +390,14 @@ protected:
       const FrameworkID& frameworkId,
       const ExecutorID& executorId);
 
+  // Certain offer operations (e.g., RESERVE, CREATE) may result in a
+  // change to the checkpointed resources on the slave. This method
+  // updates the checkpointed resources for the slave and sends
+  // CheckpointResourcesMessage to the slave accordingly.
+  void updateCheckpointedResources(
+      Slave* slave,
+      const Offer::Operation& operation);
+
   // Forwards the update to the framework.
   void forward(
       const StatusUpdate& update,
@@ -418,6 +426,11 @@ private:
       const scheduler::Call& call,
       const std::string& message);
 
+  void drop(
+      Framework* framework,
+      const Offer::Operation& operation,
+      const std::string& message);
+
   // Call handlers.
   void receive(
       const process::UPID& from,

Reply via email to