This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 087b2358496f08dd6a8b8d9852793977a7c574e5 Author: Chun-Hung Hsiao <[email protected]> AuthorDate: Tue Jan 29 21:28:48 2019 -0800 Persisted intentionally dropped operations in SLRP. If an operation is dropped intentionally (e.g., because of a resource version mismatch), the operation should be persisted so no conflicting status update would be generated for operation reconciliation. Review: https://reviews.apache.org/r/69858 --- src/resource_provider/storage/provider.cpp | 57 ++++++++++++++++------ src/resource_provider/storage/provider_process.hpp | 5 +- 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp index 09a710d..45aea6e 100644 --- a/src/resource_provider/storage/provider.cpp +++ b/src/resource_provider/storage/provider.cpp @@ -1757,6 +1757,7 @@ void StorageLocalResourceProviderProcess::reconcileOperations( continue; } + // TODO(chhsiao): Consider sending `OPERATION_UNKNOWN` instead. dropOperation( uuid.get(), None(), @@ -2883,32 +2884,58 @@ void StorageLocalResourceProviderProcess::dropOperation( LOG(WARNING) << "Dropping operation (uuid: " << operationUuid << "): " << message; + CHECK(!operations.contains(operationUuid)); + UpdateOperationStatusMessage update = protobuf::createUpdateOperationStatusMessage( protobuf::createUUID(operationUuid), protobuf::createOperationStatus( OPERATION_DROPPED, - operation.isSome() && operation->has_id() - ? operation->id() : Option<OperationID>::none(), + None(), message, None(), - id::UUID::random(), + None(), slaveId, info.id()), None(), frameworkId, slaveId); - auto die = [=](const string& message) { - LOG(ERROR) - << "Failed to update status of operation (uuid: " << operationUuid - << "): " << message; - fatal(); - }; + if (operation.isSome()) { + // This operation is dropped intentionally. We have to persist the operation + // in the resource provider state and retry the status update. + *update.mutable_status()->mutable_uuid() = protobuf::createUUID(); + if (operation->has_id()) { + *update.mutable_status()->mutable_operation_id() = operation->id(); + } - statusUpdateManager.update(std::move(update), false) - .onFailed(defer(self(), std::bind(die, lambda::_1))) - .onDiscarded(defer(self(), std::bind(die, "future discarded"))); + operations[operationUuid] = protobuf::createOperation( + operation.get(), + update.status(), + frameworkId, + slaveId, + update.operation_uuid()); + + checkpointResourceProviderState(); + + auto die = [=](const string& message) { + LOG(ERROR) + << "Failed to update status of operation (uuid: " << operationUuid + << "): " << message; + fatal(); + }; + + statusUpdateManager.update(std::move(update)) + .onFailed(defer(self(), std::bind(die, lambda::_1))) + .onDiscarded(defer(self(), std::bind(die, "future discarded"))); + } else { + // This operation is unknown to the resource provider because of a + // disconnection, and is being asked for reconciliation. In this case, we + // send a status update without a retry. If it is dropped because of another + // disconnection, another reconciliation will be triggered by the master + // after a reregistration. + sendOperationStatusUpdate(std::move(update)); + } ++metrics.operations_dropped.at( operation.isSome() ? operation->type() : Offer::Operation::UNKNOWN); @@ -3342,9 +3369,9 @@ void StorageLocalResourceProviderProcess::sendOperationStatusUpdate( update->mutable_framework_id()->CopyFrom(_update.framework_id()); } - // The latest status should have been set by the status update manager. - CHECK(_update.has_latest_status()); - update->mutable_latest_status()->CopyFrom(_update.latest_status()); + if (_update.has_latest_status()) { + update->mutable_latest_status()->CopyFrom(_update.latest_status()); + } auto err = [](const id::UUID& uuid, const string& message) { LOG(ERROR) diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp index 36187fb..adc4651 100644 --- a/src/resource_provider/storage/provider_process.hpp +++ b/src/resource_provider/storage/provider_process.hpp @@ -295,7 +295,8 @@ private: // applied. Do nothing if the operation is already in a terminal state. process::Future<Nothing> _applyOperation(const id::UUID& operationUuid); - // Sends `OPERATION_DROPPED` without checkpointing the operation status. + // Sends `OPERATION_DROPPED` status update. The operation status will be + // checkpointed if `operation` is set. void dropOperation( const id::UUID& operationUuid, const Option<FrameworkID>& frameworkId, @@ -323,8 +324,6 @@ private: void sendResourceProviderStateUpdate(); - // NOTE: This is a callback for the status update manager and should - // not be called directly. void sendOperationStatusUpdate( const UpdateOperationStatusMessage& update);
