Made the master include the operation ID in OPERATION_DROPPED updates.

Review: https://reviews.apache.org/r/66924/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a570f943
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a570f943
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a570f943

Branch: refs/heads/master
Commit: a570f9436b816d40ba3d01455211f5d61f77d66d
Parents: b4c541b
Author: Gaston Kleiman <[email protected]>
Authored: Mon May 7 17:32:56 2018 -0700
Committer: Greg Mann <[email protected]>
Committed: Mon May 7 18:15:30 2018 -0700

----------------------------------------------------------------------
 src/master/master.cpp                           |  21 ++-
 src/master/master.hpp                           |   2 +-
 src/tests/master_slave_reconciliation_tests.cpp | 175 +++++++++++++++++++
 src/tests/mesos.hpp                             |   1 +
 4 files changed, 193 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 28a0661..f48a4f7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -8417,8 +8417,7 @@ void Master::forward(
 }
 
 
-void Master::updateOperationStatus(
-    const UpdateOperationStatusMessage& update)
+void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
 {
   CHECK(update.has_slave_id())
     << "External resource provider is not supported yet";
@@ -8470,6 +8469,21 @@ void Master::updateOperationStatus(
     return;
   }
 
+  if (operation->info().has_id()) {
+    // Agents don't include the framework and operation IDs when sending
+    // operation status updates for dropped operations in response to a
+    // `ReconcileOperationsMessage`, but they can be deduced from the operation
+    // info kept on the master.
+
+    // Only operations done via the scheduler API can have an ID.
+    CHECK(operation->has_framework_id());
+
+    frameworkId = operation->framework_id();
+
+    update.mutable_status()->mutable_operation_id()->CopyFrom(
+        operation->info().id());
+  }
+
   updateOperation(operation, update);
 
   CHECK(operation->statuses_size() > 0);
@@ -8477,9 +8491,6 @@ void Master::updateOperationStatus(
   const OperationStatus& latestStatus = *operation->statuses().rbegin();
 
   if (operation->info().has_id()) {
-    // Only operations done via the scheduler API can have an ID.
-    CHECK_SOME(frameworkId);
-
     // Forward the status update to the framework.
     Framework* framework = getFramework(frameworkId.get());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 76e7763..5ec764b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -510,7 +510,7 @@ public:
       ReconcileTasksMessage&& reconcileTasksMessage);
 
   void updateOperationStatus(
-      const UpdateOperationStatusMessage& update);
+      UpdateOperationStatusMessage&& update);
 
   void exitedExecutor(
       const process::UPID& from,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/tests/master_slave_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
index 71e22af..937bab0 100644
--- a/src/tests/master_slave_reconciliation_tests.cpp
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -419,6 +419,181 @@ TEST_F(MasterSlaveReconciliationTest, ReconcileDroppedOperation)
       reconcileOperationsMessage->operations(0).operation_uuid());
 }
 
+// The master reconciles operations that are missing from a re-registering
+// agent.
+//
+// In this case, the `ApplyOperationMessage` is dropped, so the agent should
+// respond with an OPERATION_DROPPED operation status update.
+//
+// This test verifies that if an operation ID is set, the framework receives
+// the OPERATION_DROPPED operation status update.
+//
+// This is a regression test for MESOS-8784.
+TEST_F(
+    MasterSlaveReconciliationTest,
+    ForwardOperationDroppedAfterExplicitReconciliation)
+{
+  Clock::pause();
+
+  mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
+
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register.
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start and register a resource provider.
+
+  v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  v1::Resource disk = v1::createDiskResource(
+      "200", "*", None(), None(), v1::createDiskSourceRaw());
+
+  Owned<v1::MockResourceProvider> resourceProvider(
+      new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));
+
+  // Make the mock resource provider answer to reconciliation events with
+  // OPERATION_DROPPED operation status updates.
+  auto reconcileOperations =
+    [&resourceProvider](
+        const v1::resource_provider::Event::ReconcileOperations& reconcile) {
+      foreach (const v1::UUID& operationUuid, reconcile.operation_uuids()) {
+        v1::resource_provider::Call call;
+
+        call.set_type(v1::resource_provider::Call::UPDATE_OPERATION_STATUS);
+        call.mutable_resource_provider_id()->CopyFrom(
+            resourceProvider->info.id());
+
+        v1::resource_provider::Call::UpdateOperationStatus*
+          updateOperationStatus = call.mutable_update_operation_status();
+
+        updateOperationStatus->mutable_status()->set_state(
+            v1::OPERATION_DROPPED);
+
+        updateOperationStatus->mutable_operation_uuid()->CopyFrom(
+            operationUuid);
+
+        resourceProvider->send(call);
+      }
+    };
+
+  EXPECT_CALL(*resourceProvider, reconcileOperations(_))
+    .WillOnce(Invoke(reconcileOperations));
+
+  Owned<EndpointDetector> endpointDetector(
+      mesos::internal::tests::resource_provider::createEndpointDetector(
+          slave.get()->pid));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // fully register.
+  Clock::resume();
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  resourceProvider->start(
+      endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+  // Wait until the agent's resources have been updated to include the
+  // resource provider resources.
+  AWAIT_READY(updateSlaveMessage);
+
+  Clock::pause();
+
+  // Start a v1 framework.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  Future<v1::scheduler::Event::Offers> offers;
+
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  v1::scheduler::TestMesos mesos(master.get()->pid, contentType, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  // We'll drop the `ApplyOperationMessage` from the master to the agent.
+  Future<ApplyOperationMessage> applyOperationMessage =
+    DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _);
+
+  v1::Resources resources =
+    v1::Resources(offer.resources()).filter([](const v1::Resource& resource) {
+      return resource.has_provider_id();
+    });
+
+  ASSERT_FALSE(resources.empty());
+
+  v1::Resource reserved = *(resources.begin());
+  reserved.add_reservations()->CopyFrom(
+      v1::createDynamicReservationInfo(
+          frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(v1::createCallAccept(
+      frameworkId, offer, {v1::RESERVE(reserved, operationId.value())}));
+
+  AWAIT_READY(applyOperationMessage);
+
+  Future<v1::scheduler::Event::UpdateOperationStatus> operationDroppedUpdate;
+  EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+    .WillOnce(FutureArg<1>(&operationDroppedUpdate));
+
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
+  // expiration) at the slave to force re-registration.
+  detector->appoint(master.get()->pid);
+
+  // Advance the clock, so that the agent re-registers.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the framework to receive the OPERATION_DROPPED update.
+  AWAIT_READY(operationDroppedUpdate);
+
+  EXPECT_EQ(operationId, operationDroppedUpdate->status().operation_id());
+  EXPECT_EQ(v1::OPERATION_DROPPED, operationDroppedUpdate->status().state());
+}
+
 // This test verifies that the master reconciles tasks that are
 // missing from a reregistering slave. In this case, we trigger
 // a race between the slave re-registration message and the launch

http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 8da3b02..b945edf 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -466,6 +466,7 @@ using mesos::v1::TaskInfo;
 using mesos::v1::TaskGroupInfo;
 using mesos::v1::TaskState;
 using mesos::v1::TaskStatus;
+using mesos::v1::UUID;
 using mesos::v1::WeightInfo;
 
 } // namespace v1 {

Reply via email to