Repository: mesos Updated Branches: refs/heads/master ab5a346fb -> b4e08210d
Prevented resource providers from changing their name or type. Since the agent uses, e.g., a resource provider's name or type to construct paths to persist resource provider state, changes to this information on resource provider resubscription are not supported. This patch persists a resource provider's name and type in the resource provider registry and rejects a resource provider resubscription if incompatible changes are detected. Since we did not persist this information prior to Mesos 1.7.0, we cannot and do not perform validation against resource provider registry information stored with earlier versions of Mesos. Review: https://reviews.apache.org/r/67671/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b4e08210 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b4e08210 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b4e08210 Branch: refs/heads/master Commit: b4e08210d326b62ece0a054d4ad3c4686eb5063e Parents: ab5a346 Author: Benjamin Bannier <benjamin.bann...@mesosphere.io> Authored: Fri Jul 6 09:22:42 2018 +0200 Committer: Benjamin Bannier <bbann...@apache.org> Committed: Mon Jul 9 09:03:19 2018 +0200 ---------------------------------------------------------------------- src/resource_provider/manager.cpp | 48 ++++++++++--- src/resource_provider/registrar.cpp | 12 ++-- src/resource_provider/registrar.hpp | 5 +- src/resource_provider/registry.hpp | 47 +++++++++++++ src/resource_provider/registry.proto | 2 + src/tests/resource_provider_manager_tests.cpp | 80 ++++++++++++++-------- 6 files changed, 149 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/resource_provider/manager.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index 6400e70..abd7e38 100644 
--- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -96,6 +96,20 @@ using process::metrics::PullGauge; namespace mesos { namespace internal { +mesos::resource_provider::registry::ResourceProvider +createRegistryResourceProvider(const ResourceProviderInfo& resourceProviderInfo) +{ + mesos::resource_provider::registry::ResourceProvider resourceProvider; + + CHECK(resourceProviderInfo.has_id()); + resourceProvider.mutable_id()->CopyFrom(resourceProviderInfo.id()); + + resourceProvider.set_name(resourceProviderInfo.name()); + resourceProvider.set_type(resourceProviderInfo.type()); + + return resourceProvider; +} + // Represents the streaming HTTP connection to a resource provider. struct HttpConnection { @@ -673,7 +687,8 @@ void ResourceProviderManagerProcess::subscribe( // triggering a `AdmitResourceProvider` operation on the registrar. admitResourceProvider = registrar->apply(Owned<mesos::resource_provider::Registrar::Operation>( - new AdmitResourceProvider(resourceProvider->info.id()))); + new AdmitResourceProvider( + createRegistryResourceProvider(resourceProvider->info)))); } else { // TODO(chhsiao): The resource provider is resubscribing after being // restarted or an agent failover. The 'ResourceProviderInfo' might @@ -692,6 +707,23 @@ void ResourceProviderManagerProcess::subscribe( return; } + // Check whether the resource provider has change + // information which should be static. 
+ mesos::resource_provider::registry::ResourceProvider resourceProvider_ = + createRegistryResourceProvider(resourceProvider->info); + + const mesos::resource_provider::registry::ResourceProvider& + storedResourceProvider = resourceProviders.known.at(resourceProviderId); + + if (resourceProvider_ != storedResourceProvider) { + LOG(INFO) + << "Dropping resubscription attempt of resource provider " + << resourceProvider_ + << " since it does not match the previous information " + << storedResourceProvider; + return; + } + // If the resource provider is known we do not need to admit it // again, and the registrar operation implicitly succeeded. admitResourceProvider = true; @@ -757,19 +789,19 @@ void ResourceProviderManagerProcess::_subscribe( messages.put(std::move(message)); })); - // TODO(jieyu): Start heartbeat for the resource provider. - resourceProviders.subscribed.put( - resourceProviderId, - std::move(resourceProvider)); - if (!resourceProviders.known.contains(resourceProviderId)) { - mesos::resource_provider::registry::ResourceProvider resourceProvider_; - resourceProvider_.mutable_id()->CopyFrom(resourceProviderId); + mesos::resource_provider::registry::ResourceProvider resourceProvider_ = + createRegistryResourceProvider(resourceProvider->info); resourceProviders.known.put( resourceProviderId, std::move(resourceProvider_)); } + + // TODO(jieyu): Start heartbeat for the resource provider. 
+ resourceProviders.subscribed.put( + resourceProviderId, + std::move(resourceProvider)); } http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/resource_provider/registrar.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp index a855a2b..0dc49e6 100644 --- a/src/resource_provider/registrar.cpp +++ b/src/resource_provider/registrar.cpp @@ -100,8 +100,9 @@ Try<Owned<Registrar>> Registrar::create( } -AdmitResourceProvider::AdmitResourceProvider(const ResourceProviderID& _id) - : id(_id) {} +AdmitResourceProvider::AdmitResourceProvider( + const ResourceProvider& _resourceProvider) + : resourceProvider(_resourceProvider) {} Try<bool> AdmitResourceProvider::perform(Registry* registry) @@ -110,7 +111,7 @@ Try<bool> AdmitResourceProvider::perform(Registry* registry) registry->resource_providers().begin(), registry->resource_providers().end(), [this](const ResourceProvider& resourceProvider) { - return resourceProvider.id() == this->id; + return resourceProvider.id() == this->resourceProvider.id(); }) != registry->resource_providers().end()) { return Error("Resource provider already admitted"); } @@ -119,14 +120,11 @@ Try<bool> AdmitResourceProvider::perform(Registry* registry) registry->removed_resource_providers().begin(), registry->removed_resource_providers().end(), [this](const ResourceProvider& resourceProvider) { - return resourceProvider.id() == this->id; + return resourceProvider.id() == this->resourceProvider.id(); }) != registry->removed_resource_providers().end()) { return Error("Resource provider was removed"); } - ResourceProvider resourceProvider; - resourceProvider.mutable_id()->CopyFrom(id); - registry->add_resource_providers()->CopyFrom(resourceProvider); return true; // Mutation. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/resource_provider/registrar.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp index ded56e1..458108a 100644 --- a/src/resource_provider/registrar.hpp +++ b/src/resource_provider/registrar.hpp @@ -88,12 +88,13 @@ public: class AdmitResourceProvider : public Registrar::Operation { public: - explicit AdmitResourceProvider(const ResourceProviderID& id); + explicit AdmitResourceProvider( + const registry::ResourceProvider& resourceProvider); private: Try<bool> perform(registry::Registry* registry) override; - ResourceProviderID id; + registry::ResourceProvider resourceProvider; }; http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/resource_provider/registry.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/registry.hpp b/src/resource_provider/registry.hpp index 4c6c4d4..1f84eb5 100644 --- a/src/resource_provider/registry.hpp +++ b/src/resource_provider/registry.hpp @@ -18,7 +18,54 @@ #ifndef __RESOURCE_PROVIDER_REGISTRY_HPP__ #define __RESOURCE_PROVIDER_REGISTRY_HPP__ +#include <mesos/type_utils.hpp> + // ONLY USEFUL AFTER RUNNING PROTOC. #include "resource_provider/registry.pb.h" +namespace mesos { +namespace resource_provider { +namespace registry { + +inline bool operator==( + const ResourceProvider& left, + const ResourceProvider& right) +{ + // To support additions to the persisted types we consider two resource + // providers to be equal if all their set fields are equal. 
+ if (left.id() != right.id()) { + return false; + } + + if (left.has_name() && right.has_name() && left.name() != right.name()) { + return false; + } + + if (left.has_type() && right.has_type() && left.type() != right.type()) { + return false; + } + + return true; +} + + +inline bool operator!=( + const ResourceProvider& left, + const ResourceProvider& right) +{ + return !(left == right); +} + + +inline std::ostream& operator<<( + std::ostream& stream, + const ResourceProvider& resourceProvider) +{ + return stream << resourceProvider.DebugString(); +} + +} // namespace registry { +} // namespace resource_provider { +} // namespace mesos { + #endif // __RESOURCE_PROVIDER_REGISTRY_HPP__ http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/resource_provider/registry.proto ---------------------------------------------------------------------- diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto index 491263e..cb3cb24 100644 --- a/src/resource_provider/registry.proto +++ b/src/resource_provider/registry.proto @@ -29,6 +29,8 @@ option java_outer_classname = "Protos"; message ResourceProvider { required ResourceProviderID id = 1; + optional string type = 2; + optional string name = 3; } http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/tests/resource_provider_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index 58bdbf0..cf15e5a 100644 --- a/src/tests/resource_provider_manager_tests.cpp +++ b/src/tests/resource_provider_manager_tests.cpp @@ -824,8 +824,10 @@ class ResourceProviderRegistrarTest : public tests::MesosTest {}; #ifndef __WINDOWS__ TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, GenericRegistrar) { - ResourceProviderID resourceProviderId; - resourceProviderId.set_value("foo"); + mesos::resource_provider::registry::ResourceProvider 
resourceProvider; + resourceProvider.mutable_id()->set_value("foo"); + resourceProvider.set_name("bar"); + resourceProvider.set_type("org.apache.mesos.rp.test"); // Perform operations on the resource provider. We use // persistent storage so we can recover the state below. @@ -841,24 +843,35 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, GenericRegistrar) AWAIT_READY(recover); EXPECT_TRUE(recover->removed_resource_providers().empty()); - Future<bool> admitResourceProvider1 = + Future<bool> admitResourceProvider = registrar.get()->apply(Owned<Registrar::Operation>( - new AdmitResourceProvider(resourceProviderId))); - AWAIT_READY(admitResourceProvider1); - EXPECT_TRUE(admitResourceProvider1.get()); + new AdmitResourceProvider(resourceProvider))); + AWAIT_READY(admitResourceProvider); + EXPECT_TRUE(admitResourceProvider.get()); + + // A resource provider cannot resubscribe with changed type or name. + mesos::resource_provider::registry::ResourceProvider resourceProvider_ = + resourceProvider; + resourceProvider_.set_type("org.apache.mesos.rp.test2"); + + admitResourceProvider = + registrar.get()->apply(Owned<Registrar::Operation>( + new AdmitResourceProvider(resourceProvider_))); + AWAIT_READY(admitResourceProvider); + EXPECT_FALSE(admitResourceProvider.get()); Future<bool> removeResourceProvider = registrar.get()->apply(Owned<Registrar::Operation>( - new RemoveResourceProvider(resourceProviderId))); + new RemoveResourceProvider(resourceProvider.id()))); AWAIT_READY(removeResourceProvider); EXPECT_TRUE(removeResourceProvider.get()); // A removed resource provider cannot be admitted again. 
- Future<bool> admitResourceProvider2 = + admitResourceProvider = registrar.get()->apply(Owned<Registrar::Operation>( - new AdmitResourceProvider(resourceProviderId))); - AWAIT_READY(admitResourceProvider2); - EXPECT_FALSE(admitResourceProvider2.get()); + new AdmitResourceProvider(resourceProvider))); + AWAIT_READY(admitResourceProvider); + EXPECT_FALSE(admitResourceProvider.get()); } // Recover and validate the previous registry state. @@ -877,10 +890,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, GenericRegistrar) ASSERT_EQ(1, recover->removed_resource_providers_size()); const mesos::resource_provider::registry::ResourceProvider& - resourceProvider = recover->removed_resource_providers(0); + resourceProvider_ = recover->removed_resource_providers(0); - ASSERT_TRUE(resourceProvider.has_id()); - EXPECT_EQ(resourceProviderId, resourceProvider.id()); + EXPECT_EQ(resourceProvider, resourceProvider_); } } #endif // __WINDOWS__ @@ -892,8 +904,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, GenericRegistrar) #ifndef __WINDOWS__ TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, MasterRegistrar) { - ResourceProviderID resourceProviderId; - resourceProviderId.set_value("foo"); + mesos::resource_provider::registry::ResourceProvider resourceProvider; + resourceProvider.mutable_id()->set_value("foo"); + resourceProvider.set_name("bar"); + resourceProvider.set_type("org.apache.mesos.rp.test"); // Perform operations on the resource provider. We use // persistent storage so we can recover the state below. 
@@ -913,24 +927,35 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, MasterRegistrar) ASSERT_SOME_NE(Owned<Registrar>(nullptr), registrar); - Future<bool> admitResourceProvider1 = + Future<bool> admitResourceProvider = + registrar.get()->apply(Owned<Registrar::Operation>( + new AdmitResourceProvider(resourceProvider))); + AWAIT_READY(admitResourceProvider); + EXPECT_TRUE(admitResourceProvider.get()); + + // A resource provider cannot resubscribe with changed type or name. + mesos::resource_provider::registry::ResourceProvider resourceProvider_ = + resourceProvider; + resourceProvider_.set_type("org.apache.mesos.rp.test2"); + + admitResourceProvider = registrar.get()->apply(Owned<Registrar::Operation>( - new AdmitResourceProvider(resourceProviderId))); - AWAIT_READY(admitResourceProvider1); - EXPECT_TRUE(admitResourceProvider1.get()); + new AdmitResourceProvider(resourceProvider_))); + AWAIT_READY(admitResourceProvider); + EXPECT_FALSE(admitResourceProvider.get()); Future<bool> removeResourceProvider = registrar.get()->apply(Owned<Registrar::Operation>( - new RemoveResourceProvider(resourceProviderId))); + new RemoveResourceProvider(resourceProvider.id()))); AWAIT_READY(removeResourceProvider); EXPECT_TRUE(removeResourceProvider.get()); // A removed resource provider cannot be admitted again. - Future<bool> admitResourceProvider2 = + admitResourceProvider = registrar.get()->apply(Owned<Registrar::Operation>( - new AdmitResourceProvider(resourceProviderId))); - AWAIT_READY(admitResourceProvider2); - EXPECT_FALSE(admitResourceProvider2.get()); + new AdmitResourceProvider(resourceProvider))); + AWAIT_READY(admitResourceProvider); + EXPECT_FALSE(admitResourceProvider.get()); } // Recover and validate the previous registry state. 
@@ -958,10 +983,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, MasterRegistrar) ASSERT_EQ(1, recover->removed_resource_providers_size()); const mesos::resource_provider::registry::ResourceProvider& - resourceProvider = recover->removed_resource_providers(0); + resourceProvider_ = recover->removed_resource_providers(0); - ASSERT_TRUE(resourceProvider.has_id()); - EXPECT_EQ(resourceProviderId, resourceProvider.id()); + EXPECT_EQ(resourceProvider, resourceProvider_); } } #endif // __WINDOWS__