Implemented operation status reconciliation. Review: https://reviews.apache.org/r/66464/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d2616908 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d2616908 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d2616908 Branch: refs/heads/master Commit: d26169081699b6bc654113ae7ea980e55cd5f67d Parents: c7c3848 Author: Gaston Kleiman <[email protected]> Authored: Mon Apr 23 13:43:41 2018 -0700 Committer: Greg Mann <[email protected]> Committed: Mon Apr 23 13:49:07 2018 -0700 ---------------------------------------------------------------------- include/mesos/v1/scheduler/scheduler.proto | 6 ++ src/master/http.cpp | 32 +++++-- src/master/master.cpp | 108 +++++++++++++++++++++++- src/master/master.hpp | 21 ++++- src/messages/messages.proto | 7 +- 5 files changed, 159 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/include/mesos/v1/scheduler/scheduler.proto ---------------------------------------------------------------------- diff --git a/include/mesos/v1/scheduler/scheduler.proto b/include/mesos/v1/scheduler/scheduler.proto index 25ebcfc..b912901 100644 --- a/include/mesos/v1/scheduler/scheduler.proto +++ b/include/mesos/v1/scheduler/scheduler.proto @@ -426,7 +426,13 @@ message Call { message ReconcileOperations { message Operation { required OperationID operation_id = 1; + + // If `agent_id` is not set and the master doesn't know the operation, + // then it will return `OPERATION_UNKNOWN`; if `agent_id` is set, it can + // return more fine-grained states depending on the state of the + // corresponding agent. 
optional AgentID agent_id = 2; + optional ResourceProviderID resource_provider_id = 3; } http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/http.cpp ---------------------------------------------------------------------- diff --git a/src/master/http.cpp b/src/master/http.cpp index 34c9023..135ae43 100644 --- a/src/master/http.cpp +++ b/src/master/http.cpp @@ -956,12 +956,14 @@ Future<Response> Master::Http::scheduler( return BadRequest("Failed to validate scheduler::Call: " + error->message); } - if (call.type() == scheduler::Call::SUBSCRIBE) { - // We default to JSON 'Content-Type' in the response since an - // empty 'Accept' header results in all media types considered - // acceptable. - ContentType acceptType = ContentType::JSON; + ContentType acceptType; + // Ideally this handler would be consistent with the Operator API handler + // and determine the accept type regardless of the type of request. + // However, to maintain backwards compatibility, it determines the accept + // type only if the response will not be empty. + if (call.type() == scheduler::Call::SUBSCRIBE || + call.type() == scheduler::Call::RECONCILE_OPERATIONS) { if (request.acceptsMediaType(APPLICATION_JSON)) { acceptType = ContentType::JSON; } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) { @@ -971,7 +973,9 @@ Future<Response> Master::Http::scheduler( string("Expecting 'Accept' to allow ") + "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'"); } + } + if (call.type() == scheduler::Call::SUBSCRIBE) { // Make sure that a stream ID was not included in the request headers. if (request.headers.contains("Mesos-Stream-Id")) { return BadRequest( @@ -1110,9 +1114,9 @@ Future<Response> Master::Http::scheduler( master->reconcile(framework, std::move(*call.mutable_reconcile())); return Accepted(); - // TODO(greggomann): Implement operation reconciliation. 
case scheduler::Call::RECONCILE_OPERATIONS: - return Forbidden("Operation reconciliation is not yet implemented"); + return reconcileOperations( + framework, call.reconcile_operations(), acceptType); case scheduler::Call::MESSAGE: master->message(framework, std::move(*call.mutable_message())); @@ -5089,6 +5093,20 @@ Future<Response> Master::Http::_markAgentGone(const SlaveID& slaveId) const }); } + +Future<Response> Master::Http::reconcileOperations( + Framework* framework, + const scheduler::Call::ReconcileOperations& call, + ContentType contentType) const +{ + mesos::scheduler::Response response; + response.set_type(mesos::scheduler::Response::RECONCILE_OPERATIONS); + *response.mutable_reconcile_operations() = + master->reconcileOperations(framework, call); + + return OK(serialize(contentType, evolve(response)), stringify(contentType)); +} + } // namespace master { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index ada7709..545a4d7 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -8837,10 +8837,112 @@ void Master::reconcile( } -// TODO(greggomann): Implement operation update reconciliation. -void Master::reconcileOperations( +scheduler::Response::ReconcileOperations Master::reconcileOperations( Framework* framework, - const scheduler::Call::ReconcileOperations& reconcile) {} + const scheduler::Call::ReconcileOperations& reconcile) +{ + CHECK_NOTNULL(framework); + + ++metrics->messages_reconcile_operations; + + scheduler::Response::ReconcileOperations response; + + if (reconcile.operations_size() == 0) { + // Implicit reconciliation. 
+ LOG(INFO) << "Performing implicit operation state reconciliation" + " for framework " << *framework; + + response.mutable_operation_statuses()->Reserve( + framework->operations.size()); + + foreachvalue (Operation* operation, framework->operations) { + if (operation->statuses().empty()) { + // This can happen if the operation is pending. + response.add_operation_statuses()->CopyFrom(operation->latest_status()); + } else { + response.add_operation_statuses()->CopyFrom( + *operation->statuses().rbegin()); + } + } + + return response; + } + + // Explicit reconciliation. + LOG(INFO) << "Performing explicit operation state reconciliation for " + << reconcile.operations_size() << " operations of framework " + << *framework; + + // Explicit reconciliation occurs for the following cases: + // (1) Operation is known: the latest status sent to the framework. + // (2) Operation is unknown, slave is recovered: OPERATION_RECOVERING. + // (3) Operation is unknown, slave is registered: OPERATION_UNKNOWN. + // (4) Operation is unknown, slave is unreachable: OPERATION_UNREACHABLE. + // (5) Operation is unknown, slave is gone: OPERATION_GONE_BY_OPERATOR. + // (6) Operation is unknown, slave is unknown: OPERATION_UNKNOWN. + // (7) Operation is unknown, slave ID is not specified: OPERATION_UNKNOWN. + + foreach (const scheduler::Call::ReconcileOperations::Operation& operation, + reconcile.operations()) { + Option<SlaveID> slaveId = None(); + if (operation.has_slave_id()) { + slaveId = operation.slave_id(); + } + + Option<Operation*> frameworkOperation = + framework->getOperation(operation.operation_id()); + + OperationStatus* status = response.add_operation_statuses(); + if (frameworkOperation.isSome()) { + // (1) Operation is known: resend the latest status sent to the framework. + if (frameworkOperation.get()->statuses().empty()) { + // This can happen if the operation is pending. 
+ *status = frameworkOperation.get()->latest_status(); + } else { + *status = *frameworkOperation.get()->statuses().rbegin(); + } + } else if (slaveId.isSome() && slaves.recovered.contains(slaveId.get())) { + // (2) Operation is unknown, slave is recovered: OPERATION_RECOVERING. + *status = protobuf::createOperationStatus( + OperationState::OPERATION_RECOVERING, + operation.operation_id(), + "Reconciliation: Agent is recovered but has not re-registered"); + } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) { + // (3) Operation is unknown, slave is registered: OPERATION_UNKNOWN. + *status = protobuf::createOperationStatus( + OperationState::OPERATION_UNKNOWN, + operation.operation_id(), + "Reconciliation: Operation is unknown"); + } else if (slaveId.isSome() && slaves.unreachable.contains(slaveId.get())) { + // (4) Operation is unknown, slave is unreachable: OPERATION_UNREACHABLE. + *status = protobuf::createOperationStatus( + OperationState::OPERATION_UNREACHABLE, + operation.operation_id(), + "Reconciliation: Agent is unreachable"); + } else if (slaveId.isSome() && slaves.gone.contains(slaveId.get())) { + // (5) Operation is unknown, slave is gone: OPERATION_GONE_BY_OPERATOR. + *status = protobuf::createOperationStatus( + OperationState::OPERATION_GONE_BY_OPERATOR, + operation.operation_id(), + "Reconciliation: Agent marked gone by operator"); + } else if (slaveId.isSome()) { + // (6) Operation is unknown, slave is unknown: OPERATION_UNKNOWN. + *status = protobuf::createOperationStatus( + OperationState::OPERATION_UNKNOWN, + operation.operation_id(), + "Reconciliation: Both operation and agent are unknown"); + } else { + // (7) Operation is unknown, slave ID is not specified: OPERATION_UNKNOWN. 
+ *status = protobuf::createOperationStatus( + OperationState::OPERATION_UNKNOWN, + operation.operation_id(), + "Reconciliation: Operation is unknown and no 'agent_id' was" + " provided"); + } + } + + return response; +} void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId, http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 0d9620d..a7cadd9 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -1076,7 +1076,7 @@ private: Framework* framework, scheduler::Call::Reconcile&& reconcile); - void reconcileOperations( + scheduler::Response::ReconcileOperations reconcileOperations( Framework* framework, const scheduler::Call::ReconcileOperations& reconcile); @@ -1734,6 +1734,11 @@ private: process::Future<process::http::Response> _markAgentGone( const SlaveID& slaveId) const; + process::Future<process::http::Response> reconcileOperations( + Framework* framework, + const scheduler::Call::ReconcileOperations& call, + ContentType contentType) const; + Master* master; // NOTE: The quota specific pieces of the Operator API are factored @@ -2549,6 +2554,20 @@ struct Framework } } + Option<Operation*> getOperation(const OperationID& id) { + Option<UUID> uuid = operationUUIDs.get(id); + + if (uuid.isNone()) { + return None(); + } + + Option<Operation*> operation = operations.get(uuid.get()); + + CHECK_SOME(operation); + + return operation; + } + void recoverResources(Operation* operation) { CHECK(operation->has_slave_id()) http://git-wip-us.apache.org/repos/asf/mesos/blob/d2616908/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 556801d..41e6a8a 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -471,10 +471,9 @@ message 
ReconcileTasksMessage { /** - * The master uses this message to query an agent about the state of - * one or more operations. This is useful to resolve - * discrepancies between the master and agent's view after agent - * reregistration. + * The master uses this message to query an agent about the state of one or + * more operations. This is useful to resolve discrepancies between the master + * and agent's view after agent reregistration. */ message ReconcileOperationsMessage { message Operation {
