This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 01d4c74f30cf32431c0bd70e5743dcdffab91b0e
Author: Joseph Wu <[email protected]>
AuthorDate: Wed Jan 30 17:32:31 2019 -0800

    Added test for tearing down frameworks while creating disks.
    
    The CREATE_DISK and DESTROY_DISK operations are "non-speculative"
    operations, which means the master must wait for the operations to
    complete successfully before the master can update its resources.
    Because the master must wait to update the results of non-speculative
    operations, it is possible for the framework making the
    CREATE/DESTROY_DISK to be torn down before the operation completes.
    
    This commit adds a test to make sure the master can gracefully handle
    such a case.
    
    Review: https://reviews.apache.org/r/69869
---
 .../storage_local_resource_provider_tests.cpp      | 170 +++++++++++++++++++++
 1 file changed, 170 insertions(+)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index a661951..6b38c4f 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -5123,6 +5123,176 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
       "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
 }
 
+
+// This test verifies that the master can handle terminal status updates for
+// "non-speculative" operations (i.e. operations whose success the master
+// cannot assume, such as CREATE_DISK) when the framework that issued the
+// operation has already been torn down by the time the update arrives.
+//
+// Outline:
+//   1. A framework accepts an offer with a CREATE_DISK operation.
+//   2. The agent's terminal UpdateOperationStatusMessage is dropped.
+//   3. The framework is torn down through the v1 operator API.
+//   4. The clock is advanced so the update is resent, and the test expects
+//      the master to answer it with an AcknowledgeOperationStatusMessage.
+TEST_F(
+    StorageLocalResourceProviderTest,
+    FrameworkTeardownBeforeTerminalOperationStatusUpdate)
+{
+  // Write a disk profile mapping containing a single "test" profile and load
+  // the URI disk profile adaptor module from that file.
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  // NOTE(review): presumably gives the storage resource provider 4GB of
+  // capacity; see `setupResourceProviderConfig` to confirm.
+  setupResourceProviderConfig(Gigabytes(4));
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  // The agent must use the URI disk profile adaptor loaded above.
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework in the "storage" role to issue the operation.
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "storage");
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  // NOTE: The scheduler may connect again after the TEARDOWN call, so only
+  // the first connection subscribes; any later connection is ignored.
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo))
+    .WillRepeatedly(Return());
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Decline any offer that is not claimed by the more specific expectation
+  // below, e.g. offers containing only the agent's default resources.
+  // (gmock tries newer expectations before older ones.)
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  // Matches RAW disk resources that carry a profile. We are only interested
+  // in an offer containing such a resource.
+  auto isRaw = [](const v1::Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == v1::Resource::DiskInfo::Source::RAW;
+  };
+
+  // Capture the first offer that contains at least one raw disk.
+  Future<v1::scheduler::Event::Offers> offers;
+
+  EXPECT_CALL(
+      *scheduler,
+      offers(_, v1::scheduler::OffersHaveAnyResource(isRaw)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  // Drop the terminal operation status update sent from the agent to the
+  // master, so that the framework can be torn down before the master learns
+  // the operation's outcome. This turns the operation into an orphan
+  // operation (its framework is gone by the time the update is delivered).
+  // This must be installed before the operation is issued below.
+  Future<UpdateOperationStatusMessage> updateMessage =
+    DROP_PROTOBUF(
+        UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  ASSERT_FALSE(offer.resources().empty());
+
+  // The test relies on retry logic within the SLRP module to resend the
+  // dropped terminal operation status update. Pause the clock here so that
+  // the retry happens only when the test advances the clock further below.
+  Clock::pause();
+
+  // Have the framework call CREATE_DISK, a non-speculative operation, on the
+  // first raw disk found in the offer.
+  Option<v1::Resource> rawDisk;
+
+  foreach (const v1::Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      rawDisk = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(rawDisk);
+
+  // The operation ID is used below to match the dropped status update
+  // against this operation.
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  // Convert the raw disk into a MOUNT disk.
+  mesos.send(v1::createCallAccept(
+      frameworkId,
+      offer,
+      {v1::CREATE_DISK(
+          rawDisk.get(),
+          v1::Resource::DiskInfo::Source::MOUNT,
+          None(),
+          operationId)}));
+
+  // This message will be sent once the agent has completed the operation.
+  // It must be the terminal OPERATION_FINISHED update for our operation.
+  AWAIT_READY(updateMessage);
+
+  EXPECT_EQ(
+      OperationState::OPERATION_FINISHED, updateMessage->status().state());
+  EXPECT_EQ(
+      operationId.value(), updateMessage->status().operation_id().value());
+
+  // We can now tear down the framework, since we are certain the operation
+  // has reached the agent. First release this scope's reference to the
+  // mock scheduler.
+  scheduler.reset();
+
+  // Tear down the framework through the v1 operator API by POSTing a
+  // TEARDOWN call to the master's "api/v1" endpoint.
+  {
+    v1::master::Call v1Call;
+    v1Call.set_type(v1::master::Call::TEARDOWN);
+
+    v1::master::Call::Teardown* teardown = v1Call.mutable_teardown();
+
+    teardown->mutable_framework_id()->CopyFrom(frameworkId);
+
+    Future<http::Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(ContentType::PROTOBUF, v1Call),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  }
+
+  // Since the framework is gone, the master is expected to acknowledge the
+  // operation status update on its behalf, which the test observes as an
+  // AcknowledgeOperationStatusMessage from the master to the agent.
+  // Installing this future only now is fine: the master cannot acknowledge
+  // an update it has not received, and the resend is held back by the
+  // paused clock.
+  Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage =
+    FUTURE_PROTOBUF(
+        AcknowledgeOperationStatusMessage(),
+        master.get()->pid,
+        slave.get()->pid);
+
+  // Resend the dropped operation status update by advancing the clock by the
+  // minimum status update retry interval, then let pending events settle.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  AWAIT_READY(acknowledgeOperationStatusMessage);
+
+  Clock::resume();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to