Made master reconcile known offer operations with agent. In cases where the agent fails over or where an `UpdateSlaveMessage` races with an `ApplyOfferOperationMessage`, it's possible that the master knows about an offer operation which is not contained in an `UpdateSlaveMessage`. In such cases, the master should send a `ReconcileOfferOperations` message to the agent. The agent will then respond by sending OFFER_OPERATION_DROPPED status updates for any operations which it does not know about.
Review: https://reviews.apache.org/r/64464/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5c91546b Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5c91546b Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5c91546b Branch: refs/heads/master Commit: 5c91546babf59a42c4e3fc98d5c712e8d1ddd3d3 Parents: 9d7da9b Author: Greg Mann <[email protected]> Authored: Tue Dec 12 16:18:41 2017 -0800 Committer: Greg Mann <[email protected]> Committed: Tue Dec 12 16:55:47 2017 -0800 ---------------------------------------------------------------------- src/master/master.cpp | 50 ++++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/5c91546b/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index efe8b8f..806fbc2 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -7575,6 +7575,8 @@ void Master::updateSlave(const UpdateSlaveMessage& message) } } + ReconcileOfferOperationsMessage reconcile; + // Update master and allocator state. foreachpair ( const Option<ResourceProviderID>& providerId, @@ -7637,29 +7639,35 @@ void Master::updateSlave(const UpdateSlaveMessage& message) } else { // If this is a known resource provider or agent its total capacity cannot // have changed, and it would not know about any non-terminal offer - // operations not already known to the master. It might however have not - // received an offer operations since the resource provider or agent fell - // over before the message could be received. We need to remove these - // operations from our state. - - // Reconcile offer operations. This includes recovering - // resources in used by operations which did not reach the - // agent or resource provider. + // operations not already known to the master. However, it might not have + // received an offer operation for a couple different reasons: + // - The resource provider or agent could have failed over before the + // operation's `ApplyOfferOperationMessage` could be received. + // - The operation's `ApplyOfferOperationMessage` could have raced with + // this `UpdateSlaveMessage`. + // + // In both of these cases, we need to reconcile such operations explicitly + // with the agent. For operations which the agent or resource provider + // does not recognize, an OFFER_OPERATION_DROPPED status update will be + // generated and the master will remove the operation from its state upon + // receipt of that update. if (provider.oldOfferOperations.isSome()) { foreachkey (const UUID& uuid, provider.oldOfferOperations.get()) { if (provider.newOfferOperations.isNone() || !provider.newOfferOperations->contains(uuid)) { - // TODO(bbannier): Instead of simply dropping an operation with - // `removeOfferOperation` here we should instead send a `Reconcile` - // message with a failed state to the agent so its status update - // manager can reliably deliver the operation status to the - // framework. - LOG(WARNING) << "Dropping known offer operation " << uuid.toString() - << " since it was not present in reconciliation " - "message from agent"; - - CHECK(slave->offerOperations.contains(uuid)); - removeOfferOperation(slave->offerOperations.at(uuid)); + LOG(WARNING) << "Performing explicit reconciliation with agent for" + << " known offer operation " << uuid.toString() + << " since it was not present in original" + << " reconciliation message from agent"; + + ReconcileOfferOperationsMessage::Operation* reconcileOperation = + reconcile.add_operations(); + reconcileOperation->set_operation_uuid(uuid.toBytes()); + + if (providerId.isSome()) { + reconcileOperation->mutable_resource_provider_id() + ->CopyFrom(providerId.get()); + } } } } @@ -7679,6 +7687,10 @@ void Master::updateSlave(const UpdateSlaveMessage& message) } } + if (reconcile.operations_size() > 0) { + send(slave->pid, reconcile); + } + // Now update the agent's state and total resources in the allocator. allocator->updateSlave(slaveId, slave->info, slave->totalResources);
