This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit e7f4e6100c7987fc2c2b031b2ec5ba17737ef76e
Author: Chun-Hung Hsiao <[email protected]>
AuthorDate: Tue Jan 29 15:48:36 2019 -0800

    Updated SLRP test `ProfileDisappeared` to request operation feedback.
    
    This patch updates `StorageLocalResourceProviderTest.ProfileDisappeared`
    to use the v1 scheduler API to request operation feedback, so MESOS-9537
    would be triggered when an outstanding `UPDATE_STATE` call from the
    resource provider races with an offer operation.
    
    Review: https://reviews.apache.org/r/69866
---
 src/tests/mesos.hpp                                |  27 +-
 .../storage_local_resource_provider_tests.cpp      | 309 ++++++++++++---------
 2 files changed, 204 insertions(+), 132 deletions(-)

diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index b10ec0a..f3f1e64 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1438,13 +1438,18 @@ inline typename TOffer::Operation CREATE_DISK(
 }
 
 
-template <typename TResource, typename TOffer>
-inline typename TOffer::Operation DESTROY_DISK(const TResource& source)
+template <typename TResource, typename TOperationID, typename TOffer>
+inline typename TOffer::Operation DESTROY_DISK(
+    const TResource& source, const Option<TOperationID>& operationId = None())
 {
   typename TOffer::Operation operation;
   operation.set_type(TOffer::Operation::DESTROY_DISK);
   operation.mutable_destroy_disk()->mutable_source()->CopyFrom(source);
 
+  if (operationId.isSome()) {
+    operation.mutable_id()->CopyFrom(operationId.get());
+  }
+
   return operation;
 }
 
@@ -1818,7 +1823,8 @@ inline Offer::Operation CREATE_DISK(Args&&... args)
 template <typename... Args>
 inline Offer::Operation DESTROY_DISK(Args&&... args)
 {
-  return common::DESTROY_DISK<Resource, Offer>(std::forward<Args>(args)...);
+  return common::DESTROY_DISK<Resource, OperationID, Offer>(
+      std::forward<Args>(args)...);
 }
 
 
@@ -2125,8 +2131,10 @@ inline mesos::v1::Offer::Operation CREATE_DISK(Args&&... args)
 template <typename... Args>
 inline mesos::v1::Offer::Operation DESTROY_DISK(Args&&... args)
 {
-  return common::DESTROY_DISK<mesos::v1::Resource, mesos::v1::Offer>(
-      std::forward<Args>(args)...);
+  return common::DESTROY_DISK<
+      mesos::v1::Resource,
+      mesos::v1::OperationID,
+      mesos::v1::Offer>(std::forward<Args>(args)...);
 }
 
 
@@ -2668,6 +2676,15 @@ MATCHER_P(OffersHaveAnyResource, filter, "")
 }
 
 
+// This matcher is used to match the operation ID of an
+// `Event.update_operation_status.status` message.
+MATCHER_P(OperationStatusUpdateOperationIdEq, operationId, "")
+{
+  return arg.status().has_operation_id() &&
+    arg.status().operation_id() == operationId;
+}
+
+
 // Like LaunchTasks, but decline the entire offer and don't launch any tasks.
 ACTION(DeclineOffers)
 {
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index fb001aa..a661951 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -15,6 +15,10 @@
 // limitations under the License.
 
 #include <algorithm>
+#include <list>
+#include <memory>
+#include <string>
+#include <vector>
 
 #include <process/clock.hpp>
 #include <process/future.hpp>
@@ -1104,18 +1108,29 @@ TEST_F(StorageLocalResourceProviderTest, CreateDestroyDiskRecovery)
 // This test verifies that a framework cannot create a volume during and after
 // the profile disappears, and destroying a volume with a stale profile will
 // recover the freed disk with another appeared profile.
+//
+// To accomplish this:
+//   1. Create a 2GB MOUNT disk from a 4GB RAW disk of profile 'test1'.
+//   2. Create another MOUNT disk from the remaining 'test1' RAW disk.
+//   3. Remove profile 'test1' and add profile 'test2' before the second
+//      operation is applied. The operation would then be dropped, and the
+//      remaining RAW disk would show up with profile 'test2'.
+//   4. Destroy the MOUNT disk of profile 'test1'. The entire 4GB RAW disk
+//      would then show up with profile 'test2'.
 TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared)
 {
   Clock::pause();
 
   Future<shared_ptr<TestDiskProfileServer>> server =
     TestDiskProfileServer::create();
+
   AWAIT_READY(server);
 
   Promise<http::Response> updatedProfileMapping;
   EXPECT_CALL(*server.get()->process, profiles(_))
     .WillOnce(Return(http::OK(createDiskProfileMapping({{"test1", None()}}))))
-    .WillOnce(Return(updatedProfileMapping.future()));
+    .WillOnce(Return(updatedProfileMapping.future()))
+    .WillRepeatedly(Return(Future<http::Response>())); // Stop subsequent polls.
 
   const Duration pollInterval = Seconds(10);
   loadUriDiskProfileAdaptorModule(
@@ -1163,47 +1178,51 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared)
 
   Clock::pause();
 
-  // Register a framework to receive offers.
-  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  // Register a v1 framework to exercise operations with feedback.
+  v1::FrameworkInfo framework = v1::DEFAULT_FRAMEWORK_INFO;
   framework.set_roles(0, "storage");
 
-  MockScheduler sched;
-  MesosSchedulerDriver driver(
-      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
-  EXPECT_CALL(sched, registered(&driver, _, _));
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(framework));
 
-  // The framework is expected to see the following offers in sequence:
-  //   1. A 4GB RAW disk with profile 'test1' before the 1st `CREATE_DISK`.
-  //   2. A 2GB MOUNT disk and a 2GB RAW disk, both with profile 'test1', after
-  //      the 1st `CREATE_DISK` finishes.
-  //   3. A 2GB MOUNT disk with profile 'test1' and a 2GB RAW disk with profile
-  //      'test2', after the profile mapping is updated and the 2nd
-  //      `CREATE_DISK` fails due to a mismatched resource version.
-  //   4. A 4GB RAW disk with profile 'test2', after the `DESTROY_DISK`.
-  Future<vector<Offer>> rawDiskOffers;
-  Future<vector<Offer>> volumeCreatedOffers;
-  Future<vector<Offer>> profileDisappearedOffers;
-  Future<vector<Offer>> volumeDestroyedOffers;
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
 
-  // We use the following filter to filter offers that do not have
-  // wanted resources for 365 days (the maximum).
-  Filters declineFilters;
-  declineFilters.set_refuse_seconds(Days(365).secs());
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
 
-  // Decline offers that contain only the agent's default resources.
-  EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillRepeatedly(DeclineOffers(declineFilters));
+  // Decline unwanted offers. The master can send such offers before the
+  // resource provider receives profile updates.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
 
-  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
-      &Resources::hasResourceProvider)))
-    .WillOnce(FutureArg<1>(&rawDiskOffers))
-    .WillOnce(FutureArg<1>(&volumeCreatedOffers))
-    .WillOnce(FutureArg<1>(&profileDisappearedOffers))
-    .WillOnce(FutureArg<1>(&volumeDestroyedOffers))
-    .WillRepeatedly(Return()); // Ignore subsequent offers.
+  auto isStoragePool = [](const v1::Resource& r, const string& profile) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == v1::Resource::DiskInfo::Source::RAW &&
+      r.disk().source().has_vendor() &&
+      r.disk().source().vendor() == TEST_CSI_VENDOR &&
+      !r.disk().source().has_id() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == profile;
+  };
 
-  driver.start();
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(
+      *scheduler,
+      offers(_, v1::scheduler::OffersHaveAnyResource(
+          std::bind(isStoragePool, lambda::_1, "test1"))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
 
   // NOTE: If the framework has not declined an unwanted offer yet when the
   // resource provider reports its RAW resources, the new allocation triggered
@@ -1213,58 +1232,77 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared)
   Clock::settle();
   Clock::advance(masterFlags.allocation_interval);
 
-  AWAIT_READY(rawDiskOffers);
-  ASSERT_FALSE(rawDiskOffers->empty());
+  AWAIT_READY(offers);
+  ASSERT_EQ(1, offers->offers_size());
 
-  auto hasSourceTypeAndProfile = [](
-      const Resource& r,
-      const Resource::DiskInfo::Source::Type& type,
-      const string& profile) {
-    return r.has_disk() &&
-      r.disk().has_source() &&
-      r.disk().source().type() == type &&
-      r.disk().source().has_profile() &&
-      r.disk().source().profile() == profile;
-  };
+  v1::Offer offer = offers->offers(0);
 
-  // We use the following filter so that the resources will not be
-  // filtered for 5 seconds (the default).
-  Filters acceptFilters;
-  acceptFilters.set_refuse_seconds(0);
+  EXPECT_CALL(
+      *scheduler,
+      offers(_, v1::scheduler::OffersHaveAnyResource(
+          std::bind(isStoragePool, lambda::_1, "test1"))))
+    .WillOnce(FutureArg<1>(&offers));
 
-  // Create a volume with profile 'test1'.
+  // Create a 2GB MOUNT disk of profile 'test1'.
   {
-    Resources raw =
-      Resources(rawDiskOffers->at(0).resources()).filter(std::bind(
-          hasSourceTypeAndProfile,
-          lambda::_1,
-          Resource::DiskInfo::Source::RAW,
-          "test1"));
+    v1::Resources raw = v1::Resources(offer.resources())
+      .filter(std::bind(isStoragePool, lambda::_1, "test1"));
 
     ASSERT_SOME_EQ(Gigabytes(4), raw.disk());
 
     // Just use 2GB of the storage pool.
-    Resource source = *raw.begin();
+    v1::Resource source = *raw.begin();
     source.mutable_scalar()->set_value(
-        (double) Gigabytes(2).bytes() / Bytes::MEGABYTES);
-
-    Future<UpdateOperationStatusMessage> createVolumeStatus =
-      FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
-
-    driver.acceptOffers(
-        {rawDiskOffers->at(0).id()},
-        {CREATE_DISK(source, Resource::DiskInfo::Source::MOUNT)},
-        acceptFilters);
-
-    AWAIT_READY(createVolumeStatus);
-    EXPECT_EQ(OPERATION_FINISHED, createVolumeStatus->status().state());
+        static_cast<double>(Gigabytes(2).bytes()) / Bytes::MEGABYTES);
+
+    v1::OperationID operationId;
+    operationId.set_value(id::UUID::random().toString());
+
+    Future<v1::scheduler::Event::UpdateOperationStatus> update;
+    EXPECT_CALL(
+        *scheduler,
+        updateOperationStatus(
+            _, v1::scheduler::OperationStatusUpdateOperationIdEq(operationId)))
+      .WillOnce(FutureArg<1>(&update))
+      .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+    mesos.send(v1::createCallAccept(
+        frameworkId,
+        offer,
+        {v1::CREATE_DISK(
+             source,
+             v1::Resource::DiskInfo::Source::MOUNT,
+             None(),
+             operationId)}));
+
+    AWAIT_READY(update);
+    EXPECT_EQ(v1::OPERATION_FINISHED, update->status().state());
   }
 
   // Advance the clock to trigger another allocation.
   Clock::advance(masterFlags.allocation_interval);
 
-  AWAIT_READY(volumeCreatedOffers);
-  ASSERT_FALSE(volumeCreatedOffers->empty());
+  AWAIT_READY(offers);
+  ASSERT_EQ(1, offers->offers_size());
+
+  offer = offers->offers(0);
+
+  auto isMountDisk = [](const v1::Resource& r, const string& profile) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == v1::Resource::DiskInfo::Source::MOUNT &&
+      r.disk().source().has_vendor() &&
+      r.disk().source().vendor() == TEST_CSI_VENDOR &&
+      r.disk().source().has_id() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == profile;
+  };
+
+  EXPECT_CALL(
+      *scheduler,
+      offers(_, v1::scheduler::OffersHaveAnyResource(
+          std::bind(isMountDisk, lambda::_1, "test1"))))
+    .WillOnce(FutureArg<1>(&offers));
 
   // We drop the agent update (which is triggered by the changes in the known
   // set of profiles) to simulate the situation where the update races with
@@ -1272,86 +1310,103 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared)
   Future<UpdateSlaveMessage> updateSlave3 =
     DROP_PROTOBUF(UpdateSlaveMessage(), _, _);
 
-  // Trigger another poll for profiles. Profile 'test1' will disappear and
-  // profile 'test2' will appear.
-  //
-  // NOTE: We advance the clock before updating the disk profile mapping so
-  // there will only be one poll.
+  // Remove profile 'test1' and add profile 'test2'. No allocation will be
+  // triggered since the framework is still holding the current offer.
   Clock::advance(pollInterval);
-
-  // Update the disk profile mapping.
   updatedProfileMapping.set(
       http::OK(createDiskProfileMapping({{"test2", None()}})));
 
   AWAIT_READY(updateSlave3);
 
-  // Try to create another volume with profile 'test1', which will be dropped
-  // due to a mismatched resource version.
+  // Create another MOUNT disk from the remaining RAW disk of profile 'test1'.
+  // This operation would be dropped due to a mismatched resource version.
   {
-    Resources raw =
-      Resources(volumeCreatedOffers->at(0).resources()).filter(std::bind(
-          hasSourceTypeAndProfile,
-          lambda::_1,
-          Resource::DiskInfo::Source::RAW,
-          "test1"));
+    v1::Resources raw = v1::Resources(offer.resources())
+      .filter(std::bind(isStoragePool, lambda::_1, "test1"));
 
     ASSERT_SOME_EQ(Gigabytes(2), raw.disk());
 
-    Future<UpdateOperationStatusMessage> createVolumeStatus =
-      FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
-
-    driver.acceptOffers(
-        {volumeCreatedOffers->at(0).id()},
-        {CREATE_DISK(*raw.begin(), Resource::DiskInfo::Source::MOUNT)},
-        acceptFilters);
-
-    AWAIT_READY(createVolumeStatus);
-    EXPECT_EQ(OPERATION_DROPPED, createVolumeStatus->status().state());
+    v1::OperationID operationId;
+    operationId.set_value(id::UUID::random().toString());
+
+    Future<v1::scheduler::Event::UpdateOperationStatus> update;
+    EXPECT_CALL(
+        *scheduler,
+        updateOperationStatus(
+            _, v1::scheduler::OperationStatusUpdateOperationIdEq(operationId)))
+      .WillOnce(FutureArg<1>(&update))
+      .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+    mesos.send(v1::createCallAccept(
+        frameworkId,
+        offer,
+        {v1::CREATE_DISK(
+             *raw.begin(),
+             v1::Resource::DiskInfo::Source::MOUNT,
+             None(),
+             operationId)}));
+
+    AWAIT_READY(update);
+    EXPECT_EQ(v1::OPERATION_DROPPED, update->status().state());
   }
 
   // Forward the dropped agent update to trigger another allocation.
   post(slave.get()->pid, master.get()->pid, updateSlave3.get());
 
-  AWAIT_READY(profileDisappearedOffers);
-  ASSERT_FALSE(profileDisappearedOffers->empty());
+  AWAIT_READY(offers);
+  ASSERT_EQ(1, offers->offers_size());
 
-  // Destroy the volume with profile 'test1', which will trigger an agent update
-  // to recover the freed disk with profile 'test2' and thus another allocation.
+  offer = offers->offers(0);
+
+  EXPECT_CALL(
+      *scheduler,
+      offers(_, v1::scheduler::OffersHaveAnyResource(
+          std::bind(isStoragePool, lambda::_1, "test2"))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  // Destroy the MOUNT disk of profile 'test1'. The returned converted resources
+  // should be empty.
   {
-    Resources volumes =
-      Resources(profileDisappearedOffers->at(0).resources()).filter(std::bind(
-          hasSourceTypeAndProfile,
-          lambda::_1,
-          Resource::DiskInfo::Source::MOUNT,
-          "test1"));
+    v1::Resources created = v1::Resources(offer.resources())
+      .filter(std::bind(isMountDisk, lambda::_1, "test1"));
 
-    ASSERT_SOME_EQ(Gigabytes(2), volumes.disk());
+    ASSERT_SOME_EQ(Gigabytes(2), created.disk());
 
-    Future<UpdateOperationStatusMessage> destroyVolumeStatus =
-      FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+    v1::OperationID operationId;
+    operationId.set_value(id::UUID::random().toString());
 
-    driver.acceptOffers(
-        {profileDisappearedOffers->at(0).id()},
-        {DESTROY_DISK(*volumes.begin())},
-        acceptFilters);
+    Future<v1::scheduler::Event::UpdateOperationStatus> update;
+    EXPECT_CALL(
+        *scheduler,
+        updateOperationStatus(
+            _, v1::scheduler::OperationStatusUpdateOperationIdEq(operationId)))
+      .WillOnce(FutureArg<1>(&update))
+      .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+    mesos.send(v1::createCallAccept(
+        frameworkId, offer, {v1::DESTROY_DISK(*created.begin(), operationId)}));
 
-    AWAIT_READY(destroyVolumeStatus);
-    EXPECT_EQ(OPERATION_FINISHED, destroyVolumeStatus->status().state());
+    AWAIT_READY(update);
+    EXPECT_EQ(v1::OPERATION_FINISHED, update->status().state());
+    EXPECT_TRUE(update->status().converted_resources().empty());
   }
 
-  AWAIT_READY(volumeDestroyedOffers);
-  ASSERT_FALSE(volumeDestroyedOffers->empty());
+  // The resource provider will reconcile the storage pools to reclaim the
+  // space freed by destroying a MOUNT disk of a disappeared profile, which
+  // would in turn trigger another agent update and thus another allocation.
+  //
+  // TODO(chhsiao): This might change once MESOS-9254 is done.
+  AWAIT_READY(offers);
+  ASSERT_EQ(1, offers->offers_size());
+
+  offer = offers->offers(0);
 
-  // Check that the freed disk has been recovered with profile 'test2'.
+  // Check that the freed disk shows up with profile 'test2'.
   {
-    Resources storagePool =
-      Resources(volumeDestroyedOffers->at(0).resources()).filter(std::bind(
-          hasSourceTypeAndProfile,
-          lambda::_1,
-          Resource::DiskInfo::Source::RAW,
-          "test2"));
-
-    EXPECT_SOME_EQ(Gigabytes(4), storagePool.disk());
+    v1::Resources raw = v1::Resources(offer.resources())
+      .filter(std::bind(isStoragePool, lambda::_1, "test2"));
+
+    EXPECT_SOME_EQ(Gigabytes(4), raw.disk());
   }
 }
 

Reply via email to