This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 8174ccf0983c82e21ca103bc87a5f78d5bff4971
Author: Joseph Wu <[email protected]>
AuthorDate: Tue Feb 19 16:41:51 2019 -0800

    Added test for terminal operation updates after master failover.
    
    This test covers a corner case where an agent reregisters with the
    master with a pending operation, but the operation's originating
    framework is unknown.  This can occur in a variety of situations like:
      * the master fails over and a framework never reregisters,
      * a completed framework is rotated out of the master's memory with
        pending operations, or
      * an agent with pending operations is migrated from one cluster to
        another.
    
    In this case, the master should "adopt" the orphan operation only
    after a delay.  This gives the framework some time to reregister.
    But if the framework does not reregister in time, the master will
    be in charge of acknowledging operation status updates.
    
    Review: https://reviews.apache.org/r/70040
---
 .../storage_local_resource_provider_tests.cpp      | 191 +++++++++++++++++++++
 1 file changed, 191 insertions(+)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 6b38c4f..75f0d81 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -5293,6 +5293,197 @@ TEST_F(
   Clock::resume();
 }
 
+
+// This test verifies that the master adopts orphan operations reported
+// by agents upon reregistration, but only after a delay. These orphans
+// can appear when the master fails over and the operation's originating
+// framework never reregisters, or (uncommonly) when an agent with
+// pending operations is migrated from one master to another.
+TEST_F(
+    StorageLocalResourceProviderTest,
+    TerminalOrphanOperationAfterMasterFailover)
+{
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  setupResourceProviderConfig(Gigabytes(4));
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  // Greatly increase the ping timeout of the master/agent connection.
+  // The clock will be advanced by `MIN_WAIT_BEFORE_ORPHAN_OPERATION_ADOPTION`
+  // and if the ping timeout is less than this amount, the agent will
+  // disconnect.
+  masterFlags.agent_ping_timeout =
+    master::DEFAULT_AGENT_PING_TIMEOUT +
+    master::MIN_WAIT_BEFORE_ORPHAN_OPERATION_ADOPTION;
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // NOTE: This test needs to use the StandaloneMasterDetector directly
+  // (instead of the `cluster::Master::createDetector` helper) so that
+  // the restarted master can be detected by the running agent, when the
+  // test dictates.
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise an operation.
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "storage");
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  // NOTE: Only the first connection subscribes; any reconnection is ignored.
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo))
+    .WillRepeatedly(Return());
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  // We are only interested in offers with raw disks that have a profile.
+  auto isRaw = [](const v1::Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == v1::Resource::DiskInfo::Source::RAW;
+  };
+
+  Future<v1::scheduler::Event::Offers> offers;
+
+  EXPECT_CALL(
+      *scheduler,
+      offers(_, v1::scheduler::OffersHaveAnyResource(isRaw)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(subscribed);
+
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  // Prepare to intercept the terminal operation status message between
+  // agent and master, to allow the master to failover first.
+  // This creates a situation where the master forgets about the
+  // framework that started the operation.
+  Future<UpdateOperationStatusMessage> updateMessage =
+    DROP_PROTOBUF(
+        UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  ASSERT_FALSE(offer.resources().empty());
+
+  // The test relies on retry logic within the SLRP module to resend the
+  // terminal operation feedback message. We pause the clock here to control
+  // when this retry happens.
+  Clock::pause();
+
+  // Have the framework call CREATE_DISK, a non-speculative operation.
+  Option<v1::Resource> rawDisk;
+
+  foreach (const v1::Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      rawDisk = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(rawDisk);
+
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(v1::createCallAccept(
+      frameworkId,
+      offer,
+      {v1::CREATE_DISK(
+          rawDisk.get(),
+          v1::Resource::DiskInfo::Source::MOUNT,
+          None(),
+          operationId)}));
+
+  // This message will be sent once the agent has completed the operation.
+  AWAIT_READY(updateMessage);
+
+  EXPECT_EQ(
+      OperationState::OPERATION_FINISHED, updateMessage->status().state());
+  EXPECT_EQ(
+      operationId.value(), updateMessage->status().operation_id().value());
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Trigger a master failover to enter a situation where a terminal operation
+  // is orphaned, and the originating framework is not "completed".
+  detector.appoint(None());
+  master->reset();
+
+  // Get rid of the framework so that it does not reregister.
+  // We want the framework to remain unknown after the master/agent recovers.
+  scheduler.reset();
+
+  // Start the master back up and wait for the agent to reregister.
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+  detector.appoint(master.get()->pid);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  AWAIT_READY(slaveReregisteredMessage);
+
+  Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage =
+    FUTURE_PROTOBUF(
+        AcknowledgeOperationStatusMessage(),
+        master.get()->pid,
+        slave.get()->pid);
+
+  // Trigger a retry of the operation status update. The master should drop
+  // this one because the agent has just reregistered and the framework is
+  // unknown.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  ASSERT_TRUE(acknowledgeOperationStatusMessage.isPending());
+
+  // Trigger a retry after advancing the clock beyond the operation "adoption"
+  // time, which means the master should acknowledge the terminal status.
+  Clock::advance(master::MIN_WAIT_BEFORE_ORPHAN_OPERATION_ADOPTION);
+  Clock::settle();
+
+  AWAIT_READY(acknowledgeOperationStatusMessage);
+
+  Clock::resume();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to