Made the master drop operations that set an ID but act on agent default resources (i.e., resources not managed by a resource provider). Review: https://reviews.apache.org/r/66992/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7dd18560 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7dd18560 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7dd18560 Branch: refs/heads/1.6.x Commit: 7dd18560dc657cf629204b892d0bd149cef3bc05 Parents: 634941f Author: Gaston Kleiman <[email protected]> Authored: Mon May 7 17:32:15 2018 -0700 Committer: Greg Mann <[email protected]> Committed: Mon May 7 18:21:42 2018 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 8 +++ src/tests/master_tests.cpp | 75 +++++++++++++++++++++++ src/tests/operation_reconciliation_tests.cpp | 69 ++++++++++++++++++--- 3 files changed, 144 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/7dd18560/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 3b5d2eb..28a0661 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -4235,6 +4235,14 @@ void Master::accept( break; } + if (getResourceProviderId(operation).isNone()) { + drop(framework, + operation, + "Operation requested feedback, but it affects resources not" + " managed by a resource provider"); + break; + } + if (!slave->capabilities.resourceProvider) { drop(framework, operation, http://git-wip-us.apache.org/repos/asf/mesos/blob/7dd18560/src/tests/master_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp index e159573..0f6042c 100644 --- a/src/tests/master_tests.cpp +++ b/src/tests/master_tests.cpp @@ -8954,6 +8954,81 @@ TEST_F(MasterTest, OperationUpdateDuringFailover) } +// Tests that the master correctly drops an operation if the operation's 'id' +// field is set and the operation 
affects resources not managed by a resource +// provider. +TEST_F(MasterTest, DropOperationWithIDAffectingDefaultResources) +{ + Clock::pause(); + + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags(); + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(slave); + + // Advance the clock to trigger agent registration. + Clock::advance(slaveFlags.registration_backoff_factor); + + // Start a v1 framework. + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE); + + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo)); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + // Ignore heartbeats. 
+ EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); + + Future<v1::scheduler::Event::Offers> offers; + + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(v1::scheduler::DeclineOffers()); + + v1::scheduler::TestMesos mesos( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(subscribed); + v1::FrameworkID frameworkId(subscribed->framework_id()); + + AWAIT_READY(offers); + ASSERT_FALSE(offers->offers().empty()); + + const v1::Offer& offer = offers->offers(0); + + Future<v1::scheduler::Event::UpdateOperationStatus> operationErrorUpdate; + EXPECT_CALL(*scheduler, updateOperationStatus(_, _)) + .WillOnce(FutureArg<1>(&operationErrorUpdate)); + + v1::Resource reserved = *(offer.resources().begin()); + reserved.add_reservations()->CopyFrom( + v1::createDynamicReservationInfo( + frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal())); + + v1::OperationID operationId; + operationId.set_value("operation"); + + mesos.send(v1::createCallAccept( + frameworkId, offer, {v1::RESERVE(reserved, operationId.value())})); + + // Wait for the framework to receive the OPERATION_ERROR update. 
+ AWAIT_READY(operationErrorUpdate); + + EXPECT_EQ(operationId, operationErrorUpdate->status().operation_id()); + EXPECT_EQ(v1::OPERATION_ERROR, operationErrorUpdate->status().state()); +} + + class MasterTestPrePostReservationRefinement : public MasterTest, public WithParamInterface<bool> { http://git-wip-us.apache.org/repos/asf/mesos/blob/7dd18560/src/tests/operation_reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/operation_reconciliation_tests.cpp b/src/tests/operation_reconciliation_tests.cpp index 9717e84..39cf188 100644 --- a/src/tests/operation_reconciliation_tests.cpp +++ b/src/tests/operation_reconciliation_tests.cpp @@ -74,14 +74,60 @@ TEST_P(OperationReconciliationTest, PendingOperation) Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + Owned<MasterDetector> detector = master.get()->createDetector(); mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags(); + + // Disable HTTP authentication to simplify resource provider interactions. + slaveFlags.authenticate_http_readwrite = false; + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags); ASSERT_SOME(slave); // Advance the clock to trigger agent registration. Clock::advance(slaveFlags.registration_backoff_factor); + // Wait for the agent to register. + AWAIT_READY(updateSlaveMessage); + + // Start and register a resource provider. 
+ + ResourceProviderInfo resourceProviderInfo; + resourceProviderInfo.set_type("org.apache.mesos.rp.test"); + resourceProviderInfo.set_name("test"); + + Resource disk = + createDiskResource("200", "*", None(), None(), createDiskSourceRaw()); + + Owned<MockResourceProvider> resourceProvider( + new MockResourceProvider( + resourceProviderInfo, + Resources(disk))); + + Owned<EndpointDetector> endpointDetector( + mesos::internal::tests::resource_provider::createEndpointDetector( + slave.get()->pid)); + + updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + // NOTE: We need to resume the clock so that the resource provider can + // fully register. + Clock::resume(); + + ContentType contentType = GetParam(); + + resourceProvider->start(endpointDetector, contentType, DEFAULT_CREDENTIAL); + + // Wait until the agent's resources have been updated to include the + // resource provider resources. + AWAIT_READY(updateSlaveMessage); + ASSERT_TRUE(updateSlaveMessage->has_resource_providers()); + ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size()); + + Clock::pause(); + auto scheduler = std::make_shared<MockHTTPScheduler>(); FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; @@ -114,18 +160,25 @@ TEST_P(OperationReconciliationTest, PendingOperation) const Offer& offer = offers->offers(0); const AgentID& agentId = offer.agent_id(); - OperationID operationId; - operationId.set_value("operation"); - - const Resources reservedResources = - Resources(offer.resources()) - .pushReservation(createDynamicReservationInfo( - frameworkInfo.roles(0), frameworkInfo.principal())); - // We'll drop the `ApplyOperationMessage` from the master to the agent. 
Future<ApplyOperationMessage> applyOperationMessage = DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _); + Resources resources = + Resources(offer.resources()).filter([](const Resource& resource) { + return resource.has_provider_id(); + }); + + ASSERT_FALSE(resources.empty()); + + Resource reservedResources = *(resources.begin()); + reservedResources.add_reservations()->CopyFrom( + createDynamicReservationInfo( + frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal())); + + OperationID operationId; + operationId.set_value("operation"); + mesos.send(createCallAccept( frameworkId, offer,
