This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit eb505c1a7d00fc1d591d9fad53a95995601e13ba
Author: Jan Schlicht <[email protected]>
AuthorDate: Wed Mar 27 09:40:25 2019 +0100

    Fixed operator operation handling with resource provider resources.
    
    The resource provider manager didn't allow operations originating from
    operator API calls. For speculatively applied operations, this is
    allowed now.
    
    Review: https://reviews.apache.org/r/70165/
---
 src/resource_provider/manager.cpp                  |  30 +-
 .../storage_local_resource_provider_tests.cpp      | 336 +++++++++++++++++++++
 2 files changed, 358 insertions(+), 8 deletions(-)

diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index b9dc887..ceed122 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -465,7 +465,9 @@ void ResourceProviderManagerProcess::applyOperation(
     const ApplyOperationMessage& message)
 {
   const Offer::Operation& operation = message.operation_info();
-  const FrameworkID& frameworkId = message.framework_id();
+  const Option<FrameworkID> frameworkId = message.has_framework_id()
+    ? message.framework_id()
+    : Option<FrameworkID>::none();
   const UUID& operationUUID = message.operation_uuid();
 
   Result<ResourceProviderID> resourceProviderId =
@@ -474,7 +476,11 @@ void ResourceProviderManagerProcess::applyOperation(
   if (!resourceProviderId.isSome()) {
     LOG(ERROR) << "Failed to get the resource provider ID of operation "
                << "'" << operation.id() << "' (uuid: " << operationUUID
-               << ") from framework " << frameworkId << ": "
+               << ") from "
+               << (frameworkId.isSome()
+                     ? "framework " + stringify(frameworkId.get())
+                     : "an operator API call")
+               << ": "
                << (resourceProviderId.isError() ? resourceProviderId.error()
                                                 : "Not found");
     return;
@@ -482,7 +488,10 @@ void ResourceProviderManagerProcess::applyOperation(
 
   if (!resourceProviders.subscribed.contains(resourceProviderId.get())) {
     LOG(WARNING) << "Dropping operation '" << operation.id() << "' (uuid: "
-                 << operationUUID << ") from framework " << frameworkId
+                 << operationUUID << ") from "
+                 << (frameworkId.isSome()
+                       ? "framework " + stringify(frameworkId.get())
+                       : "an operator API call")
                  << " because resource provider " << resourceProviderId.get()
                  << " is not subscribed";
     return;
@@ -502,8 +511,10 @@ void ResourceProviderManagerProcess::applyOperation(
 
   Event event;
   event.set_type(Event::APPLY_OPERATION);
-  event.mutable_apply_operation()
-    ->mutable_framework_id()->CopyFrom(frameworkId);
+  if (frameworkId.isSome()) {
+    event.mutable_apply_operation()
+      ->mutable_framework_id()->CopyFrom(frameworkId.get());
+  }
   event.mutable_apply_operation()->mutable_info()->CopyFrom(operation);
   event.mutable_apply_operation()
     ->mutable_operation_uuid()->CopyFrom(message.operation_uuid());
@@ -513,9 +524,12 @@ void ResourceProviderManagerProcess::applyOperation(
 
   if (!resourceProvider->http.send(event)) {
     LOG(WARNING) << "Failed to send operation '" << operation.id() << "' "
-                 << "(uuid: " << operationUUID << ") from framework "
-                 << frameworkId << " to resource provider "
-                 << resourceProviderId.get() << ": connection closed";
+                 << "(uuid: " << operationUUID << ") from "
+                 << (frameworkId.isSome()
+                       ? "framework " + stringify(frameworkId.get())
+                       : "an operator API call")
+                 << " to resource provider " << resourceProviderId.get()
+                 << ": connection closed";
   }
 }
 
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 797f89e..55c9389 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -5381,6 +5381,342 @@ TEST_F(
   Clock::resume();
 }
 
+
+// This test verifies that operators can reserve/unreserve and
+// create/destroy persistent volumes with resource provider resources.
+TEST_F(
+    StorageLocalResourceProviderTest,
+    OperatorOperationsWithResourceProviderResources)
+{
+  Clock::pause();
+
+  Future<shared_ptr<TestDiskProfileServer>> server =
+    TestDiskProfileServer::create();
+
+  AWAIT_READY(server);
+
+  EXPECT_CALL(*server.get()->process, profiles(_))
+    .WillOnce(Return(http::OK(createDiskProfileMapping({{"test", None()}}))))
+    .WillRepeatedly(Return(Future<http::Response>())); // Stop subsequent polls.
+
+  const Duration pollInterval = Seconds(10);
+  loadUriDiskProfileAdaptorModule(
+      stringify(server.get()->process->url()), pollInterval);
+
+  setupResourceProviderConfig(Gigabytes(2));
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration and prevent retry.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+
+  Clock::pause();
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // By default, all resource provider resources are reserved for the 'storage'
+  // role. We'll refine this role to be able to add a reservation to the
+  // existing one.
+  const string role = string("storage/") + DEFAULT_TEST_ROLE;
+
+  // Register a framework to create a 'MOUNT' volume and verify operator
+  // operation results by checking received offers.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, role);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Expect some offers to be rescinded as consequence of the operator
+  // operations.
+  EXPECT_CALL(sched, offerRescinded(_, _))
+    .WillRepeatedly(Return());
+
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  // Create a 'MOUNT' volume from 'RAW' resources, because persistent volumes
+  // are only supported for 'MOUNT' volumes.
+  {
+    EXPECT_CALL(
+        sched,
+        resourceOffers(
+            &driver,
+            OffersHaveAnyResource(
+                std::bind(isStoragePool<Resource>, lambda::_1, "test"))))
+      .WillOnce(FutureArg<1>(&offers));
+
+    driver.start();
+
+    Clock::advance(masterFlags.allocation_interval);
+
+    AWAIT_READY(offers);
+    ASSERT_EQ(1u, offers->size());
+
+    const Offer offer = offers->at(0);
+
+    Resource raw = *Resources(offer.resources())
+      .filter(std::bind(isStoragePool<Resource>, lambda::_1, "test"))
+      .begin();
+
+    EXPECT_CALL(
+        sched,
+        resourceOffers(
+            &driver,
+            OffersHaveAnyResource(
+                std::bind(isMountDisk<Resource>, lambda::_1, "test"))))
+      .WillOnce(FutureArg<1>(&offers));
+
+    driver.acceptOffers(
+        {offer.id()}, {CREATE_DISK(raw, Resource::DiskInfo::Source::MOUNT)});
+
+    // NOTE: We need to resume the clock so that the resource provider can
+    // periodically check if the CSI endpoint socket has been created by
+    // the plugin container, which runs in another Linux process.
+    Clock::resume();
+
+    // Wait for an offer that contains the 'MOUNT' volume with default
+    // reservation.
+    AWAIT_READY(offers);
+    ASSERT_EQ(1u, offers->size());
+
+    Clock::pause();
+  }
+
+  Resource mountDisk = *Resources(offers->at(0).resources())
+    .filter(std::bind(isMountDisk<Resource>, lambda::_1, "test"))
+    .begin();
+  const SlaveID slaveId = offers->at(0).slave_id();
+
+  Resources reserved =
+    Resources(mountDisk).pushReservation(createDynamicReservationInfo(
+        role,
+        DEFAULT_CREDENTIAL.principal()));
+  reserved.unallocate();
+
+  auto isReservedMountDisk = [](const Resource& r) {
+    return
+        r.has_disk() &&
+        r.disk().has_source() &&
+        r.disk().source().type() == Resource::DiskInfo::Source::MOUNT &&
+        r.disk().source().has_vendor() &&
+        r.disk().source().vendor() == TEST_CSI_VENDOR &&
+        r.disk().source().has_id() &&
+        r.disk().source().has_profile() &&
+        !r.disk().has_persistence() &&
+        r.reservations().size() == 2; // Existing and refined reservation.
+  };
+
+  // Reserve resources as operator.
+  {
+    v1::master::Call v1ReserveResourcesCall;
+    v1ReserveResourcesCall.set_type(v1::master::Call::RESERVE_RESOURCES);
+
+    v1::master::Call::ReserveResources* reserveResources =
+      v1ReserveResourcesCall.mutable_reserve_resources();
+
+    reserveResources->mutable_agent_id()->CopyFrom(evolve(slaveId));
+    reserveResources->mutable_resources()->CopyFrom(evolve(reserved));
+
+    EXPECT_CALL(
+        sched,
+        resourceOffers(&driver, OffersHaveAnyResource(isReservedMountDisk)))
+      .WillOnce(FutureArg<1>(&offers));
+
+    Future<http::Response> v1ReserveResourcesResponse = http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(ContentType::PROTOBUF, v1ReserveResourcesCall),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::Accepted().status, v1ReserveResourcesResponse);
+
+    Clock::advance(masterFlags.allocation_interval);
+
+    // Wait for an offer that contains the 'MOUNT' volume with refined
+    // reservation.
+    AWAIT_READY(offers);
+    ASSERT_EQ(1u, offers->size());
+  }
+
+  vector<Resource> converted;
+  foreach (Resource resource, reserved) {
+    if (Resources::isDisk(resource, Resource::DiskInfo::Source::MOUNT)) {
+      resource.mutable_disk()->mutable_persistence()->set_id(
+          id::UUID::random().toString());
+      resource.mutable_disk()->mutable_persistence()->set_principal(
+          DEFAULT_CREDENTIAL.principal());
+      resource.mutable_disk()->mutable_volume()->set_container_path("volume");
+      resource.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+      converted.push_back(resource);
+    }
+  }
+
+  Resources volumes(converted);
+
+  auto isPersistedMountDisk = [](const Resource& r) {
+    return
+        r.has_disk() &&
+        r.disk().has_source() &&
+        r.disk().source().type() == Resource::DiskInfo::Source::MOUNT &&
+        r.disk().source().has_vendor() &&
+        r.disk().source().vendor() == TEST_CSI_VENDOR &&
+        r.disk().source().has_id() &&
+        r.disk().source().has_profile() &&
+        r.disk().has_persistence();
+  };
+
+  // Create persistent volumes as operator.
+  {
+    v1::master::Call v1CreateVolumesCall;
+    v1CreateVolumesCall.set_type(v1::master::Call::CREATE_VOLUMES);
+
+    v1::master::Call::CreateVolumes* createVolumes =
+      v1CreateVolumesCall.mutable_create_volumes();
+
+    createVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId));
+    createVolumes->mutable_volumes()->CopyFrom(evolve(volumes));
+
+    EXPECT_CALL(
+        sched,
+        resourceOffers(&driver, OffersHaveAnyResource(isPersistedMountDisk)))
+      .WillOnce(FutureArg<1>(&offers));
+
+    Future<http::Response> v1CreateVolumesResponse = http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(ContentType::PROTOBUF, v1CreateVolumesCall),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::Accepted().status, v1CreateVolumesResponse);
+
+    // Remove all offer filters. Existing filters would stop us from
+    // getting offers otherwise.
+    driver.reviveOffers();
+
+    Clock::advance(masterFlags.allocation_interval);
+
+    // Wait for an offer that contains the 'MOUNT' volume with refined
+    // reservation and persistence.
+    AWAIT_READY(offers);
+    ASSERT_EQ(1u, offers->size());
+  }
+
+  // Destroy persistent volumes as operator.
+  {
+    v1::master::Call v1DestroyVolumesCall;
+    v1DestroyVolumesCall.set_type(v1::master::Call::DESTROY_VOLUMES);
+
+    v1::master::Call::DestroyVolumes* destroyVolumes =
+      v1DestroyVolumesCall.mutable_destroy_volumes();
+
+    destroyVolumes->mutable_agent_id()->CopyFrom(evolve(slaveId));
+    destroyVolumes->mutable_volumes()->CopyFrom(evolve(volumes));
+
+    EXPECT_CALL(
+        sched,
+        resourceOffers(&driver, OffersHaveAnyResource(isReservedMountDisk)))
+      .WillOnce(FutureArg<1>(&offers));
+
+    Future<http::Response> v1DestroyVolumesResponse = http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(ContentType::PROTOBUF, v1DestroyVolumesCall),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::Accepted().status, v1DestroyVolumesResponse);
+
+    Clock::advance(masterFlags.allocation_interval);
+
+    // Wait for an offer that contains the 'MOUNT' volume with refined
+    // reservation.
+    AWAIT_READY(offers);
+    ASSERT_EQ(1u, offers->size());
+  }
+
+  // Unreserve resources as operator.
+  {
+    v1::master::Call v1UnreserveResourcesCall;
+    v1UnreserveResourcesCall.set_type(v1::master::Call::UNRESERVE_RESOURCES);
+    v1::master::Call::UnreserveResources* unreserveResources =
+      v1UnreserveResourcesCall.mutable_unreserve_resources();
+
+    unreserveResources->mutable_agent_id()->CopyFrom(evolve(slaveId));
+    unreserveResources->mutable_resources()->CopyFrom(evolve(reserved));
+
+    EXPECT_CALL(
+        sched,
+        resourceOffers(
+            &driver,
+            OffersHaveAnyResource(
+                std::bind(isMountDisk<Resource>, lambda::_1, "test"))))
+      .WillOnce(FutureArg<1>(&offers));
+
+    Future<http::Response> v1UnreserveResourcesResponse = http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(ContentType::PROTOBUF, v1UnreserveResourcesCall),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+        http::Accepted().status, v1UnreserveResourcesResponse);
+
+    Clock::advance(masterFlags.allocation_interval);
+
+    // Wait for an offer that contains the 'MOUNT' volume with default
+    // reservation.
+    AWAIT_READY(offers);
+    ASSERT_EQ(1u, offers->size());
+  }
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to