This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit fd9d99cea0915b6006a396a5926322ca1352ccd4 Author: Joseph Wu <[email protected]> AuthorDate: Tue Feb 12 16:08:32 2019 -0800 Added a recovery path for orphan operations. An orphan can be recovered if the originating framework reregisters with the master. When this happens, the resource accounting is reversed and resources are added back to the agent's total and the allocator. Review: https://reviews.apache.org/r/69967 --- src/master/master.cpp | 57 +++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/src/master/master.cpp b/src/master/master.cpp index de1ba56..1e04d82 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -10283,19 +10283,58 @@ void Master::recoverFramework( } } - foreachvalue (Operation* operation, slave->operations) { + // Combine all the operations of the agent into one list + // so they can be processed the same way. + vector<Operation*> allOperations = slave->operations.values(); + foreachvalue (const Slave::ResourceProvider& resourceProvider, + slave->resourceProviders) { + foreachvalue (Operation* operation, resourceProvider.operations) { + allOperations.push_back(operation); + } + } + + foreach (Operation* operation, allOperations) { if (operation->has_framework_id() && operation->framework_id() == framework->id()) { framework->addOperation(operation); - } - } - foreachvalue (const Slave::ResourceProvider& resourceProvider, - slave->resourceProviders) { - foreachvalue (Operation* operation, resourceProvider.operations) { - if (operation->has_framework_id() && - operation->framework_id() == framework->id()) { - framework->addOperation(operation); + // If this is an orphaned operation, the orphan's resources + // must be added back to the agent's total, and the allocator + // will need to be updated with the new total and allocation. 
+ if (slave->orphanedOperations.contains(operation->uuid())) { + LOG(INFO) + << "Recovered orphan operation " << operation->uuid() + << (operation->info().has_id() + ? " (ID: " + operation->info().id().value() + ")" + : "") + << " on agent " << operation->slave_id() + << " belonging to framework " << operation->framework_id() + << " in state " << operation->latest_status().state(); + + slave->orphanedOperations.erase(operation->uuid()); + + // A terminal orphan operation is one whose resources are no longer + // allocated, but the terminal status has yet to be acknowledged. + // The operation will be removed once this framework acknowledges it. + if (protobuf::isTerminalState(operation->latest_status().state())) { + continue; + } + + Try<Resources> consumed = + protobuf::getConsumedResources(operation->info()); + + CHECK_SOME(consumed); + + Resources consumedUnallocated = consumed.get(); + consumedUnallocated.unallocate(); + + slave->totalResources += consumedUnallocated; + slave->usedResources[framework->id()] += consumed.get(); + + allocator->updateSlave(slave->id, slave->info, slave->totalResources); + + // NOTE: The allocation of these orphan operation resources will be + // updated in `addFramework` below. } } }
