Made the master include the operation ID in OPERATION_DROPPED updates. Review: https://reviews.apache.org/r/66924/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a570f943 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a570f943 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a570f943 Branch: refs/heads/master Commit: a570f9436b816d40ba3d01455211f5d61f77d66d Parents: b4c541b Author: Gaston Kleiman <[email protected]> Authored: Mon May 7 17:32:56 2018 -0700 Committer: Greg Mann <[email protected]> Committed: Mon May 7 18:15:30 2018 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 21 ++- src/master/master.hpp | 2 +- src/tests/master_slave_reconciliation_tests.cpp | 175 +++++++++++++++++++ src/tests/mesos.hpp | 1 + 4 files changed, 193 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 28a0661..f48a4f7 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -8417,8 +8417,7 @@ void Master::forward( } -void Master::updateOperationStatus( - const UpdateOperationStatusMessage& update) +void Master::updateOperationStatus(UpdateOperationStatusMessage&& update) { CHECK(update.has_slave_id()) << "External resource provider is not supported yet"; @@ -8470,6 +8469,21 @@ void Master::updateOperationStatus( return; } + if (operation->info().has_id()) { + // Agents don't include the framework and operation IDs when sending + // operation status updates for dropped operations in response to a + // `ReconcileOperationsMessage`, but they can be deduced from the operation + // info kept on the master. + + // Only operations done via the scheduler API can have an ID. 
+ CHECK(operation->has_framework_id()); + + frameworkId = operation->framework_id(); + + update.mutable_status()->mutable_operation_id()->CopyFrom( + operation->info().id()); + } + updateOperation(operation, update); CHECK(operation->statuses_size() > 0); @@ -8477,9 +8491,6 @@ void Master::updateOperationStatus( const OperationStatus& latestStatus = *operation->statuses().rbegin(); if (operation->info().has_id()) { - // Only operations done via the scheduler API can have an ID. - CHECK_SOME(frameworkId); - // Forward the status update to the framework. Framework* framework = getFramework(frameworkId.get()); http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 76e7763..5ec764b 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -510,7 +510,7 @@ public: ReconcileTasksMessage&& reconcileTasksMessage); void updateOperationStatus( - const UpdateOperationStatusMessage& update); + UpdateOperationStatusMessage&& update); void exitedExecutor( const process::UPID& from, http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/tests/master_slave_reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp index 71e22af..937bab0 100644 --- a/src/tests/master_slave_reconciliation_tests.cpp +++ b/src/tests/master_slave_reconciliation_tests.cpp @@ -419,6 +419,181 @@ TEST_F(MasterSlaveReconciliationTest, ReconcileDroppedOperation) reconcileOperationsMessage->operations(0).operation_uuid()); } +// The master reconciles operations that are missing from a re-registering +// agent. +// +// In this case, the `ApplyOperationMessage` is dropped, so the agent should +// respond with an OPERATION_DROPPED operation status update.
+// +// This test verifies that if an operation ID is set, the framework receives +// the OPERATION_DROPPED operation status update. +// +// This is a regression test for MESOS-8784. +TEST_F( + MasterSlaveReconciliationTest, + ForwardOperationDroppedAfterExplicitReconciliation) +{ + Clock::pause(); + + mesos::internal::master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid); + + mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags(); + + // Disable HTTP authentication to simplify resource provider interactions. + slaveFlags.authenticate_http_readwrite = false; + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + // Advance the clock to trigger agent registration. + Clock::advance(slaveFlags.registration_backoff_factor); + + // Wait for the agent to register. + AWAIT_READY(updateSlaveMessage); + + // Start and register a resource provider. + + v1::ResourceProviderInfo resourceProviderInfo; + resourceProviderInfo.set_type("org.apache.mesos.rp.test"); + resourceProviderInfo.set_name("test"); + + v1::Resource disk = v1::createDiskResource( + "200", "*", None(), None(), v1::createDiskSourceRaw()); + + Owned<v1::MockResourceProvider> resourceProvider( + new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk))); + + // Make the mock resource provider respond to reconciliation events with + // OPERATION_DROPPED operation status updates.
+ auto reconcileOperations = + [&resourceProvider]( + const v1::resource_provider::Event::ReconcileOperations& reconcile) { + foreach (const v1::UUID& operationUuid, reconcile.operation_uuids()) { + v1::resource_provider::Call call; + + call.set_type(v1::resource_provider::Call::UPDATE_OPERATION_STATUS); + call.mutable_resource_provider_id()->CopyFrom( + resourceProvider->info.id()); + + v1::resource_provider::Call::UpdateOperationStatus* + updateOperationStatus = call.mutable_update_operation_status(); + + updateOperationStatus->mutable_status()->set_state( + v1::OPERATION_DROPPED); + + updateOperationStatus->mutable_operation_uuid()->CopyFrom( + operationUuid); + + resourceProvider->send(call); + } + }; + + EXPECT_CALL(*resourceProvider, reconcileOperations(_)) + .WillOnce(Invoke(reconcileOperations)); + + Owned<EndpointDetector> endpointDetector( + mesos::internal::tests::resource_provider::createEndpointDetector( + slave.get()->pid)); + + updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + // NOTE: We need to resume the clock so that the resource provider can + // fully register. + Clock::resume(); + + ContentType contentType = ContentType::PROTOBUF; + + resourceProvider->start( + endpointDetector, contentType, v1::DEFAULT_CREDENTIAL); + + // Wait until the agent's resources have been updated to include the + // resource provider resources. + AWAIT_READY(updateSlaveMessage); + + Clock::pause(); + + // Start a v1 framework. + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + // Ignore heartbeats. 
+ EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); + + Future<v1::scheduler::Event::Offers> offers; + + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(v1::scheduler::DeclineOffers()); + + v1::scheduler::TestMesos mesos(master.get()->pid, contentType, scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + + // We'll drop the `ApplyOperationMessage` from the master to the agent. + Future<ApplyOperationMessage> applyOperationMessage = + DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _); + + v1::Resources resources = + v1::Resources(offer.resources()).filter([](const v1::Resource& resource) { + return resource.has_provider_id(); + }); + + ASSERT_FALSE(resources.empty()); + + v1::Resource reserved = *(resources.begin()); + reserved.add_reservations()->CopyFrom( + v1::createDynamicReservationInfo( + frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal())); + + v1::OperationID operationId; + operationId.set_value("operation"); + + mesos.send(v1::createCallAccept( + frameworkId, offer, {v1::RESERVE(reserved, operationId.value())})); + + AWAIT_READY(applyOperationMessage); + + Future<v1::scheduler::Event::UpdateOperationStatus> operationDroppedUpdate; + EXPECT_CALL(*scheduler, updateOperationStatus(_, _)) + .WillOnce(FutureArg<1>(&operationDroppedUpdate)); + + // Simulate a spurious master change event (e.g., due to ZooKeeper + // expiration) at the slave to force re-registration. + detector->appoint(master.get()->pid); + + // Advance the clock, so that the agent re-registers. + Clock::advance(slaveFlags.registration_backoff_factor); + + // Wait for the framework to receive the OPERATION_DROPPED update. 
+ AWAIT_READY(operationDroppedUpdate); + + EXPECT_EQ(operationId, operationDroppedUpdate->status().operation_id()); + EXPECT_EQ(v1::OPERATION_DROPPED, operationDroppedUpdate->status().state()); +} + // This test verifies that the master reconciles tasks that are // missing from a reregistering slave. In this case, we trigger // a race between the slave re-registration message and the launch http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 8da3b02..b945edf 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -466,6 +466,7 @@ using mesos::v1::TaskInfo; using mesos::v1::TaskGroupInfo; using mesos::v1::TaskState; using mesos::v1::TaskStatus; +using mesos::v1::UUID; using mesos::v1::WeightInfo; } // namespace v1 {
