Added Acknowledge call support to the master and the C++ scheduler library.

Review: https://reviews.apache.org/r/35857


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/abb5adc4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/abb5adc4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/abb5adc4

Branch: refs/heads/master
Commit: abb5adc44472495ce75c336f4635b31c06964203
Parents: 49a0d2b
Author: Vinod Kone <[email protected]>
Authored: Wed Jun 24 16:18:38 2015 -0700
Committer: Vinod Kone <[email protected]>
Committed: Wed Jul 1 17:54:59 2015 -0700

----------------------------------------------------------------------
 src/master/master.cpp       | 69 ++++++++++++++++++++++++++--------------
 src/master/master.hpp       | 12 ++++---
 src/scheduler/scheduler.cpp |  7 +---
 3 files changed, 55 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/abb5adc4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 255eaae..a72b648 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1690,7 +1690,11 @@ void Master::receive(
     }
 
     case scheduler::Call::ACKNOWLEDGE: {
-      drop(from, call, "Unimplemented");
+      if (!call.has_acknowledge()) {
+        drop(from, call, "Expecting 'acknowledge' to be present");
+        return;
+      }
+      acknowledge(framework, call.acknowledge());
       break;
     }
 
@@ -3024,43 +3028,62 @@ void Master::statusUpdateAcknowledgement(
 
   if (framework == NULL) {
     LOG(WARNING)
-      << "Ignoring status update acknowledgement message for task " << taskId
-      << " of framework " << frameworkId << " on slave " << slaveId
-      << " because the framework cannot be found";
+      << "Ignoring status update acknowledgement " << UUID::fromBytes(uuid)
+      << " for task " << taskId << " of framework " << frameworkId
+      << " on slave " << slaveId << " because the framework cannot be found";
     metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
   if (from != framework->pid) {
     LOG(WARNING)
-      << "Ignoring status update acknowledgement message for task " << taskId
-      << " of framework " << *framework << " on slave " << slaveId
-      << " because it is not expected from " << from;
+      << "Ignoring status update acknowledgement " << UUID::fromBytes(uuid)
+      << " for task " << taskId << " of framework " << *framework
+      << " on slave " << slaveId << " because it is not expected from " << 
from;
     metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
+  scheduler::Call::Acknowledge message;
+  message.mutable_slave_id()->CopyFrom(slaveId);
+  message.mutable_task_id()->CopyFrom(taskId);
+  message.set_uuid(uuid);
+
+  acknowledge(framework, message);
+}
+
+
+void Master::acknowledge(
+    Framework* framework,
+    const scheduler::Call::Acknowledge& acknowledge)
+{
+  CHECK_NOTNULL(framework);
+
+  const SlaveID slaveId = acknowledge.slave_id();
+  const TaskID taskId = acknowledge.task_id();
+  const UUID uuid = UUID::fromBytes(acknowledge.uuid());
+
   Slave* slave = slaves.registered.get(slaveId);
 
   if (slave == NULL) {
     LOG(WARNING)
-      << "Cannot send status update acknowledgement message for task " << 
taskId
-      << " of framework " << *framework << " to slave " << slaveId
-      << " because slave is not registered";
+      << "Cannot send status update acknowledgement " << uuid
+      << " for task " << taskId << " of framework " << *framework
+      << " to slave " << slaveId << " because slave is not registered";
     metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
   if (!slave->connected) {
     LOG(WARNING)
-      << "Cannot send status update acknowledgement message for task " << 
taskId
-      << " of framework " << *framework << " to slave " << *slave
-      << " because slave is disconnected";
+      << "Cannot send status update acknowledgement " << uuid
+      << " for task " << taskId << " of framework " << *framework
+      << " to slave " << *slave << " because slave is disconnected";
     metrics->invalid_status_update_acknowledgements++;
     return;
   }
 
-  Task* task = slave->getTask(frameworkId, taskId);
+  Task* task = slave->getTask(framework->id(), taskId);
 
   if (task != NULL) {
     // Status update state and uuid should be either set or unset
@@ -3077,29 +3100,29 @@ void Master::statusUpdateAcknowledgement(
       // retry the update, at which point the master will set the
       // status update state.
       LOG(ERROR)
-        << "Ignoring status update acknowledgement message for task " << taskId
-        << " of framework " << *framework << " to slave " << *slave
-        << " because it no update was sent by this master";
+        << "Ignoring status update acknowledgement " << uuid
+        << " for task " << taskId << " of framework " << *framework
+        << " to slave " << *slave << " because the update was not"
+        << " sent by this master";
       metrics->invalid_status_update_acknowledgements++;
       return;
     }
 
     // Remove the task once the terminal update is acknowledged.
     if (protobuf::isTerminalState(task->status_update_state()) &&
-        task->status_update_uuid() == uuid) {
+        UUID::fromBytes(task->status_update_uuid()) == uuid) {
       removeTask(task);
      }
   }
 
-  LOG(INFO) << "Forwarding status update acknowledgement "
-            << UUID::fromBytes(uuid) << " for task " << taskId
-            << " of framework " << *framework << " to slave " << *slave;
+  LOG(INFO) << "Processing ACKNOWLEDGE call " << uuid << " for task " << taskId
+            << " of framework " << *framework << " on slave " << slaveId;
 
   StatusUpdateAcknowledgementMessage message;
   message.mutable_slave_id()->CopyFrom(slaveId);
-  message.mutable_framework_id()->CopyFrom(frameworkId);
+  message.mutable_framework_id()->CopyFrom(framework->id());
   message.mutable_task_id()->CopyFrom(taskId);
-  message.set_uuid(uuid);
+  message.set_uuid(uuid.toBytes());
 
   send(slave->pid, message);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/abb5adc4/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 2d25af4..6e1772b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1021,10 +1021,6 @@ private:
 
   void revive(Framework* framework);
 
-  void reconcile(
-      Framework* framework,
-      const scheduler::Call::Reconcile& reconcile);
-
   void kill(
       Framework* framework,
       const scheduler::Call::Kill& kill);
@@ -1033,6 +1029,14 @@ private:
       Framework* framework,
       const scheduler::Call::Shutdown& shutdown);
 
+  void acknowledge(
+      Framework* framework,
+      const scheduler::Call::Acknowledge& acknowledge);
+
+  void reconcile(
+      Framework* framework,
+      const scheduler::Call::Reconcile& reconcile);
+
   bool elected() const
   {
     return leader.isSome() && leader.get() == info_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/abb5adc4/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index 740c088..478ef45 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -292,12 +292,7 @@ public:
           drop(call, "Expecting 'acknowledge' to be present");
           return;
         }
-        StatusUpdateAcknowledgementMessage message;
-        message.mutable_framework_id()->CopyFrom(call.framework_id());
-        message.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id());
-        message.mutable_task_id()->CopyFrom(call.acknowledge().task_id());
-        message.set_uuid(call.acknowledge().uuid());
-        send(master.get(), message);
+        send(master.get(), call);
         break;
       }
 

Reply via email to