Tested that operation updates dropped en route to master are resent.

This patch adds
`StorageLocalResourceProviderTest.ROOT_RetryOperationStatusUpdate`
which verifies that operation status updates are resent by the
agent after being dropped en route to the master.

Review: https://reviews.apache.org/r/65057/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/60f23d87
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/60f23d87
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/60f23d87

Branch: refs/heads/master
Commit: 60f23d870080c5d70963857cb06a50cf0d2825fb
Parents: 434ef5f
Author: Gaston Kleiman <[email protected]>
Authored: Fri Jan 19 15:36:31 2018 -0800
Committer: Greg Mann <[email protected]>
Committed: Fri Jan 19 15:50:32 2018 -0800

----------------------------------------------------------------------
 .../storage_local_resource_provider_tests.cpp   | 161 +++++++++++++++++++
 1 file changed, 161 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/60f23d87/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 1b21527..f6d093a 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -1916,6 +1916,167 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
   }
 }
 
+
+// This test verifies that an operation status update which is dropped en
+// route to the master is resent by the agent.
+//
+// To accomplish this, the test:
+//   1. Creates a volume from a RAW disk resource.
+//   2. Drops the first `UpdateOperationStatusMessage` from the agent to the
+//      master, so that it isn't acknowledged by the master.
+//   3. Advances the clock and verifies that the agent resends the operation
+//      status update, which the master then acknowledges.
+TEST_F(StorageLocalResourceProviderTest, ROOT_RetryOperationStatusUpdate)
+{
+  Clock::pause();
+
+  loadUriDiskProfileModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  // Append the `RESOURCE_PROVIDER` capability to the agent capabilities.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  flags.agent_features = SlaveCapabilities();
+  flags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(flags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+  Clock::pause();
+
+  // Register a framework in the "storage" role to exercise an operation.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+
+  auto isRaw = [](
+      const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW;
+  };
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isRaw, lambda::_1))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  const Offer& offer = offers->at(0);
+
+  Option<Resource> source;
+  foreach (const Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      source = resource;
+      break;
+    }
+  }
+  ASSERT_SOME(source);
+
+  // We'll drop the first operation status update from the agent to the master.
+  Future<UpdateOperationStatusMessage> droppedUpdateOperationStatusMessage =
+    DROP_PROTOBUF(
+        UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+  // Create a `MOUNT` volume from the RAW disk resource found above.
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      {});
+
+  AWAIT_READY(droppedUpdateOperationStatusMessage);
+
+  // The SLRP should resend the dropped operation status update once the
+  // minimum status update retry interval has elapsed.
+  Future<UpdateOperationStatusMessage> retriedUpdateOperationStatusMessage =
+    FUTURE_PROTOBUF(
+        UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+  // The master should acknowledge the operation status update.
+  Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage =
+    FUTURE_PROTOBUF(
+      AcknowledgeOperationStatusMessage(), master.get()->pid, 
slave.get()->pid);
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+  AWAIT_READY(retriedUpdateOperationStatusMessage);
+  AWAIT_READY(acknowledgeOperationStatusMessage);
+
+  // The master acknowledged the operation status update, so the SLRP shouldn't
+  // send further operation status updates.
+  EXPECT_NO_FUTURE_PROTOBUFS(UpdateOperationStatusMessage(), _, _);
+
+  // The master received the `UpdateOperationStatusMessage`, so it can now
+  // offer the `MOUNT` disk. No further offers are needed by this test, so
+  // any that arrive are declined.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to