Repository: mesos
Updated Branches:
  refs/heads/master ab5a346fb -> b4e08210d


Prevented resource providers from changing their name or type.

Since the agent uses, e.g., a resource provider's name and type to
construct the paths under which resource provider state is persisted,
changing this information when a resource provider resubscribes is not
supported.

This patch persists a resource provider's name and type in the
resource provider registry and rejects a resource provider's
resubscription if incompatible changes are detected. Since this
information was not persisted prior to Mesos 1.7.0, no validation is
(or can be) performed against resource provider registry entries
stored by earlier versions of Mesos.

Review: https://reviews.apache.org/r/67671/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b4e08210
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b4e08210
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b4e08210

Branch: refs/heads/master
Commit: b4e08210d326b62ece0a054d4ad3c4686eb5063e
Parents: ab5a346
Author: Benjamin Bannier <benjamin.bann...@mesosphere.io>
Authored: Fri Jul 6 09:22:42 2018 +0200
Committer: Benjamin Bannier <bbann...@apache.org>
Committed: Mon Jul 9 09:03:19 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 48 ++++++++++---
 src/resource_provider/registrar.cpp           | 12 ++--
 src/resource_provider/registrar.hpp           |  5 +-
 src/resource_provider/registry.hpp            | 47 +++++++++++++
 src/resource_provider/registry.proto          |  2 +
 src/tests/resource_provider_manager_tests.cpp | 80 ++++++++++++++--------
 6 files changed, 149 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp 
b/src/resource_provider/manager.cpp
index 6400e70..abd7e38 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -96,6 +96,20 @@ using process::metrics::PullGauge;
 namespace mesos {
 namespace internal {
 
+mesos::resource_provider::registry::ResourceProvider
+createRegistryResourceProvider(const ResourceProviderInfo& 
resourceProviderInfo)
+{
+  mesos::resource_provider::registry::ResourceProvider resourceProvider;
+
+  CHECK(resourceProviderInfo.has_id());
+  resourceProvider.mutable_id()->CopyFrom(resourceProviderInfo.id());
+
+  resourceProvider.set_name(resourceProviderInfo.name());
+  resourceProvider.set_type(resourceProviderInfo.type());
+
+  return resourceProvider;
+}
+
 // Represents the streaming HTTP connection to a resource provider.
 struct HttpConnection
 {
@@ -673,7 +687,8 @@ void ResourceProviderManagerProcess::subscribe(
     // triggering a `AdmitResourceProvider` operation on the registrar.
     admitResourceProvider =
       registrar->apply(Owned<mesos::resource_provider::Registrar::Operation>(
-          new AdmitResourceProvider(resourceProvider->info.id())));
+          new AdmitResourceProvider(
+              createRegistryResourceProvider(resourceProvider->info))));
   } else {
     // TODO(chhsiao): The resource provider is resubscribing after being
     // restarted or an agent failover. The 'ResourceProviderInfo' might
@@ -692,6 +707,23 @@ void ResourceProviderManagerProcess::subscribe(
       return;
     }
 
+    // Check whether the resource provider has changed
+    // information which should be static.
+    mesos::resource_provider::registry::ResourceProvider resourceProvider_ =
+      createRegistryResourceProvider(resourceProvider->info);
+
+    const mesos::resource_provider::registry::ResourceProvider&
+      storedResourceProvider = resourceProviders.known.at(resourceProviderId);
+
+    if (resourceProvider_ != storedResourceProvider) {
+      LOG(INFO)
+        << "Dropping resubscription attempt of resource provider "
+        << resourceProvider_
+        << " since it does not match the previous information "
+        << storedResourceProvider;
+      return;
+    }
+
     // If the resource provider is known we do not need to admit it
     // again, and the registrar operation implicitly succeeded.
     admitResourceProvider = true;
@@ -757,19 +789,19 @@ void ResourceProviderManagerProcess::_subscribe(
       messages.put(std::move(message));
     }));
 
-  // TODO(jieyu): Start heartbeat for the resource provider.
-  resourceProviders.subscribed.put(
-      resourceProviderId,
-      std::move(resourceProvider));
-
   if (!resourceProviders.known.contains(resourceProviderId)) {
-    mesos::resource_provider::registry::ResourceProvider resourceProvider_;
-    resourceProvider_.mutable_id()->CopyFrom(resourceProviderId);
+    mesos::resource_provider::registry::ResourceProvider resourceProvider_ =
+      createRegistryResourceProvider(resourceProvider->info);
 
     resourceProviders.known.put(
         resourceProviderId,
         std::move(resourceProvider_));
   }
+
+  // TODO(jieyu): Start heartbeat for the resource provider.
+  resourceProviders.subscribed.put(
+      resourceProviderId,
+      std::move(resourceProvider));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp 
b/src/resource_provider/registrar.cpp
index a855a2b..0dc49e6 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -100,8 +100,9 @@ Try<Owned<Registrar>> Registrar::create(
 }
 
 
-AdmitResourceProvider::AdmitResourceProvider(const ResourceProviderID& _id)
-  : id(_id) {}
+AdmitResourceProvider::AdmitResourceProvider(
+    const ResourceProvider& _resourceProvider)
+  : resourceProvider(_resourceProvider) {}
 
 
 Try<bool> AdmitResourceProvider::perform(Registry* registry)
@@ -110,7 +111,7 @@ Try<bool> AdmitResourceProvider::perform(Registry* registry)
           registry->resource_providers().begin(),
           registry->resource_providers().end(),
           [this](const ResourceProvider& resourceProvider) {
-            return resourceProvider.id() == this->id;
+            return resourceProvider.id() == this->resourceProvider.id();
           }) != registry->resource_providers().end()) {
     return Error("Resource provider already admitted");
   }
@@ -119,14 +120,11 @@ Try<bool> AdmitResourceProvider::perform(Registry* 
registry)
           registry->removed_resource_providers().begin(),
           registry->removed_resource_providers().end(),
           [this](const ResourceProvider& resourceProvider) {
-            return resourceProvider.id() == this->id;
+            return resourceProvider.id() == this->resourceProvider.id();
           }) != registry->removed_resource_providers().end()) {
     return Error("Resource provider was removed");
   }
 
-  ResourceProvider resourceProvider;
-  resourceProvider.mutable_id()->CopyFrom(id);
-
   registry->add_resource_providers()->CopyFrom(resourceProvider);
 
   return true; // Mutation.

http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp 
b/src/resource_provider/registrar.hpp
index ded56e1..458108a 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -88,12 +88,13 @@ public:
 class AdmitResourceProvider : public Registrar::Operation
 {
 public:
-  explicit AdmitResourceProvider(const ResourceProviderID& id);
+  explicit AdmitResourceProvider(
+      const registry::ResourceProvider& resourceProvider);
 
 private:
   Try<bool> perform(registry::Registry* registry) override;
 
-  ResourceProviderID id;
+  registry::ResourceProvider resourceProvider;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/resource_provider/registry.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.hpp 
b/src/resource_provider/registry.hpp
index 4c6c4d4..1f84eb5 100644
--- a/src/resource_provider/registry.hpp
+++ b/src/resource_provider/registry.hpp
@@ -18,7 +18,54 @@
 #ifndef __RESOURCE_PROVIDER_REGISTRY_HPP__
 #define __RESOURCE_PROVIDER_REGISTRY_HPP__
 
+#include <mesos/type_utils.hpp>
+
 // ONLY USEFUL AFTER RUNNING PROTOC.
 #include "resource_provider/registry.pb.h"
 
+namespace mesos {
+namespace resource_provider {
+namespace registry {
+
+inline bool operator==(
+    const ResourceProvider& left,
+    const ResourceProvider& right)
+{
+  // To support additions to the persisted types, two resource providers
+  // are equal if their IDs match and every field set on both is equal.
+  if (left.id() != right.id()) {
+    return false;
+  }
+
+  if (left.has_name() && right.has_name() && left.name() != right.name()) {
+    return false;
+  }
+
+  if (left.has_type() && right.has_type() && left.type() != right.type()) {
+    return false;
+  }
+
+  return true;
+}
+
+
+inline bool operator!=(
+    const ResourceProvider& left,
+    const ResourceProvider& right)
+{
+  return !(left == right);
+}
+
+
+inline std::ostream& operator<<(
+    std::ostream& stream,
+    const ResourceProvider& resourceProvider)
+{
+  return stream << resourceProvider.DebugString();
+}
+
+} // namespace registry {
+} // namespace resource_provider {
+} // namespace mesos {
+
 #endif // __RESOURCE_PROVIDER_REGISTRY_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/resource_provider/registry.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.proto 
b/src/resource_provider/registry.proto
index 491263e..cb3cb24 100644
--- a/src/resource_provider/registry.proto
+++ b/src/resource_provider/registry.proto
@@ -29,6 +29,8 @@ option java_outer_classname = "Protos";
 
 message ResourceProvider {
   required ResourceProviderID id = 1;
+  optional string type = 2;
+  optional string name = 3;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e08210/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp 
b/src/tests/resource_provider_manager_tests.cpp
index 58bdbf0..cf15e5a 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -824,8 +824,10 @@ class ResourceProviderRegistrarTest : public 
tests::MesosTest {};
 #ifndef __WINDOWS__
 TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, 
GenericRegistrar)
 {
-  ResourceProviderID resourceProviderId;
-  resourceProviderId.set_value("foo");
+  mesos::resource_provider::registry::ResourceProvider resourceProvider;
+  resourceProvider.mutable_id()->set_value("foo");
+  resourceProvider.set_name("bar");
+  resourceProvider.set_type("org.apache.mesos.rp.test");
 
   // Perform operations on the resource provider. We use
   // persistent storage so we can recover the state below.
@@ -841,24 +843,35 @@ 
TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, GenericRegistrar)
     AWAIT_READY(recover);
     EXPECT_TRUE(recover->removed_resource_providers().empty());
 
-    Future<bool> admitResourceProvider1 =
+    Future<bool> admitResourceProvider =
       registrar.get()->apply(Owned<Registrar::Operation>(
-            new AdmitResourceProvider(resourceProviderId)));
-    AWAIT_READY(admitResourceProvider1);
-    EXPECT_TRUE(admitResourceProvider1.get());
+          new AdmitResourceProvider(resourceProvider)));
+    AWAIT_READY(admitResourceProvider);
+    EXPECT_TRUE(admitResourceProvider.get());
+
+    // Re-admitting a known resource provider fails, even with a new type.
+    mesos::resource_provider::registry::ResourceProvider resourceProvider_ =
+      resourceProvider;
+    resourceProvider_.set_type("org.apache.mesos.rp.test2");
+
+    admitResourceProvider =
+      registrar.get()->apply(Owned<Registrar::Operation>(
+          new AdmitResourceProvider(resourceProvider_)));
+    AWAIT_READY(admitResourceProvider);
+    EXPECT_FALSE(admitResourceProvider.get());
 
     Future<bool> removeResourceProvider =
       registrar.get()->apply(Owned<Registrar::Operation>(
-            new RemoveResourceProvider(resourceProviderId)));
+          new RemoveResourceProvider(resourceProvider.id())));
     AWAIT_READY(removeResourceProvider);
     EXPECT_TRUE(removeResourceProvider.get());
 
     // A removed resource provider cannot be admitted again.
-    Future<bool> admitResourceProvider2 =
+    admitResourceProvider =
       registrar.get()->apply(Owned<Registrar::Operation>(
-            new AdmitResourceProvider(resourceProviderId)));
-    AWAIT_READY(admitResourceProvider2);
-    EXPECT_FALSE(admitResourceProvider2.get());
+          new AdmitResourceProvider(resourceProvider)));
+    AWAIT_READY(admitResourceProvider);
+    EXPECT_FALSE(admitResourceProvider.get());
   }
 
   // Recover and validate the previous registry state.
@@ -877,10 +890,9 @@ 
TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, GenericRegistrar)
     ASSERT_EQ(1, recover->removed_resource_providers_size());
 
     const mesos::resource_provider::registry::ResourceProvider&
-      resourceProvider = recover->removed_resource_providers(0);
+      resourceProvider_ = recover->removed_resource_providers(0);
 
-    ASSERT_TRUE(resourceProvider.has_id());
-    EXPECT_EQ(resourceProviderId, resourceProvider.id());
+    EXPECT_EQ(resourceProvider, resourceProvider_);
   }
 }
 #endif // __WINDOWS__
@@ -892,8 +904,10 @@ 
TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, GenericRegistrar)
 #ifndef __WINDOWS__
 TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, MasterRegistrar)
 {
-  ResourceProviderID resourceProviderId;
-  resourceProviderId.set_value("foo");
+  mesos::resource_provider::registry::ResourceProvider resourceProvider;
+  resourceProvider.mutable_id()->set_value("foo");
+  resourceProvider.set_name("bar");
+  resourceProvider.set_type("org.apache.mesos.rp.test");
 
   // Perform operations on the resource provider. We use
   // persistent storage so we can recover the state below.
@@ -913,24 +927,35 @@ 
TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, MasterRegistrar)
 
     ASSERT_SOME_NE(Owned<Registrar>(nullptr), registrar);
 
-    Future<bool> admitResourceProvider1 =
+    Future<bool> admitResourceProvider =
+      registrar.get()->apply(Owned<Registrar::Operation>(
+            new AdmitResourceProvider(resourceProvider)));
+    AWAIT_READY(admitResourceProvider);
+    EXPECT_TRUE(admitResourceProvider.get());
+
+    // Re-admitting a known resource provider fails, even with a new type.
+    mesos::resource_provider::registry::ResourceProvider resourceProvider_ =
+      resourceProvider;
+    resourceProvider_.set_type("org.apache.mesos.rp.test2");
+
+    admitResourceProvider =
       registrar.get()->apply(Owned<Registrar::Operation>(
-            new AdmitResourceProvider(resourceProviderId)));
-    AWAIT_READY(admitResourceProvider1);
-    EXPECT_TRUE(admitResourceProvider1.get());
+          new AdmitResourceProvider(resourceProvider_)));
+    AWAIT_READY(admitResourceProvider);
+    EXPECT_FALSE(admitResourceProvider.get());
 
     Future<bool> removeResourceProvider =
       registrar.get()->apply(Owned<Registrar::Operation>(
-            new RemoveResourceProvider(resourceProviderId)));
+            new RemoveResourceProvider(resourceProvider.id())));
     AWAIT_READY(removeResourceProvider);
     EXPECT_TRUE(removeResourceProvider.get());
 
     // A removed resource provider cannot be admitted again.
-    Future<bool> admitResourceProvider2 =
+    admitResourceProvider =
       registrar.get()->apply(Owned<Registrar::Operation>(
-            new AdmitResourceProvider(resourceProviderId)));
-    AWAIT_READY(admitResourceProvider2);
-    EXPECT_FALSE(admitResourceProvider2.get());
+            new AdmitResourceProvider(resourceProvider)));
+    AWAIT_READY(admitResourceProvider);
+    EXPECT_FALSE(admitResourceProvider.get());
   }
 
   // Recover and validate the previous registry state.
@@ -958,10 +983,9 @@ 
TEST_F_TEMP_DISABLED_ON_WINDOWS(ResourceProviderRegistrarTest, MasterRegistrar)
     ASSERT_EQ(1, recover->removed_resource_providers_size());
 
     const mesos::resource_provider::registry::ResourceProvider&
-      resourceProvider = recover->removed_resource_providers(0);
+      resourceProvider_ = recover->removed_resource_providers(0);
 
-    ASSERT_TRUE(resourceProvider.has_id());
-    EXPECT_EQ(resourceProviderId, resourceProvider.id());
+    EXPECT_EQ(resourceProvider, resourceProvider_);
   }
 }
 #endif // __WINDOWS__

Reply via email to