Added admitted resource providers to the manager's registry. Review: https://reviews.apache.org/r/66545/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/65ed45fb Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/65ed45fb Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/65ed45fb Branch: refs/heads/master Commit: 65ed45fb039e7da64a58094aff67cf0189aa1f6d Parents: 9a8cef0 Author: Benjamin Bannier <[email protected]> Authored: Tue May 1 13:09:19 2018 -0700 Committer: Chun-Hung Hsiao <[email protected]> Committed: Tue May 1 13:09:19 2018 -0700 ---------------------------------------------------------------------- src/resource_provider/manager.cpp | 45 ++++++++++++++++++++-- src/resource_provider/registrar.cpp | 10 +++++ src/resource_provider/registry.proto | 1 + src/tests/resource_provider_manager_tests.cpp | 28 ++++++++++---- 4 files changed, 73 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/manager.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index bc52741..5979480 100644 --- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -58,6 +58,7 @@ using std::string; using mesos::internal::resource_provider::validation::call::validate; +using mesos::resource_provider::AdmitResourceProvider; using mesos::resource_provider::Call; using mesos::resource_provider::Event; using mesos::resource_provider::Registrar; @@ -184,6 +185,10 @@ private: const HttpConnection& http, const Call::Subscribe& subscribe); + void _subscribe( + const Future<bool>& admitResourceProvider, + Owned<ResourceProvider> resourceProvider); + void updateOperationStatus( ResourceProvider* resourceProvider, const Call::UpdateOperationStatus& update); @@ -657,17 +662,53 @@ void ResourceProviderManagerProcess::subscribe( Owned<ResourceProvider> resourceProvider( 
new ResourceProvider(resourceProviderInfo, http)); + Future<bool> admitResourceProvider; + + if (!resourceProviderInfo.has_id()) { // The resource provider is subscribing for the first time. resourceProvider->info.mutable_id()->CopyFrom(newResourceProviderId()); + + // If we are handing out a new `ResourceProviderID`, persist the ID by + // triggering an `AdmitResourceProvider` operation on the registrar. + admitResourceProvider = + registrar->apply(Owned<mesos::resource_provider::Registrar::Operation>( + new AdmitResourceProvider(resourceProvider->info.id()))); } else { // TODO(chhsiao): The resource provider is resubscribing after being // restarted or an agent failover. The 'ResourceProviderInfo' might // have been updated, but its type and name should remain the same. // We should checkpoint its 'type', 'name' and ID, then check if the // resubscribption is consistent with the checkpointed record. + + // If the resource provider is already known, we do not need to admit it + // again; the registrar operation is treated as having implicitly succeeded.
+ admitResourceProvider = true; } + admitResourceProvider.onAny(defer( + self(), + &ResourceProviderManagerProcess::_subscribe, + lambda::_1, + std::move(resourceProvider))); +} + + +void ResourceProviderManagerProcess::_subscribe( + const Future<bool>& admitResourceProvider, + Owned<ResourceProvider> resourceProvider) +{ + if (!admitResourceProvider.isReady()) { + LOG(INFO) + << "Not subscribing resource provider " << resourceProvider->info.id() + << " as registry update did not succeed: " << admitResourceProvider; + + return; + } + + CHECK(admitResourceProvider.get()) + << "Could not admit resource provider " << resourceProvider->info.id() + << " as registry update was rejected"; + const ResourceProviderID& resourceProviderId = resourceProvider->info.id(); Event event; @@ -681,7 +722,7 @@ void ResourceProviderManagerProcess::subscribe( return; } - http.closed() + resourceProvider->http.closed() .onAny(defer(self(), [=](const Future<Nothing>& future) { // Iff the remote side closes the HTTP connection, the future will be // ready. We will remove the resource provider in that case. @@ -716,8 +757,6 @@ void ResourceProviderManagerProcess::subscribe( resourceProviders.known.put( resourceProviderId, std::move(resourceProvider_)); - - // TODO(bbannier): Persist this information in the registry. 
} } http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/registrar.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp index 6cc4625..a855a2b 100644 --- a/src/resource_provider/registrar.cpp +++ b/src/resource_provider/registrar.cpp @@ -115,6 +115,15 @@ Try<bool> AdmitResourceProvider::perform(Registry* registry) return Error("Resource provider already admitted"); } + if (std::find_if( + registry->removed_resource_providers().begin(), + registry->removed_resource_providers().end(), + [this](const ResourceProvider& resourceProvider) { + return resourceProvider.id() == this->id; + }) != registry->removed_resource_providers().end()) { + return Error("Resource provider was removed"); + } + ResourceProvider resourceProvider; resourceProvider.mutable_id()->CopyFrom(id); @@ -141,6 +150,7 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry) return Error("Attempted to remove an unknown resource provider"); } + registry->add_removed_resource_providers()->CopyFrom(*pos); registry->mutable_resource_providers()->erase(pos); return true; // Mutation. http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/registry.proto ---------------------------------------------------------------------- diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto index 14bd433..491263e 100644 --- a/src/resource_provider/registry.proto +++ b/src/resource_provider/registry.proto @@ -35,4 +35,5 @@ message ResourceProvider { // A top level object that is managed by the Registrar and persisted. 
message Registry { repeated ResourceProvider resource_providers = 1; + repeated ResourceProvider removed_resource_providers = 2; } http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/tests/resource_provider_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index eb8e4fc..8c364a1 100644 --- a/src/tests/resource_provider_manager_tests.cpp +++ b/src/tests/resource_provider_manager_tests.cpp @@ -843,17 +843,24 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar) AWAIT_READY(registrar.get()->recover()); - Future<bool> admitResourceProvider = + Future<bool> admitResourceProvider1 = registrar.get()->apply(Owned<Registrar::Operation>( new AdmitResourceProvider(resourceProviderId))); - AWAIT_READY(admitResourceProvider); - EXPECT_TRUE(admitResourceProvider.get()); + AWAIT_READY(admitResourceProvider1); + EXPECT_TRUE(admitResourceProvider1.get()); Future<bool> removeResourceProvider = registrar.get()->apply(Owned<Registrar::Operation>( new RemoveResourceProvider(resourceProviderId))); AWAIT_READY(removeResourceProvider); EXPECT_TRUE(removeResourceProvider.get()); + + // A removed resource provider cannot be admitted again. 
+ Future<bool> admitResourceProvider2 = + registrar.get()->apply(Owned<Registrar::Operation>( + new AdmitResourceProvider(resourceProviderId))); + AWAIT_READY(admitResourceProvider2); + EXPECT_FALSE(admitResourceProvider2.get()); } @@ -879,19 +886,24 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar) ASSERT_SOME(registrar); ASSERT_NE(nullptr, registrar->get()); - AWAIT_READY(masterRegistrar.recover(masterInfo)); - - Future<bool> admitResourceProvider = + Future<bool> admitResourceProvider1 = registrar.get()->apply(Owned<Registrar::Operation>( new AdmitResourceProvider(resourceProviderId))); - AWAIT_READY(admitResourceProvider); - EXPECT_TRUE(admitResourceProvider.get()); + AWAIT_READY(admitResourceProvider1); + EXPECT_TRUE(admitResourceProvider1.get()); Future<bool> removeResourceProvider = registrar.get()->apply(Owned<Registrar::Operation>( new RemoveResourceProvider(resourceProviderId))); AWAIT_READY(removeResourceProvider); EXPECT_TRUE(removeResourceProvider.get()); + + // A removed resource provider cannot be admitted again. + Future<bool> admitResourceProvider2 = + registrar.get()->apply(Owned<Registrar::Operation>( + new AdmitResourceProvider(resourceProviderId))); + AWAIT_READY(admitResourceProvider2); + EXPECT_FALSE(admitResourceProvider2.get()); }
