Allowed resubscription of resource providers. A resource provider can resubscribe by including the resource provider ID it was previously assigned in the 'ResourceProviderInfo' of its 'SUBSCRIBE' call. Resubscription is necessary when either the resource provider or the resource provider manager (i.e., the agent) fails over.
Review: https://reviews.apache.org/r/64065/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ab950d3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ab950d3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ab950d3 Branch: refs/heads/master Commit: 9ab950d38cedd40c90ffc81305cae372f1d80e87 Parents: 1c92058 Author: Jan Schlicht <[email protected]> Authored: Fri Dec 1 10:03:36 2017 +0100 Committer: Benjamin Bannier <[email protected]> Committed: Fri Dec 1 12:01:57 2017 +0100 ---------------------------------------------------------------------- src/resource_provider/manager.cpp | 47 +++--- src/tests/mesos.hpp | 32 +++-- src/tests/resource_provider_manager_tests.cpp | 158 ++++++++++++++++++--- src/tests/slave_tests.cpp | 29 ++-- 4 files changed, 205 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/resource_provider/manager.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index 5fdce7f..c37553d 100644 --- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -385,34 +385,43 @@ void ResourceProviderManagerProcess::subscribe( LOG(INFO) << "Subscribing resource provider " << resourceProviderInfo; + ResourceProvider resourceProvider(resourceProviderInfo, http); + if (!resourceProviderInfo.has_id()) { // The resource provider is subscribing for the first time. 
- resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId()); - - ResourceProvider resourceProvider(resourceProviderInfo, http); - - Event event; - event.set_type(Event::SUBSCRIBED); - event.mutable_subscribed()->mutable_provider_id()->CopyFrom( - resourceProvider.info.id()); - - if (!resourceProvider.http.send(event)) { - LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider " - << resourceProvider.info.id() << ": connection closed"; - } + resourceProvider.info.mutable_id()->CopyFrom(newResourceProviderId()); // TODO(jieyu): Start heartbeat for the resource provider. resourceProviders.subscribed.put( - resourceProviderInfo.id(), + resourceProvider.info.id(), resourceProvider); - - return; + } else { + if (resourceProviders.subscribed.contains(resourceProviderInfo.id())) { + // Resource provider is resubscribing after failing over. + // TODO(nfnt): Test if old and new 'ResourceProviderInfo' match. + ResourceProvider& _resourceProvider = + resourceProviders.subscribed.at(resourceProviderInfo.id()); + + _resourceProvider.http.close(); + _resourceProvider.http = http; + } else { + // Resource provider is resubscribing after an agent failover. + resourceProviders.subscribed.put( + resourceProviderInfo.id(), resourceProvider); + } } - // TODO(chhsiao): Reject the subscription if it contains an unknown - // ID or there is already a subscribed instance with the same ID, - // and add tests for re-subscriptions. 
+ Event event; + event.set_type(Event::SUBSCRIBED); + event.mutable_subscribed()->mutable_provider_id()->CopyFrom( + resourceProvider.info.id()); + + if (!resourceProviders.subscribed.at(resourceProvider.info.id()).http.send( + event)) { + LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider " + << resourceProvider.info.id() << ": connection closed"; + } } http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/tests/mesos.hpp ---------------------------------------------------------------------- diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp index 99542c5..aa2571f 100644 --- a/src/tests/mesos.hpp +++ b/src/tests/mesos.hpp @@ -2778,6 +2778,7 @@ template < typename Event, typename Call, typename Driver, + typename ResourceProviderInfo, typename Resource, typename Resources, typename ResourceProviderID, @@ -2787,8 +2788,11 @@ template < class MockResourceProvider { public: - MockResourceProvider(const Option<Resources>& _resources = None()) - : resources(_resources) + MockResourceProvider( + const ResourceProviderInfo& _info, + const Option<Resources>& _resources = None()) + : info(_info), + resources(_resources) { ON_CALL(*this, connected()) .WillByDefault(Invoke( @@ -2797,6 +2801,7 @@ public: Event, Call, Driver, + ResourceProviderInfo, Resource, Resources, ResourceProviderID, @@ -2812,6 +2817,7 @@ public: Event, Call, Driver, + ResourceProviderInfo, Resource, Resources, ResourceProviderID, @@ -2827,6 +2833,7 @@ public: Event, Call, Driver, + ResourceProviderInfo, Resource, Resources, ResourceProviderID, @@ -2884,6 +2891,7 @@ public: Event, Call, Driver, + ResourceProviderInfo, Resource, Resources, ResourceProviderID, @@ -2896,6 +2904,7 @@ public: Event, Call, Driver, + ResourceProviderInfo, Resource, Resources, ResourceProviderID, @@ -2908,6 +2917,7 @@ public: Event, Call, Driver, + ResourceProviderInfo, Resource, Resources, ResourceProviderID, @@ -2923,29 +2933,26 @@ public: { Call call; call.set_type(Call::SUBSCRIBE); - 
call.mutable_subscribe()->mutable_resource_provider_info()->set_type( - "org.apache.mesos.rp.test"); - call.mutable_subscribe()->mutable_resource_provider_info()->set_name( - "test"); + call.mutable_subscribe()->mutable_resource_provider_info()->CopyFrom(info); driver->send(call); } void subscribedDefault(const typename Event::Subscribed& subscribed) { - resourceProviderId = subscribed.provider_id(); + info.mutable_id()->CopyFrom(subscribed.provider_id()); if (resources.isSome()) { Resources injected; foreach (Resource resource, resources.get()) { - resource.mutable_provider_id()->CopyFrom(resourceProviderId.get()); + resource.mutable_provider_id()->CopyFrom(info.id()); injected += resource; } Call call; call.set_type(Call::UPDATE_STATE); - call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get()); + call.mutable_resource_provider_id()->CopyFrom(info.id()); typename Call::UpdateState* update = call.mutable_update_state(); update->mutable_resources()->CopyFrom(injected); @@ -2957,9 +2964,11 @@ public: void operationDefault(const typename Event::Operation& operation) { + CHECK(info.has_id()); + Call call; call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS); - call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get()); + call.mutable_resource_provider_id()->CopyFrom(info.id()); typename Call::UpdateOfferOperationStatus* update = call.mutable_update_offer_operation_status(); @@ -3030,7 +3039,7 @@ public: driver->send(call); } - Option<ResourceProviderID> resourceProviderId; + ResourceProviderInfo info; private: Option<Resources> resources; @@ -3054,6 +3063,7 @@ using MockResourceProvider = tests::resource_provider::MockResourceProvider< mesos::v1::resource_provider::Event, mesos::v1::resource_provider::Call, mesos::v1::resource_provider::Driver, + mesos::v1::ResourceProviderInfo, mesos::v1::Resource, mesos::v1::Resources, mesos::v1::ResourceProviderID, 
http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/tests/resource_provider_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index 0b7c4ad..5764102 100644 --- a/src/tests/resource_provider_manager_tests.cpp +++ b/src/tests/resource_provider_manager_tests.cpp @@ -101,7 +101,31 @@ namespace tests { class ResourceProviderManagerHttpApiTest : public MesosTest, - public WithParamInterface<ContentType> {}; + public WithParamInterface<ContentType> +{ +public: + slave::Flags CreateSlaveFlags() override + { + slave::Flags slaveFlags = MesosTest::CreateSlaveFlags(); + + slaveFlags.authenticate_http_readwrite = false; + + constexpr SlaveInfo::Capability::Type capabilities[] = { + SlaveInfo::Capability::MULTI_ROLE, + SlaveInfo::Capability::HIERARCHICAL_ROLE, + SlaveInfo::Capability::RESERVATION_REFINEMENT, + SlaveInfo::Capability::RESOURCE_PROVIDER}; + + slaveFlags.agent_features = SlaveCapabilities(); + foreach (SlaveInfo::Capability::Type type, capabilities) { + SlaveInfo::Capability* capability = + slaveFlags.agent_features->add_capabilities(); + capability->set_type(type); + } + + return slaveFlags; + } +}; // The tests are parameterized by the content type of the request. @@ -596,26 +620,9 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources) Owned<MasterDetector> detector = master.get()->createDetector(); - slave::Flags slaveFlags = CreateSlaveFlags(); - slaveFlags.authenticate_http_readwrite = false; - Future<UpdateSlaveMessage> updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); - // Set the resource provider capability and other required capabilities. 
- constexpr SlaveInfo::Capability::Type capabilities[] = { - SlaveInfo::Capability::MULTI_ROLE, - SlaveInfo::Capability::HIERARCHICAL_ROLE, - SlaveInfo::Capability::RESERVATION_REFINEMENT, - SlaveInfo::Capability::RESOURCE_PROVIDER}; - - slaveFlags.agent_features = SlaveCapabilities(); - foreach (SlaveInfo::Capability::Type type, capabilities) { - SlaveInfo::Capability* capability = - slaveFlags.agent_features->add_capabilities(); - capability->set_type(type); - } - // Pause the clock and control it manually in order to // control the timing of the registration. A registration timeout // would trigger multiple registration attempts. As a result, multiple @@ -624,6 +631,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources) // resource provider registration. Clock::pause(); + slave::Flags slaveFlags = CreateSlaveFlags(); + Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags); ASSERT_SOME(agent); @@ -638,7 +647,12 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources) Clock::resume(); - v1::MockResourceProvider resourceProvider(Some(v1::Resources(disk))); + mesos::v1::ResourceProviderInfo resourceProviderInfo; + resourceProviderInfo.set_type("org.apache.mesos.rp.test"); + resourceProviderInfo.set_name("test"); + + v1::MockResourceProvider resourceProvider( + resourceProviderInfo, Some(v1::Resources(disk))); // Start and register a resource provider. string scheme = "http"; @@ -757,6 +771,112 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources) EXPECT_FALSE(block->reservations().empty()); } + +// Test that resource provider can resubscribe with an agent after +// a resource provider failover as well as an agent failover. +TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider) +{ + Clock::pause(); + + // Start master and agent. 
+ master::Flags masterFlags = CreateMasterFlags(); + Try<Owned<cluster::Master>> master = StartMaster(masterFlags); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + slave::Flags slaveFlags = CreateSlaveFlags(); + + Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags); + ASSERT_SOME(agent); + + Clock::advance(slaveFlags.registration_backoff_factor); + Clock::settle(); + AWAIT_READY(updateSlaveMessage); + + updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + mesos::v1::ResourceProviderInfo resourceProviderInfo; + resourceProviderInfo.set_type("org.apache.mesos.rp.test"); + resourceProviderInfo.set_name("test"); + + v1::Resource disk = v1::createDiskResource( + "200", "*", None(), None(), v1::createDiskSourceRaw()); + + v1::MockResourceProvider resourceProvider( + resourceProviderInfo, Some(v1::Resources(disk))); + + // Start and register a resource provider. + string scheme = "http"; + +#ifdef USE_SSL_SOCKET + if (process::network::openssl::flags().enabled) { + scheme = "https"; + } +#endif + + http::URL url( + scheme, + agent.get()->pid.address.ip, + agent.get()->pid.address.port, + agent.get()->pid.id + "/api/v1/resource_provider"); + + Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url)); + + const ContentType contentType = GetParam(); + + resourceProvider.start(endpointDetector, contentType, v1::DEFAULT_CREDENTIAL); + + // Wait until the agent's resources have been updated to include the + // resource provider resources. At this point the resource provider + // will have an ID assigned by the agent. 
+ AWAIT_READY(updateSlaveMessage); + + mesos::v1::ResourceProviderID resourceProviderId = resourceProvider.info.id(); + + Future<Event::Subscribed> subscribed1; + EXPECT_CALL(resourceProvider, subscribed(_)) + .WillOnce(FutureArg<0>(&subscribed1)); + + // Resource provider failover by opening a new connection. + // The assigned resource provider ID will be used to resubscribe. + resourceProvider.start(endpointDetector, contentType, v1::DEFAULT_CREDENTIAL); + + AWAIT_READY(subscribed1); + EXPECT_EQ(resourceProviderId, subscribed1->provider_id()); + + Future<Event::Subscribed> subscribed2; + EXPECT_CALL(resourceProvider, subscribed(_)) + .WillOnce(FutureArg<0>(&subscribed2)); + + Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); + + // The agent failover. + agent->reset(); + agent = StartSlave(detector.get(), slaveFlags); + + Clock::advance(slaveFlags.registration_backoff_factor); + Clock::settle(); + + AWAIT_READY(__recover); + + url = http::URL( + scheme, + agent.get()->pid.address.ip, + agent.get()->pid.address.port, + agent.get()->pid.id + "/api/v1/resource_provider"); + + endpointDetector.reset(new ConstantEndpointDetector(url)); + + resourceProvider.start(endpointDetector, contentType, v1::DEFAULT_CREDENTIAL); + + AWAIT_READY(subscribed2); + EXPECT_EQ(resourceProviderId, subscribed2->provider_id()); +} + } // namespace tests { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/tests/slave_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp index 1344e0a..8ab63ac 100644 --- a/src/tests/slave_tests.cpp +++ b/src/tests/slave_tests.cpp @@ -8655,8 +8655,12 @@ TEST_F(SlaveTest, ResourceProviderSubscribe) Clock::advance(slaveFlags.registration_backoff_factor); AWAIT_READY(slaveRegisteredMessage); + mesos::v1::ResourceProviderInfo resourceProviderInfo; + 
resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test"); + resourceProviderInfo.set_name("test"); + // Register a local resource provider with the agent. - v1::MockResourceProvider resourceProvider; + v1::MockResourceProvider resourceProvider(resourceProviderInfo); Future<Nothing> connected; EXPECT_CALL(resourceProvider, connected()) @@ -8696,11 +8700,8 @@ TEST_F(SlaveTest, ResourceProviderSubscribe) mesos::v1::resource_provider::Call call; call.set_type(mesos::v1::resource_provider::Call::SUBSCRIBE); - mesos::v1::ResourceProviderInfo* info = - call.mutable_subscribe()->mutable_resource_provider_info(); - - info->set_type("org.apache.mesos.resource_provider.test"); - info->set_name("test"); + call.mutable_subscribe()->mutable_resource_provider_info()->CopyFrom( + resourceProviderInfo); resourceProvider.send(call); } @@ -8864,6 +8865,10 @@ TEST_F(SlaveTest, ResourceProviderReconciliation) AWAIT_READY(updateSlaveMessage); + mesos::v1::ResourceProviderInfo resourceProviderInfo; + resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test"); + resourceProviderInfo.set_name("test"); + // Register a resource provider with the agent. v1::Resources resourceProviderResources = v1::createDiskResource( "200", @@ -8872,7 +8877,9 @@ TEST_F(SlaveTest, ResourceProviderReconciliation) None(), v1::createDiskSourceRaw()); - v1::MockResourceProvider resourceProvider(resourceProviderResources); + v1::MockResourceProvider resourceProvider( + resourceProviderInfo, + resourceProviderResources); string scheme = "http"; @@ -8980,12 +8987,11 @@ TEST_F(SlaveTest, ResourceProviderReconciliation) // Fail the operation in the resource provider. This should trigger // an `UpdateSlaveMessage` to the master. 
{ - CHECK_SOME(resourceProvider.resourceProviderId); + ASSERT_TRUE(resourceProvider.info.has_id()); v1::Resources resourceProviderResources_; foreach (v1::Resource resource, resourceProviderResources) { - resource.mutable_provider_id()->CopyFrom( - resourceProvider.resourceProviderId.get()); + resource.mutable_provider_id()->CopyFrom(resourceProvider.info.id()); resourceProviderResources_ += resource; } @@ -8996,8 +9002,7 @@ TEST_F(SlaveTest, ResourceProviderReconciliation) v1::resource_provider::Call call; call.set_type(v1::resource_provider::Call::UPDATE_STATE); - call.mutable_resource_provider_id()->CopyFrom( - resourceProvider.resourceProviderId.get()); + call.mutable_resource_provider_id()->CopyFrom(resourceProvider.info.id()); v1::resource_provider::Call::UpdateState* updateState = call.mutable_update_state();
