Repository: mesos
Updated Branches:
  refs/heads/master c3c070094 -> cac8fdf01


Checkpointing `OfferOperation` in resource provider states.

Instead of checkpointing `ApplyOfferOperation`, we now checkpoint
`OfferOperations` in resource provider states such that we can keep
track of completed operations as well.

This patch also does some code cleanup, and modifies a unit test for
storage local resource provider to issue operations in batches.

Review: https://reviews.apache.org/r/64559/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eeb09cbc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eeb09cbc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eeb09cbc

Branch: refs/heads/master
Commit: eeb09cbc18827068fe56dc826e26d2e98fbcb494
Parents: c3c0700
Author: Chun-Hung Hsiao <[email protected]>
Authored: Wed Dec 13 18:57:16 2017 -0800
Committer: Jie Yu <[email protected]>
Committed: Wed Dec 13 18:57:16 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/state.proto               |  20 +-
 src/resource_provider/storage/provider.cpp      | 361 ++++++++-----------
 .../storage_local_resource_provider_tests.cpp   |  52 +--
 3 files changed, 172 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/eeb09cbc/src/resource_provider/state.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/state.proto 
b/src/resource_provider/state.proto
index 321201e..ea759d8 100644
--- a/src/resource_provider/state.proto
+++ b/src/resource_provider/state.proto
@@ -20,9 +20,9 @@ import "mesos/mesos.proto";
 
 import "mesos/resource_provider/resource_provider.proto";
 
-package mesos.resource_provider.state;
+package mesos.resource_provider;
 
-option java_package = "org.apache.mesos.resource_provider.state";
+option java_package = "org.apache.mesos.resource_provider";
 option java_outer_classname = "Protos";
 
 
@@ -30,22 +30,8 @@ message ResourceProviderState {
   // This includes only pending operations. Operations that have
   // unacknowledged statuses should be recovered through the status
   // update manager.
-  repeated Event.ApplyOfferOperation operations = 1;
+  repeated OfferOperation operations = 1;
 
   // The total resources provided by this resource provider.
   repeated Resource resources = 2;
-
-  // Used to establish the relationship between the operation and
-  // the resources that the operation is operating on. Each resource
-  // provider will keep a resource version UUID, and change it when
-  // it believes that the resources from this resource provider are
-  // out of sync from the master's view. The master will keep track
-  // of the last known resource version UUID for each resource
-  // provider, and attach the resource version UUID in each
-  // operation it sends out. The resource provider should reject
-  // operations that have a different resource version UUID than
-  // that it maintains, because this means the operation is
-  // operating on resources that might have already been
-  // invalidated.
-  required bytes resource_version_uuid = 3;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/eeb09cbc/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp 
b/src/resource_provider/storage/provider.cpp
index 2fd4a3b..80bcea0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -105,8 +105,7 @@ using mesos::internal::slave::ContainerDaemon;
 
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
-
-using mesos::resource_provider::state::ResourceProviderState;
+using mesos::resource_provider::ResourceProviderState;
 
 using mesos::v1::resource_provider::Driver;
 
@@ -148,7 +147,8 @@ static bool isValidType(const string& s)
 
 
 // Timeout for a CSI plugin component to create its endpoint socket.
-static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Seconds(5);
+// TODO(chhsiao): Make the timeout configurable.
+static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Minutes(1);
 
 
 // Returns a prefix for naming standalone containers to run CSI plugins
@@ -292,7 +292,8 @@ public:
       contentType(ContentType::PROTOBUF),
       info(_info),
       slaveId(_slaveId),
-      authToken(_authToken) {}
+      authToken(_authToken),
+      resourceVersion(UUID::random()) {}
 
   StorageLocalResourceProviderProcess(
       const StorageLocalResourceProviderProcess& other) = delete;
@@ -331,7 +332,7 @@ private:
   Future<Nothing> recoverVolumes();
   Future<Nothing> recoverStatusUpdates();
   void doReliableRegistration();
-  Future<Nothing> reconcile();
+  Future<Nothing> reconcileResourceProviderState();
 
   // Functions for received events.
   void subscribed(const Event::Subscribed& subscribed);
@@ -362,7 +363,8 @@ private:
   Future<Nothing> deleteVolume(const string& volumeId);
 
   // Applies the offer operation. Conventional operations will be
-  // synchronously applied.
+  // synchronously applied. Do nothing if the operation is already in a
+  // terminal state.
   Future<Nothing> _applyOfferOperation(const UUID& operationUuid);
 
   Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
@@ -373,7 +375,7 @@ private:
       const Resource& resource);
 
   // Synchronously update `totalResources` and the offer operation status.
-  Try<Nothing> applyResourceConversions(
+  Try<Nothing> updateOfferOperationStatus(
       const UUID& operationUuid,
       const Try<vector<ResourceConversion>>& conversions);
 
@@ -421,12 +423,15 @@ private:
   Option<csi::ControllerCapabilities> controllerCapabilities;
   Option<string> nodeId;
 
-  // NOTE: We store the list of pending operations in a `LinkedHashMap`
-  // to preserve the order we receive the operations. This is useful
-  // when we replay depending operations during recovery.
-  LinkedHashMap<UUID, Event::ApplyOfferOperation> pendingOperations;
+  // We maintain the following invariant: if one operation depends on
+  // another, they cannot be in PENDING state at the same time, i.e.,
+  // the result of the preceding operation must have been reflected in
+  // the total resources.
+  // NOTE: We store the list of offer operations in a `LinkedHashMap` to
+  // preserve the order we receive the operations in case we need it.
+  LinkedHashMap<UUID, OfferOperation> offerOperations;
   Resources totalResources;
-  Option<UUID> resourceVersion;
+  UUID resourceVersion;
   hashmap<string, VolumeData> volumes;
 };
 
@@ -569,10 +574,9 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::recover()
     .then(defer(self(), &Self::recoverVolumes))
     .then(defer(self(), [=]() -> Future<Nothing> {
       // Recover the resource provider ID and state from the latest
-      // symlink. If the symlink cannot be resolved, this is a new
-      // resource provider, and `totalResources` and `resourceVersion`
-      // will be empty, which is fine since they will be set up during
-      // reconciliation.
+      // symlink. If the symlink does not exist, this is a new resource
+      // provider, and the total resources will be empty, which is fine
+      // since new resources will be added during reconciliation.
       Result<string> realpath = os::realpath(
           slave::paths::getLatestResourceProviderPath(
               metaDir, slaveId, info.type(), info.name()));
@@ -600,39 +604,37 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::recover()
         }
 
         if (resourceProviderState.isSome()) {
-          foreach (const Event::ApplyOfferOperation& operation,
+          foreach (const OfferOperation& operation,
                    resourceProviderState->operations()) {
             Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
             CHECK_SOME(uuid);
 
-            pendingOperations[uuid.get()] = operation;
+            offerOperations[uuid.get()] = operation;
           }
 
           totalResources = resourceProviderState->resources();
-
-          Try<UUID> uuid =
-            UUID::fromBytes(resourceProviderState->resource_version_uuid());
-          CHECK_SOME(uuid);
-
-          resourceVersion = uuid.get();
         }
       }
 
       // We replay all pending operations here, so that if a volume is
-      // actually created before the last failover, it will be reflected
-      // in the updated total resources before we do the reconciliation.
-      // NOTE: `_applyOfferOperation` will remove the applied operation
-      // from the list of pending operations, so we make a copy of keys
-      // here.
-      foreach (const UUID& uuid, pendingOperations.keys()) {
+      // actually created or deleted before the last failover, it will
+      // be reflected in the total resources before reconciliation.
+      foreachpair (const UUID& uuid,
+                   const OfferOperation& operation,
+                   offerOperations) {
+        if (protobuf::isTerminalState(operation.latest_status().state())) {
+          continue;
+        }
+
+        auto err = [](const UUID& uuid, const string& message) {
+          LOG(ERROR)
+            << "Falied to apply offer operation with UUID " << uuid << ": "
+            << message;
+        };
+
         _applyOfferOperation(uuid)
-          .onAny(defer(self(), [=](const Future<Nothing>& future) {
-            if (!future.isReady()) {
-              LOG(ERROR)
-                << "Failed to apply offer operation with UUID " << uuid << ": "
-                << (future.isFailed() ? future.failure() : "future discarded");
-            }
-          }));
+          .onFailed(std::bind(err, uuid, lambda::_1))
+          .onDiscarded(std::bind(err, uuid, "future discarded"));
       }
 
       state = DISCONNECTED;
@@ -901,7 +903,8 @@ void 
StorageLocalResourceProviderProcess::doReliableRegistration()
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
+Future<Nothing>
+StorageLocalResourceProviderProcess::reconcileResourceProviderState()
 {
   return recoverStatusUpdates()
     .then(defer(self(), &Self::importResources))
@@ -970,9 +973,8 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::reconcile()
         LOG(INFO) << "Adding new resource '" << resource << "'";
       }
 
-      if (resourceVersion.isNone() || result != totalResources) {
+      if (result != totalResources) {
         totalResources = result;
-        resourceVersion = UUID::random();
         checkpointResourceProviderState();
       }
 
@@ -1007,11 +1009,11 @@ void StorageLocalResourceProviderProcess::subscribed(
   }
 
   const string message =
-    "Failed to update state for resource provider " + stringify(info.id());
+    "Failed to reconcile resource provider " + stringify(info.id());
 
   // Reconcile resources after obtaining the resource provider ID.
   // TODO(chhsiao): Do the reconciliation early.
-  reconcile()
+  reconcileResourceProviderState()
     .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
     .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
 }
@@ -1020,32 +1022,55 @@ void StorageLocalResourceProviderProcess::subscribed(
 void StorageLocalResourceProviderProcess::applyOfferOperation(
     const Event::ApplyOfferOperation& operation)
 {
-  Future<Resources> converted;
-
-  if (state == SUBSCRIBED) {
-    // TODO(chhsiao): Reject this operation.
-    return;
-  }
-
-  CHECK_EQ(READY, state);
-
-  LOG(INFO) << "Received " << operation.info().type() << " operation";
+  // NOTE: If we receive an offer operation in SUBSCRIBED state, there
+  // must be a resource version mismatch since the current resource
+  // version is not reported yet.
+  CHECK(state == SUBSCRIBED || state == READY);
 
   Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
   CHECK_SOME(uuid);
 
-  CHECK(!pendingOperations.contains(uuid.get()));
-  pendingOperations[uuid.get()] = operation;
+  LOG(INFO)
+    << "Received " << operation.info().type() << " operation with UUID "
+    << uuid.get();
+
+  CHECK(!offerOperations.contains(uuid.get()));
+  offerOperations[uuid.get()] = protobuf::createOfferOperation(
+      operation.info(),
+      protobuf::createOfferOperationStatus(
+          OFFER_OPERATION_PENDING,
+          operation.info().has_id()
+            ? operation.info().id() : Option<OfferOperationID>::none()),
+      operation.has_framework_id()
+        ? operation.framework_id() : Option<FrameworkID>::none(),
+      slaveId,
+      uuid.get());
+
   checkpointResourceProviderState();
 
-  _applyOfferOperation(uuid.get())
-    .onAny(defer(self(), [=](const Future<Nothing>& future) {
-      if (!future.isReady()) {
-        LOG(ERROR)
-          << "Failed to apply " << operation.info().type() << " operation: "
-          << (future.isFailed() ? future.failure() : "future discarded");
-      }
-    }));
+  Future<Nothing> result;
+
+  Try<UUID> operationVersion =
+    UUID::fromBytes(operation.resource_version_uuid());
+  CHECK_SOME(operationVersion);
+
+  if (operationVersion.get() != resourceVersion) {
+    result = updateOfferOperationStatus(uuid.get(), Error(
+        "Mismatched resource version " + stringify(operationVersion.get()) +
+        " (expected: " + stringify(resourceVersion) + ")"));
+  } else {
+    result = _applyOfferOperation(uuid.get());
+  }
+
+  auto err = [](const UUID& uuid, const string& message) {
+    LOG(ERROR)
+      << "Failed to apply offer operation with UUID " << uuid << ": "
+      << message;
+  };
+
+  result
+    .onFailed(std::bind(err, uuid.get(), lambda::_1))
+    .onDiscarded(std::bind(err, uuid.get(), "future discarded"));
 }
 
 
@@ -2074,81 +2099,59 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::deleteVolume(
 Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     const UUID& operationUuid)
 {
-  Future<vector<ResourceConversion>> conversions;
-  Option<Error> error;
+  CHECK(offerOperations.contains(operationUuid));
+  const OfferOperation& operation = offerOperations.at(operationUuid);
 
-  CHECK(pendingOperations.contains(operationUuid));
-  const Event::ApplyOfferOperation& operation =
-    pendingOperations.at(operationUuid);
-
-  Try<UUID> operationVersion =
-    UUID::fromBytes(operation.resource_version_uuid());
-  CHECK_SOME(operationVersion);
+  CHECK(!protobuf::isTerminalState(operation.latest_status().state()));
 
-  if (resourceVersion.get() != operationVersion.get()) {
-    error = Error(
-        "Mismatched resource version " + stringify(operationVersion.get()) +
-        " (expected: " + stringify(resourceVersion.get()) + ")");
-  }
+  Future<vector<ResourceConversion>> conversions;
 
   switch (operation.info().type()) {
     case Offer::Operation::RESERVE:
     case Offer::Operation::UNRESERVE:
     case Offer::Operation::CREATE:
     case Offer::Operation::DESTROY: {
-      // Synchronously apply the conventional operations.
-      return applyResourceConversions(
+      // Synchronously apply the conventional operations to ensure that
+      // its result is reflected in the total resources before any of
+      // its succeeding operations is applied.
+      return updateOfferOperationStatus(
           operationUuid,
-          error.isNone()
-            ? getResourceConversions(operation.info())
-            : Try<vector<ResourceConversion>>::error(error.get()));
+          getResourceConversions(operation.info()));
     }
     case Offer::Operation::CREATE_VOLUME: {
       CHECK(operation.info().has_create_volume());
 
-      if (error.isNone()) {
-        conversions = applyCreateVolumeOrBlock(
-            operation.info().create_volume().source(),
-            operationUuid,
-            operation.info().create_volume().target_type());
-      } else {
-        conversions = Failure(error.get());
-      }
+      conversions = applyCreateVolumeOrBlock(
+          operation.info().create_volume().source(),
+          operationUuid,
+          operation.info().create_volume().target_type());
+
       break;
     }
     case Offer::Operation::DESTROY_VOLUME: {
       CHECK(operation.info().has_destroy_volume());
 
-      if (error.isNone()) {
-        conversions = applyDestroyVolumeOrBlock(
-            operation.info().destroy_volume().volume());
-      } else {
-        conversions = Failure(error.get());
-      }
+      conversions = applyDestroyVolumeOrBlock(
+          operation.info().destroy_volume().volume());
+
       break;
     }
     case Offer::Operation::CREATE_BLOCK: {
       CHECK(operation.info().has_create_block());
 
-      if (error.isNone()) {
-        conversions = applyCreateVolumeOrBlock(
-            operation.info().create_block().source(),
-            operationUuid,
-            Resource::DiskInfo::Source::BLOCK);
-      } else {
-        conversions = Failure(error.get());
-      }
+      conversions = applyCreateVolumeOrBlock(
+          operation.info().create_block().source(),
+          operationUuid,
+          Resource::DiskInfo::Source::BLOCK);
+
       break;
     }
     case Offer::Operation::DESTROY_BLOCK: {
       CHECK(operation.info().has_destroy_block());
 
-      if (error.isNone()) {
-        conversions = applyDestroyVolumeOrBlock(
-            operation.info().destroy_block().block());
-      } else {
-        conversions = Failure(error.get());
-      }
+      conversions = applyDestroyVolumeOrBlock(
+          operation.info().destroy_block().block());
+
       break;
     }
     case Offer::Operation::UNKNOWN:
@@ -2163,27 +2166,23 @@ Future<Nothing> 
StorageLocalResourceProviderProcess::_applyOfferOperation(
 
   conversions
     .onAny(defer(self(), [=](const Future<vector<ResourceConversion>>& future) 
{
-      Option<Error> error;
-      if (!future.isReady()) {
-        error =
-          Error(future.isFailed() ? future.failure() : "future discarded");
-      }
+      Try<vector<ResourceConversion>> conversions = future.isReady()
+        ? Try<vector<ResourceConversion>>::some(future.get())
+        : Error(future.isFailed() ? future.failure() : "future discarded");
 
-      if (future.isReady()) {
+      if (conversions.isSome()) {
         LOG(INFO)
-          << "Applying conversion from '" << future->at(0).consumed << "' to '"
-          << future->at(0).converted << "'";
+          << "Applying conversion from '" << conversions->at(0).consumed
+          << "' to '" << conversions->at(0).converted
+          << "' for offer operation with UUID " << operationUuid;
       } else {
         LOG(ERROR)
-          << "Failed to apply " << operation.info().type() << " operation: "
-          << error->message;
+          << "Failed to apply offer operation with UUID " << operationUuid
+          << ": " << conversions.error();
       }
 
-      promise->associate(applyResourceConversions(
-          operationUuid,
-          error.isNone()
-            ? future.get()
-            : Try<vector<ResourceConversion>>::error(error.get())));
+      promise->associate(
+          updateOfferOperationStatus(operationUuid, conversions));
     }));
 
   return promise->future();
@@ -2341,21 +2340,22 @@ 
StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
 }
 
 
-Try<Nothing> StorageLocalResourceProviderProcess::applyResourceConversions(
+Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
     const UUID& operationUuid,
     const Try<vector<ResourceConversion>>& conversions)
 {
   Option<Error> error;
+  Resources convertedResources;
 
-  CHECK(pendingOperations.contains(operationUuid));
-  const Event::ApplyOfferOperation& operation =
-    pendingOperations.at(operationUuid);
+  CHECK(offerOperations.contains(operationUuid));
+  OfferOperation& operation = offerOperations.at(operationUuid);
 
   if (conversions.isSome()) {
     // Strip away the allocation info when applying the convertion to
     // the total resources.
     vector<ResourceConversion> _conversions;
     foreach (ResourceConversion conversion, conversions.get()) {
+      convertedResources += conversion.converted;
       conversion.consumed.unallocate();
       conversion.converted.unallocate();
       _conversions.push_back(std::move(conversion));
@@ -2371,27 +2371,16 @@ Try<Nothing> 
StorageLocalResourceProviderProcess::applyResourceConversions(
     error = conversions.error();
   }
 
-  // We first ask the status update manager to checkpoint the operation
-  // status, then checkpoint the resource provider state.
-  // TODO(chhsiao): Use the status update manager.
-  Call call;
-  call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
-  call.mutable_resource_provider_id()->CopyFrom(info.id());
-
-  Call::UpdateOfferOperationStatus* update =
-    call.mutable_update_offer_operation_status();
-  update->set_operation_uuid(operation.operation_uuid());
-
-  if (operation.has_framework_id()) {
-    update->mutable_framework_id()->CopyFrom(operation.framework_id());
-  }
+  operation.mutable_latest_status()->CopyFrom(
+      protobuf::createOfferOperationStatus(
+          error.isNone() ? OFFER_OPERATION_FINISHED : OFFER_OPERATION_FAILED,
+          operation.info().has_id()
+            ? operation.info().id() : Option<OfferOperationID>::none(),
+          error.isNone() ? Option<string>::none() : error->message,
+          error.isNone() ? convertedResources : Option<Resources>::none(),
+          UUID::random()));
 
-  OfferOperationStatus* status = update->mutable_status();
-  status->set_status_uuid(UUID::random().toBytes());
-
-  if (operation.info().has_id()) {
-    status->mutable_operation_id()->CopyFrom(operation.info().id());
-  }
+  operation.add_statuses()->CopyFrom(operation.latest_status());
 
   if (error.isSome()) {
     // We only update the resource version for failed conventional
@@ -2405,39 +2394,24 @@ Try<Nothing> 
StorageLocalResourceProviderProcess::applyResourceConversions(
       // Send an `UPDATE_STATE` after we finish the current operation.
       dispatch(self(), &Self::sendResourceProviderStateUpdate);
     }
-
-    status->set_state(OFFER_OPERATION_FAILED);
-    status->set_message(error->message);
-  } else {
-    status->set_state(OFFER_OPERATION_FINISHED);
-
-    foreach (const ResourceConversion& conversion, conversions.get()) {
-      foreach (const Resource& resource, conversion.converted) {
-        status->add_converted_resources()->CopyFrom(resource);
-      }
-    }
   }
 
-  update->mutable_latest_status()->CopyFrom(*status);
-
-  auto err = [](const UUID& operationUuid, const string& message) {
-    LOG(ERROR)
-      << "Failed to send status update for offer operation with UUID "
-      << operationUuid << ": " << message;
-  };
-
-  driver->send(evolve(call))
-    .onFailed(std::bind(err, operationUuid, lambda::_1))
-    .onDiscarded(std::bind(err, operationUuid, "future discarded"));
-
-  pendingOperations.erase(operationUuid);
   checkpointResourceProviderState();
 
-  if (error.isSome()) {
-    return error.get();
-  }
-
-  return Nothing();
+  // Send out the status update for the offer operation.
+  // TODO(chhsiao): Use the status update manager.
+  OfferOperationStatusUpdate update =
+    protobuf::createOfferOperationStatusUpdate(
+        operationUuid,
+        operation.latest_status(),
+        operation.latest_status(),
+        operation.has_framework_id()
+          ? operation.framework_id() : Option<FrameworkID>::none(),
+        slaveId);
+
+  sendOfferOperationStatusUpdate(update);
+
+  return error.isNone() ? Nothing() : Try<Nothing>::error(error.get());
 }
 
 
@@ -2445,17 +2419,12 @@ void 
StorageLocalResourceProviderProcess::checkpointResourceProviderState()
 {
   ResourceProviderState state;
 
-  foreachvalue (
-      const Event::ApplyOfferOperation& operation,
-      pendingOperations) {
+  foreachvalue (const OfferOperation& operation, offerOperations) {
     state.add_operations()->CopyFrom(operation);
   }
 
   state.mutable_resources()->CopyFrom(totalResources);
 
-  CHECK_SOME(resourceVersion);
-  state.set_resource_version_uuid(resourceVersion->toBytes());
-
   const string statePath = slave::paths::getResourceProviderStatePath(
       metaDir, slaveId, info.type(), info.name(), info.id());
 
@@ -2491,30 +2460,12 @@ void 
StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
   call.mutable_resource_provider_id()->CopyFrom(info.id());
 
   Call::UpdateState* update = call.mutable_update_state();
-
-  foreachpair (const UUID& uuid,
-               const Event::ApplyOfferOperation& operation,
-               pendingOperations) {
-    // TODO(chhsiao): Maintain a list of terminated but unacknowledged
-    // offer operations in memory and reconstruct it during recovery
-    // by querying the status update manager.
-    update->add_operations()->CopyFrom(
-        protobuf::createOfferOperation(
-            operation.info(),
-            protobuf::createOfferOperationStatus(
-                OFFER_OPERATION_PENDING,
-                operation.info().has_id()
-                  ? operation.info().id() : Option<OfferOperationID>::none()),
-            operation.has_framework_id()
-              ? operation.framework_id() : Option<FrameworkID>::none(),
-            slaveId,
-            uuid));
-  }
-
   update->mutable_resources()->CopyFrom(totalResources);
+  update->set_resource_version_uuid(resourceVersion.toBytes());
 
-  CHECK_SOME(resourceVersion);
-  update->set_resource_version_uuid(resourceVersion->toBytes());
+  foreachvalue (const OfferOperation& operation, offerOperations) {
+    update->add_operations()->CopyFrom(operation);
+  }
 
   auto err = [](const ResourceProviderID& id, const string& message) {
     LOG(ERROR)

http://git-wip-us.apache.org/repos/asf/mesos/blob/eeb09cbc/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp 
b/src/tests/storage_local_resource_provider_tests.cpp
index e5868bf..f01d533 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -344,17 +344,14 @@ TEST_F(StorageLocalResourceProviderTest, 
ROOT_LaunchAndDestroyVolume)
   // The framework is expected to see the following offers in sequence:
   //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
   //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
-  //   3. One containing a persistent volume after `CREATE`.
-  //   4. One containing the same persistent volume after `LAUNCH`.
-  //   5. One containing the same MOUNT disk resource after `DESTROY`.
-  //   6. One containing the same RAW disk resource after `DESTROY_VOLUME`.
+  //   3. One containing the same MOUNT disk resource after `CREADE`,
+  //      `LAUNCH` and `DESTROY`.
+  //   4. One containing the same RAW disk resource after `DESTROY_VOLUME`.
   //
   // We set up the expectations for these offers as the test progresses.
   Future<vector<Offer>> rawDiskOffers;
   Future<vector<Offer>> volumeCreatedOffers;
-  Future<vector<Offer>> persistenceCreatedOffers;
   Future<vector<Offer>> taskFinishedOffers;
-  Future<vector<Offer>> persistenceDestroyedOffers;
   Future<vector<Offer>> volumeDestroyedOffers;
 
   Sequence offers;
@@ -441,7 +438,8 @@ TEST_F(StorageLocalResourceProviderTest, 
ROOT_LaunchAndDestroyVolume)
   // Put a file into the volume.
   ASSERT_SOME(os::touch(path::join(volumePath.get(), "file")));
 
-  // Create a persistent volume on the CSI volume.
+  // Create a persistent volume on the CSI volume, then launch a task to
+  // use the persistent volume.
   Resource persistentVolume = volume.get();
   persistentVolume.mutable_disk()->mutable_persistence()
     ->set_id(UUID::random().toString());
@@ -451,20 +449,6 @@ TEST_F(StorageLocalResourceProviderTest, 
ROOT_LaunchAndDestroyVolume)
     ->set_container_path("volume");
   persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
 
-  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(
-      persistentVolume)))
-    .InSequence(offers)
-    .WillOnce(FutureArg<1>(&persistenceCreatedOffers));
-
-  driver.acceptOffers(
-      {volumeCreatedOffers->at(0).id()},
-      {CREATE(persistentVolume)},
-      filters);
-
-  AWAIT_READY(persistenceCreatedOffers);
-  ASSERT_FALSE(persistenceCreatedOffers->empty());
-
-  // Launch a task to use the persistent volume.
   Future<TaskStatus> taskStarting;
   Future<TaskStatus> taskRunning;
   Future<TaskStatus> taskFinished;
@@ -480,9 +464,10 @@ TEST_F(StorageLocalResourceProviderTest, 
ROOT_LaunchAndDestroyVolume)
     .WillOnce(FutureArg<1>(&taskFinishedOffers));
 
   driver.acceptOffers(
-      {persistenceCreatedOffers->at(0).id()},
-      {LAUNCH({createTask(
-           persistenceCreatedOffers->at(0).slave_id(),
+      {volumeCreatedOffers->at(0).id()},
+      {CREATE(persistentVolume),
+       LAUNCH({createTask(
+           volumeCreatedOffers->at(0).slave_id(),
            persistentVolume,
            createCommandInfo("test -f " + path::join("volume", "file")))})},
       filters);
@@ -498,26 +483,15 @@ TEST_F(StorageLocalResourceProviderTest, 
ROOT_LaunchAndDestroyVolume)
 
   AWAIT_READY(taskFinishedOffers);
 
-  // Destroy the persistent volume on the CSI volume.
-  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(volume.get())))
-    .InSequence(offers)
-    .WillOnce(FutureArg<1>(&persistenceDestroyedOffers));
-
-  driver.acceptOffers(
-      {taskFinishedOffers->at(0).id()},
-      {DESTROY(persistentVolume)},
-      filters);
-
-  AWAIT_READY(persistenceDestroyedOffers);
-
-  // Destroy the created volume.
+  // Destroy the persistent volume and the CSI volume.
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get())))
     .InSequence(offers)
     .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
 
   driver.acceptOffers(
-      {persistenceDestroyedOffers->at(0).id()},
-      {DESTROY_VOLUME(volume.get())},
+      {taskFinishedOffers->at(0).id()},
+      {DESTROY(persistentVolume),
+       DESTROY_VOLUME(volume.get())},
       filters);
 
   AWAIT_READY(volumeDestroyedOffers);

Reply via email to