Repository: mesos
Updated Branches:
  refs/heads/master 366b27af2 -> 3711d66aa


Made the master send operation status updates when dropping operations.

This patch makes the master send an operation status update to the
framework with status `OPERATION_ERROR` when an operation with an
operation ID is dropped.

Review: https://reviews.apache.org/r/66679/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3711d66a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3711d66a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3711d66a

Branch: refs/heads/master
Commit: 3711d66aa9eb70e12b184d3c2f79bf56fbd9cffa
Parents: 20a9732
Author: Gaston Kleiman <[email protected]>
Authored: Mon Apr 23 14:22:26 2018 -0700
Committer: Greg Mann <[email protected]>
Committed: Mon Apr 23 14:44:22 2018 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 96 ++++++++++++++++++----------------------------
 1 file changed, 37 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3711d66a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 545a4d7..c723a29 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2277,13 +2277,22 @@ void Master::drop(
 
   // TODO(jieyu): Increment a metric.
 
-  // NOTE: There is no direct feedback to the framework when an
-  // operation is dropped. The framework will find out that the
-  // operation was dropped through subsequent offers.
-
   LOG(WARNING) << "Dropping " << Offer::Operation::Type_Name(operation.type())
                << " operation from framework " << *framework
                << ": " << message;
+
+  if (operation.has_id()) {
+    scheduler::Event update;
+    update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS);
+
+    *update.mutable_update_operation_status()->mutable_status() =
+      protobuf::createOperationStatus(
+          OperationState::OPERATION_ERROR,
+          operation.id(),
+          message);
+
+    framework->send(update);
+  }
 }
 
 
@@ -3983,20 +3992,9 @@ void Master::accept(
     foreach (const Offer::Operation& operation, accept.operations()) {
       if (operation.type() != Offer::Operation::LAUNCH &&
           operation.type() != Offer::Operation::LAUNCH_GROUP) {
-        if (operation.has_id()) {
-          scheduler::Event update;
-          update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS);
-
-          *update.mutable_update_operation_status()->mutable_status() =
-            protobuf::createOperationStatus(
-                OperationState::OPERATION_DROPPED,
-                operation.id(),
-                "Operation attempted with invalid offers: " +
-                  error->message);
-
-          framework->send(update);
-        }
-
+        drop(framework,
+             operation,
+             "Operation attempted with invalid offers: " + error->message);
         continue;
       }
 
@@ -4044,22 +4042,22 @@ void Master::accept(
 
   // Validate and upgrade all of the resources in `accept.operations`:
   //
-  // For a RESERVE, UNRESERVE, CREATE, or DESTROY operation
-  // which contains invalid resources,
+  // For an operation except LAUNCH and LAUNCH_GROUP which contains invalid
+  // resources,
   //   - if the framework has elected to receive feedback by setting the `id`
   //     field, then we send an offer operation status update with a state of
-  //     OFFER_OPERATION_ERROR.
-  //   - if the framework has not set the `id` field,
-  //     then we simply drop the operation.
+  //     OPERATION_ERROR.
+  //   - if the framework has not set the `id` field, then we simply drop the
+  //     operation.
   //
-  // If a LAUNCH or LAUNCH_GROUP operation contains invalid
-  // resources, we send a TASK_ERROR status update per task.
+  // If a LAUNCH or LAUNCH_GROUP operation contains invalid resources, we send
+  // a TASK_ERROR status update per task.
   //
   //
   // If the framework is requesting offer operation status updates by setting
   // the `id` field in an operation, then also verify that the relevant agent
   // has the RESOURCE_PROVIDER capability. If it does not, then send an offer
-  // operation status update with a state of OFFER_OPERATION_ERROR.
+  // operation status update with a state of OPERATION_ERROR.
   //
   // LAUNCH and LAUNCH_GROUP operations cannot receive offer operation status,
   // updates, so we send a TASK_ERROR status update per task when these
@@ -4112,21 +4110,10 @@ void Master::accept(
           case Offer::Operation::DESTROY_VOLUME:
           case Offer::Operation::CREATE_BLOCK:
           case Offer::Operation::DESTROY_BLOCK: {
-            if (operation.has_id()) {
-              scheduler::Event update;
-              update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS);
-
-              *update.mutable_update_operation_status()->mutable_status() =
-                protobuf::createOperationStatus(
-                    OperationState::OPERATION_ERROR,
-                    operation.id(),
-                    "Operation attempted with invalid resources: " +
-                      error->message);
-
-              framework->send(update);
-            }
-
-            drop(framework, operation, error->message);
+            drop(framework,
+                 operation,
+                 "Operation attempted with invalid resources: " +
+                 error->message);
             break;
           }
           case Offer::Operation::LAUNCH: {
@@ -4189,7 +4176,10 @@ void Master::accept(
                 "The 'id' field was set in an offer operation, but operation"
                 " feedback is not supported for the SchedulerDriver API";
 
-              drop(framework, operation, message);
+              LOG(WARNING) << "Dropping "
+                           << Offer::Operation::Type_Name(operation.type())
+                           << " operation from framework " << *framework << ": 
"
+                           << message;
 
               // Send an error which will cause the scheduler driver to abort.
               FrameworkErrorMessage frameworkError;
@@ -4202,23 +4192,11 @@ void Master::accept(
             }
 
             if (!slave->capabilities.resourceProvider) {
-              const string message =
-                "Operation requested feedback, but agent " +
-                stringify(slaveId.get()) +
-                " does not have the required RESOURCE_PROVIDER capability";
-
-              scheduler::Event update;
-              update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS);
-
-              *update.mutable_update_operation_status()->mutable_status() =
-                protobuf::createOperationStatus(
-                    OperationState::OPERATION_ERROR,
-                    operation.id(),
-                    message);
-
-              framework->send(update);
-
-              drop(framework, operation, message);
+              drop(framework,
+                   operation,
+                   "Operation requested feedback, but agent " +
+                   stringify(slaveId.get()) +
+                   " does not have the required RESOURCE_PROVIDER capability");
               break;
             }
 

Reply via email to