Repository: mesos Updated Branches: refs/heads/master 366b27af2 -> 3711d66aa
Made the master send operation status updates when dropping operations. This patch makes the master send an operation status update to the framework with status `OPERATION_ERROR` when an operation with an operation ID is dropped. Review: https://reviews.apache.org/r/66679/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3711d66a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3711d66a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3711d66a Branch: refs/heads/master Commit: 3711d66aa9eb70e12b184d3c2f79bf56fbd9cffa Parents: 20a9732 Author: Gaston Kleiman <[email protected]> Authored: Mon Apr 23 14:22:26 2018 -0700 Committer: Greg Mann <[email protected]> Committed: Mon Apr 23 14:44:22 2018 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 96 ++++++++++++++++++---------------------------- 1 file changed, 37 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3711d66a/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 545a4d7..c723a29 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -2277,13 +2277,22 @@ void Master::drop( // TODO(jieyu): Increment a metric. - // NOTE: There is no direct feedback to the framework when an - // operation is dropped. The framework will find out that the - // operation was dropped through subsequent offers. - LOG(WARNING) << "Dropping " << Offer::Operation::Type_Name(operation.type()) << " operation from framework " << *framework << ": " << message; + + if (operation.has_id()) { + scheduler::Event update; + update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS); + + *update.mutable_update_operation_status()->mutable_status() = + protobuf::createOperationStatus( + OperationState::OPERATION_ERROR, + operation.id(), + message); + + framework->send(update); + } } @@ -3983,20 +3992,9 @@ void Master::accept( foreach (const Offer::Operation& operation, accept.operations()) { if (operation.type() != Offer::Operation::LAUNCH && operation.type() != Offer::Operation::LAUNCH_GROUP) { - if (operation.has_id()) { - scheduler::Event update; - update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS); - - *update.mutable_update_operation_status()->mutable_status() = - protobuf::createOperationStatus( - OperationState::OPERATION_DROPPED, - operation.id(), - "Operation attempted with invalid offers: " + - error->message); - - framework->send(update); - } - + drop(framework, + operation, + "Operation attempted with invalid offers: " + error->message); continue; } @@ -4044,22 +4042,22 @@ void Master::accept( // Validate and upgrade all of the resources in `accept.operations`: // - // For a RESERVE, UNRESERVE, CREATE, or DESTROY operation - // which contains invalid resources, + // For an operation except LAUNCH and LAUNCH_GROUP which contains invalid + // resources, // - if the framework has elected to receive feedback by setting the `id` // field, then we send an offer operation status update with a state of - // OFFER_OPERATION_ERROR. - // - if the framework has not set the `id` field, - // then we simply drop the operation. + // OPERATION_ERROR. + // - if the framework has not set the `id` field, then we simply drop the + // operation. // - // If a LAUNCH or LAUNCH_GROUP operation contains invalid - // resources, we send a TASK_ERROR status update per task. + // If a LAUNCH or LAUNCH_GROUP operation contains invalid resources, we send + // a TASK_ERROR status update per task. // // // If the framework is requesting offer operation status updates by setting // the `id` field in an operation, then also verify that the relevant agent // has the RESOURCE_PROVIDER capability. If it does not, then send an offer - // operation status update with a state of OFFER_OPERATION_ERROR. + // operation status update with a state of OPERATION_ERROR. // // LAUNCH and LAUNCH_GROUP operations cannot receive offer operation status, // updates, so we send a TASK_ERROR status update per task when these @@ -4112,21 +4110,10 @@ void Master::accept( case Offer::Operation::DESTROY_VOLUME: case Offer::Operation::CREATE_BLOCK: case Offer::Operation::DESTROY_BLOCK: { - if (operation.has_id()) { - scheduler::Event update; - update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS); - - *update.mutable_update_operation_status()->mutable_status() = - protobuf::createOperationStatus( - OperationState::OPERATION_ERROR, - operation.id(), - "Operation attempted with invalid resources: " + - error->message); - - framework->send(update); - } - - drop(framework, operation, error->message); + drop(framework, + operation, + "Operation attempted with invalid resources: " + + error->message); break; } case Offer::Operation::LAUNCH: { @@ -4189,7 +4176,10 @@ void Master::accept( "The 'id' field was set in an offer operation, but operation" " feedback is not supported for the SchedulerDriver API"; - drop(framework, operation, message); + LOG(WARNING) << "Dropping " + << Offer::Operation::Type_Name(operation.type()) + << " operation from framework " << *framework << ": " + << message; // Send an error which will cause the scheduler driver to abort. FrameworkErrorMessage frameworkError; @@ -4202,23 +4192,11 @@ void Master::accept( } if (!slave->capabilities.resourceProvider) { - const string message = - "Operation requested feedback, but agent " + - stringify(slaveId.get()) + - " does not have the required RESOURCE_PROVIDER capability"; - - scheduler::Event update; - update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS); - - *update.mutable_update_operation_status()->mutable_status() = - protobuf::createOperationStatus( - OperationState::OPERATION_ERROR, - operation.id(), - message); - - framework->send(update); - - drop(framework, operation, message); + drop(framework, + operation, + "Operation requested feedback, but agent " + + stringify(slaveId.get()) + + " does not have the required RESOURCE_PROVIDER capability"); break; }
