This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 087b2358496f08dd6a8b8d9852793977a7c574e5
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Tue Jan 29 21:28:48 2019 -0800

    Persisted intentionally dropped operations in SLRP.
    
    If an operation is dropped intentionally (e.g., because of a resource
    version mismatch), the operation should be persisted so that no
    conflicting status update is generated during operation reconciliation.
    
    Review: https://reviews.apache.org/r/69858
---
 src/resource_provider/storage/provider.cpp         | 57 ++++++++++++++++------
 src/resource_provider/storage/provider_process.hpp |  5 +-
 2 files changed, 44 insertions(+), 18 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 09a710d..45aea6e 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -1757,6 +1757,7 @@ void StorageLocalResourceProviderProcess::reconcileOperations(
       continue;
     }
 
+    // TODO(chhsiao): Consider sending `OPERATION_UNKNOWN` instead.
     dropOperation(
         uuid.get(),
         None(),
@@ -2883,32 +2884,58 @@ void StorageLocalResourceProviderProcess::dropOperation(
   LOG(WARNING)
     << "Dropping operation (uuid: " << operationUuid << "): " << message;
 
+  CHECK(!operations.contains(operationUuid));
+
   UpdateOperationStatusMessage update =
     protobuf::createUpdateOperationStatusMessage(
         protobuf::createUUID(operationUuid),
         protobuf::createOperationStatus(
             OPERATION_DROPPED,
-            operation.isSome() && operation->has_id()
-              ? operation->id() : Option<OperationID>::none(),
+            None(),
             message,
             None(),
-            id::UUID::random(),
+            None(),
             slaveId,
             info.id()),
         None(),
         frameworkId,
         slaveId);
 
-  auto die = [=](const string& message) {
-    LOG(ERROR)
-      << "Failed to update status of operation (uuid: " << operationUuid
-      << "): " << message;
-    fatal();
-  };
+  if (operation.isSome()) {
+    // This operation is dropped intentionally. We have to persist the operation
+    // in the resource provider state and retry the status update.
+    *update.mutable_status()->mutable_uuid() = protobuf::createUUID();
+    if (operation->has_id()) {
+      *update.mutable_status()->mutable_operation_id() = operation->id();
+    }
 
-  statusUpdateManager.update(std::move(update), false)
-    .onFailed(defer(self(), std::bind(die, lambda::_1)))
-    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+    operations[operationUuid] = protobuf::createOperation(
+        operation.get(),
+        update.status(),
+        frameworkId,
+        slaveId,
+        update.operation_uuid());
+
+    checkpointResourceProviderState();
+
+    auto die = [=](const string& message) {
+      LOG(ERROR)
+        << "Failed to update status of operation (uuid: " << operationUuid
+        << "): " << message;
+      fatal();
+    };
+
+    statusUpdateManager.update(std::move(update))
+      .onFailed(defer(self(), std::bind(die, lambda::_1)))
+      .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+  } else {
+    // This operation is unknown to the resource provider because of a
+    // disconnection, and is being asked for reconciliation. In this case, we
+    // send a status update without a retry. If it is dropped because of another
+    // disconnection, another reconciliation will be triggered by the master
+    // after a reregistration.
+    sendOperationStatusUpdate(std::move(update));
+  }
 
   ++metrics.operations_dropped.at(
       operation.isSome() ? operation->type() : Offer::Operation::UNKNOWN);
@@ -3342,9 +3369,9 @@ void StorageLocalResourceProviderProcess::sendOperationStatusUpdate(
     update->mutable_framework_id()->CopyFrom(_update.framework_id());
   }
 
-  // The latest status should have been set by the status update manager.
-  CHECK(_update.has_latest_status());
-  update->mutable_latest_status()->CopyFrom(_update.latest_status());
+  if (_update.has_latest_status()) {
+    update->mutable_latest_status()->CopyFrom(_update.latest_status());
+  }
 
   auto err = [](const id::UUID& uuid, const string& message) {
     LOG(ERROR)
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index 36187fb..adc4651 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -295,7 +295,8 @@ private:
   // applied. Do nothing if the operation is already in a terminal state.
   process::Future<Nothing> _applyOperation(const id::UUID& operationUuid);
 
-  // Sends `OPERATION_DROPPED` without checkpointing the operation status.
+  // Sends `OPERATION_DROPPED` status update. The operation status will be
+  // checkpointed if `operation` is set.
   void dropOperation(
       const id::UUID& operationUuid,
       const Option<FrameworkID>& frameworkId,
@@ -323,8 +324,6 @@ private:
 
   void sendResourceProviderStateUpdate();
 
-  // NOTE: This is a callback for the status update manager and should
-  // not be called directly.
   void sendOperationStatusUpdate(
       const UpdateOperationStatusMessage& update);
 

Reply via email to