Added Acknowledge call support to the master and the C++ scheduler library.
Review: https://reviews.apache.org/r/35857 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/abb5adc4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/abb5adc4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/abb5adc4 Branch: refs/heads/master Commit: abb5adc44472495ce75c336f4635b31c06964203 Parents: 49a0d2b Author: Vinod Kone <[email protected]> Authored: Wed Jun 24 16:18:38 2015 -0700 Committer: Vinod Kone <[email protected]> Committed: Wed Jul 1 17:54:59 2015 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 69 ++++++++++++++++++++++++++-------------- src/master/master.hpp | 12 ++++--- src/scheduler/scheduler.cpp | 7 +--- 3 files changed, 55 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/abb5adc4/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 255eaae..a72b648 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -1690,7 +1690,11 @@ void Master::receive( } case scheduler::Call::ACKNOWLEDGE: { - drop(from, call, "Unimplemented"); + if (!call.has_acknowledge()) { + drop(from, call, "Expecting 'acknowledge' to be present"); + return; + } + acknowledge(framework, call.acknowledge()); break; } @@ -3024,43 +3028,62 @@ void Master::statusUpdateAcknowledgement( if (framework == NULL) { LOG(WARNING) - << "Ignoring status update acknowledgement message for task " << taskId - << " of framework " << frameworkId << " on slave " << slaveId - << " because the framework cannot be found"; + << "Ignoring status update acknowledgement " << UUID::fromBytes(uuid) + << " for task " << taskId << " of framework " << frameworkId + << " on slave " << slaveId << " because the framework cannot be found"; metrics->invalid_status_update_acknowledgements++; return; } if (from != framework->pid) { LOG(WARNING) - << "Ignoring status update acknowledgement message for task " << taskId - << " of framework " << *framework << " on slave " << slaveId - << " because it is not expected from " << from; + << "Ignoring status update acknowledgement " << UUID::fromBytes(uuid) + << " for task " << taskId << " of framework " << *framework + << " on slave " << slaveId << " because it is not expected from " << from; metrics->invalid_status_update_acknowledgements++; return; } + scheduler::Call::Acknowledge message; + message.mutable_slave_id()->CopyFrom(slaveId); + message.mutable_task_id()->CopyFrom(taskId); + message.set_uuid(uuid); + + acknowledge(framework, message); +} + + +void Master::acknowledge( + Framework* framework, + const scheduler::Call::Acknowledge& acknowledge) +{ + CHECK_NOTNULL(framework); + + const SlaveID slaveId = acknowledge.slave_id(); + const TaskID taskId = acknowledge.task_id(); + const UUID uuid = UUID::fromBytes(acknowledge.uuid()); + Slave* slave = slaves.registered.get(slaveId); if (slave == NULL) { LOG(WARNING) - << "Cannot send status update acknowledgement message for task " << taskId - << " of framework " << *framework << " to slave " << slaveId - << " because slave is not registered"; + << "Cannot send status update acknowledgement " << uuid + << " for task " << taskId << " of framework " << *framework + << " to slave " << slaveId << " because slave is not registered"; metrics->invalid_status_update_acknowledgements++; return; } if (!slave->connected) { LOG(WARNING) - << "Cannot send status update acknowledgement message for task " << taskId - << " of framework " << *framework << " to slave " << *slave - << " because slave is disconnected"; + << "Cannot send status update acknowledgement " << uuid + << " for task " << taskId << " of framework " << *framework + << " to slave " << *slave << " because slave is disconnected"; metrics->invalid_status_update_acknowledgements++; return; } - Task* task = slave->getTask(frameworkId, taskId); + Task* task = slave->getTask(framework->id(), taskId); if (task != NULL) { // Status update state and uuid should be either set or unset @@ -3077,29 +3100,29 @@ void Master::statusUpdateAcknowledgement( // retry the update, at which point the master will set the // status update state. LOG(ERROR) - << "Ignoring status update acknowledgement message for task " << taskId - << " of framework " << *framework << " to slave " << *slave - << " because it no update was sent by this master"; + << "Ignoring status update acknowledgement " << uuid + << " for task " << taskId << " of framework " << *framework + << " to slave " << *slave << " because the update was not" + << " sent by this master"; metrics->invalid_status_update_acknowledgements++; return; } // Remove the task once the terminal update is acknowledged. if (protobuf::isTerminalState(task->status_update_state()) && - task->status_update_uuid() == uuid) { + UUID::fromBytes(task->status_update_uuid()) == uuid) { removeTask(task); } } - LOG(INFO) << "Forwarding status update acknowledgement " - << UUID::fromBytes(uuid) << " for task " << taskId - << " of framework " << *framework << " to slave " << *slave; + LOG(INFO) << "Processing ACKNOWLEDGE call " << uuid << " for task " << taskId + << " of framework " << *framework << " on slave " << slaveId; StatusUpdateAcknowledgementMessage message; message.mutable_slave_id()->CopyFrom(slaveId); - message.mutable_framework_id()->CopyFrom(frameworkId); + message.mutable_framework_id()->CopyFrom(framework->id()); message.mutable_task_id()->CopyFrom(taskId); - message.set_uuid(uuid); + message.set_uuid(uuid.toBytes()); send(slave->pid, message); http://git-wip-us.apache.org/repos/asf/mesos/blob/abb5adc4/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 2d25af4..6e1772b 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1021,10 +1021,6 @@ private: void revive(Framework* framework); - void reconcile( - Framework* framework, - const scheduler::Call::Reconcile& reconcile); - void kill( Framework* framework, const scheduler::Call::Kill& kill); @@ -1033,6 +1029,14 @@ private: Framework* framework, const scheduler::Call::Shutdown& shutdown); + void acknowledge( + Framework* framework, + const scheduler::Call::Acknowledge& acknowledge); + + void reconcile( + Framework* framework, + const scheduler::Call::Reconcile& reconcile); + bool elected() const { return leader.isSome() && leader.get() == info_; http://git-wip-us.apache.org/repos/asf/mesos/blob/abb5adc4/src/scheduler/scheduler.cpp ---------------------------------------------------------------------- diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp index 740c088..478ef45 100644 --- a/src/scheduler/scheduler.cpp +++ b/src/scheduler/scheduler.cpp @@ -292,12 +292,7 @@ public: drop(call, "Expecting 'acknowledge' to be present"); return; } - StatusUpdateAcknowledgementMessage message; - message.mutable_framework_id()->CopyFrom(call.framework_id()); - message.mutable_slave_id()->CopyFrom(call.acknowledge().slave_id()); - message.mutable_task_id()->CopyFrom(call.acknowledge().task_id()); - message.set_uuid(call.acknowledge().uuid()); - send(master.get(), message); + send(master.get(), call); break; }
