This is an automated email from the ASF dual-hosted git repository. josephwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit df9be400cefe99f73fb5f2af9c51127caede845c Author: Joseph Wu <[email protected]> AuthorDate: Tue Feb 12 14:58:33 2019 -0800 Handled terminal operation status updates for orphans. When an orphaned operation is transitioned from non-terminal to terminal, the operation's resources must be added back to the agent's total resources, while accounting for resource conversion if the operation is successful. There is an odd case where an orphan is transitioned to terminal via an UpdateSlaveMessage (instead of UpdateOperationStatusMessage). When this happens, the required resource math is actually done by the agent. Review: https://reviews.apache.org/r/69963 --- src/master/master.cpp | 173 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 120 insertions(+), 53 deletions(-) diff --git a/src/master/master.cpp b/src/master/master.cpp index 54f96f9..de1ba56 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -11598,77 +11598,144 @@ void Master::updateOperation( Slave* slave = slaves.registered.get(operation->slave_id()); CHECK_NOTNULL(slave); - switch (operation->latest_status().state()) { - // Terminal state, and the conversion is successful. - case OPERATION_FINISHED: { - const Resources converted = - operation->latest_status().converted_resources(); - - if (convertResources) { - allocator->updateAllocation( - operation->framework_id(), - operation->slave_id(), - consumed.get(), - {ResourceConversion(consumed.get(), converted)}); + // Orphaned operations are handled differently, because the allocator + // has no knowledge of resources consumed by these operations; + // and any resource consumption is accounted for in the agent's total + // resources. + if (slave->orphanedOperations.contains(operation->uuid())) { + bool updated = false; + + switch (operation->latest_status().state()) { + // Terminal state, and the conversion is successful. + case OPERATION_FINISHED: { + const Resources converted = + operation->latest_status().converted_resources(); + + if (convertResources) { + Resources convertedUnallocated = converted; + convertedUnallocated.unallocate(); + + slave->totalResources += convertedUnallocated; + updated = true; + } else { + // NOTE: This is only reachable when an existing orphan operation + // is transitioned to a terminal status via an UpdateSlaveMessage. + // When this happens, the `slave->totalResources` already contains + // the converted resources. + // The handling for normal operations must recover the consumed + // resources from the allocator. We cannot mirror this resource + // recovery (i.e. `slave->totalResources += consumed.get()`) because + // the resource has already been converted and no longer exists. + } - allocator->recoverResources( - operation->framework_id(), - operation->slave_id(), - converted, - None()); + break; + } + // Terminal state, and the conversion has failed. + case OPERATION_DROPPED: + case OPERATION_ERROR: + case OPERATION_FAILED: + case OPERATION_GONE_BY_OPERATOR: { Resources consumedUnallocated = consumed.get(); consumedUnallocated.unallocate(); - Resources convertedUnallocated = converted; - convertedUnallocated.unallocate(); + slave->totalResources += consumedUnallocated; + updated = true; + break; + } + + // Non-terminal or not expected from an agent. This shouldn't happen. + case OPERATION_UNSUPPORTED: + case OPERATION_PENDING: + case OPERATION_UNREACHABLE: + case OPERATION_RECOVERING: + case OPERATION_UNKNOWN: { + LOG(FATAL) << "Unexpected operation state " + << operation->latest_status().state(); - slave->apply( - {ResourceConversion(consumedUnallocated, convertedUnallocated)}); - } else { + break; + } + } + + // If we've added resources to the agent's total, the allocator + // must be informed about the new totals. + if (updated) { + allocator->updateSlave(slave->id, slave->info, slave->totalResources); + } + + } else { + switch (operation->latest_status().state()) { + // Terminal state, and the conversion is successful. + case OPERATION_FINISHED: { + const Resources converted = + operation->latest_status().converted_resources(); + + if (convertResources) { + allocator->updateAllocation( + operation->framework_id(), + operation->slave_id(), + consumed.get(), + {ResourceConversion(consumed.get(), converted)}); + + allocator->recoverResources( + operation->framework_id(), + operation->slave_id(), + converted, + None()); + + Resources consumedUnallocated = consumed.get(); + consumedUnallocated.unallocate(); + + Resources convertedUnallocated = converted; + convertedUnallocated.unallocate(); + + slave->apply( + {ResourceConversion(consumedUnallocated, convertedUnallocated)}); + } else { + allocator->recoverResources( + operation->framework_id(), + operation->slave_id(), + consumed.get(), + None()); + } + + break; + } + + // Terminal state, and the conversion has failed. + case OPERATION_DROPPED: + case OPERATION_ERROR: + case OPERATION_FAILED: + case OPERATION_GONE_BY_OPERATOR: { allocator->recoverResources( operation->framework_id(), operation->slave_id(), consumed.get(), None()); - } - - break; - } - // Terminal state, and the conversion has failed. - case OPERATION_DROPPED: - case OPERATION_ERROR: - case OPERATION_FAILED: - case OPERATION_GONE_BY_OPERATOR: { - allocator->recoverResources( - operation->framework_id(), - operation->slave_id(), - consumed.get(), - None()); - - break; - } + break; + } - // Non-terminal or not expected from an agent. This shouldn't happen. - case OPERATION_UNSUPPORTED: - case OPERATION_PENDING: - case OPERATION_UNREACHABLE: - case OPERATION_RECOVERING: - case OPERATION_UNKNOWN: { - LOG(FATAL) << "Unexpected operation state " - << operation->latest_status().state(); + // Non-terminal or not expected from an agent. This shouldn't happen. + case OPERATION_UNSUPPORTED: + case OPERATION_PENDING: + case OPERATION_UNREACHABLE: + case OPERATION_RECOVERING: + case OPERATION_UNKNOWN: { + LOG(FATAL) << "Unexpected operation state " + << operation->latest_status().state(); - break; + break; + } } - } - slave->recoverResources(operation); + slave->recoverResources(operation); - Framework* framework = getFramework(operation->framework_id()); + Framework* framework = getFramework(operation->framework_id()); - if (framework != nullptr) { - framework->recoverResources(operation); + if (framework != nullptr) { + framework->recoverResources(operation); + } } }
