Revert "Set up recovery code paths of resource provider manager."

This reverts commit bfcf5571869598a2e6d75550013fdaefa57dd6cb.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f670e2b3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f670e2b3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f670e2b3

Branch: refs/heads/master
Commit: f670e2b329514282515a5df2b52d0ddbcbb92a8b
Parents: 26626f4
Author: Alexander Rukletsov <[email protected]>
Authored: Wed Apr 25 17:09:03 2018 +0200
Committer: Alexander Rukletsov <[email protected]>
Committed: Wed Apr 25 17:09:03 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 323 +++++++++------------
 src/resource_provider/registrar.cpp           |  91 +++---
 src/resource_provider/registrar.hpp           |  18 +-
 src/tests/resource_provider_manager_tests.cpp |  15 +-
 4 files changed, 187 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp 
b/src/resource_provider/manager.cpp
index 6393e7a..259a810 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -36,7 +36,6 @@
 #include <process/metrics/metrics.hpp>
 
 #include <stout/hashmap.hpp>
-#include <stout/nothing.hpp>
 #include <stout/protobuf.hpp>
 #include <stout/uuid.hpp>
 
@@ -48,7 +47,6 @@
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
 
-#include "resource_provider/registry.hpp"
 #include "resource_provider/validation.hpp"
 
 namespace http = process::http;
@@ -62,8 +60,6 @@ using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
 
-using mesos::resource_provider::registry::Registry;
-
 using process::Failure;
 using process::Future;
 using process::Owned;
@@ -80,10 +76,10 @@ using process::wait;
 
 using process::http::Accepted;
 using process::http::BadRequest;
+using process::http::OK;
 using process::http::MethodNotAllowed;
 using process::http::NotAcceptable;
 using process::http::NotImplemented;
-using process::http::OK;
 using process::http::Pipe;
 using process::http::UnsupportedMediaType;
 
@@ -196,11 +192,6 @@ private:
       ResourceProvider* resourceProvider,
       const Call::UpdatePublishResourcesStatus& update);
 
-  Future<Nothing> recover(
-      const mesos::resource_provider::registry::Registry& registry);
-
-  void initialize() override;
-
   ResourceProviderID newResourceProviderId();
 
   double gaugeSubscribed();
@@ -218,9 +209,6 @@ private:
     Gauge subscribed;
   };
 
-  Owned<Registrar> registrar;
-  Promise<Nothing> recovered;
-
   Metrics metrics;
 };
 
@@ -228,191 +216,152 @@ private:
 ResourceProviderManagerProcess::ResourceProviderManagerProcess(
     Owned<Registrar> _registrar)
   : ProcessBase(process::ID::generate("resource-provider-manager")),
-    registrar(std::move(_registrar)),
     metrics(*this)
 {
-  CHECK_NOTNULL(registrar.get());
+  CHECK_NOTNULL(_registrar.get());
 }
 
 
-void ResourceProviderManagerProcess::initialize()
+Future<http::Response> ResourceProviderManagerProcess::api(
+    const http::Request& request,
+    const Option<Principal>& principal)
 {
-  // Recover the registrar.
-  registrar->recover()
-    .then(defer(self(), &ResourceProviderManagerProcess::recover, lambda::_1))
-    .onAny([](const Future<Nothing>& recovered) {
-      if (!recovered.isReady()) {
-        LOG(FATAL)
-        << "Failed to recover resource provider manager registry: "
-        << recovered;
-      }
-    });
-}
+  if (request.method != "POST") {
+    return MethodNotAllowed({"POST"}, request.method);
+  }
 
+  v1::resource_provider::Call v1Call;
 
-Future<Nothing> ResourceProviderManagerProcess::recover(
-    const mesos::resource_provider::registry::Registry& registry)
-{
-  recovered.set(Nothing());
+  // TODO(anand): Content type values are case-insensitive.
+  Option<string> contentType = request.headers.get("Content-Type");
 
-  return Nothing();
-}
+  if (contentType.isNone()) {
+    return BadRequest("Expecting 'Content-Type' to be present");
+  }
 
+  if (contentType.get() == APPLICATION_PROTOBUF) {
+    if (!v1Call.ParseFromString(request.body)) {
+      return BadRequest("Failed to parse body into Call protobuf");
+    }
+  } else if (contentType.get() == APPLICATION_JSON) {
+    Try<JSON::Value> value = JSON::parse(request.body);
+    if (value.isError()) {
+      return BadRequest("Failed to parse body into JSON: " + value.error());
+    }
 
-Future<http::Response> ResourceProviderManagerProcess::api(
-    const http::Request& request,
-    const Option<Principal>& principal)
-{
-  // TODO(bbannier): This implementation does not limit the number of messages
-  // in the actor's inbox which could become large should a big number of
-  // resource providers attempt to subscribe before recovery completed. 
Consider
-  // rejecting requests until the resource provider manager has recovered. This
-  // would likely require implementing retry logic in resource providers.
-  return recovered.future().then(defer(
-      self(), [this, request, principal](const Nothing&) -> http::Response {
-        if (request.method != "POST") {
-          return MethodNotAllowed({"POST"}, request.method);
-        }
-
-        v1::resource_provider::Call v1Call;
-
-        // TODO(anand): Content type values are case-insensitive.
-        Option<string> contentType = request.headers.get("Content-Type");
-
-        if (contentType.isNone()) {
-          return BadRequest("Expecting 'Content-Type' to be present");
-        }
-
-        if (contentType.get() == APPLICATION_PROTOBUF) {
-          if (!v1Call.ParseFromString(request.body)) {
-            return BadRequest("Failed to parse body into Call protobuf");
-          }
-        } else if (contentType.get() == APPLICATION_JSON) {
-          Try<JSON::Value> value = JSON::parse(request.body);
-          if (value.isError()) {
-            return BadRequest(
-                "Failed to parse body into JSON: " + value.error());
-          }
-
-          Try<v1::resource_provider::Call> parse =
-            ::protobuf::parse<v1::resource_provider::Call>(value.get());
-
-          if (parse.isError()) {
-            return BadRequest(
-                "Failed to convert JSON into Call protobuf: " + parse.error());
-          }
-
-          v1Call = parse.get();
-        } else {
-          return UnsupportedMediaType(
-              string("Expecting 'Content-Type' of ") + APPLICATION_JSON +
-              " or " + APPLICATION_PROTOBUF);
-        }
-
-        Call call = devolve(v1Call);
-
-        Option<Error> error = validate(call);
-        if (error.isSome()) {
-          return BadRequest(
-              "Failed to validate resource_provider::Call: " + error->message);
-        }
-
-        if (call.type() == Call::SUBSCRIBE) {
-          // We default to JSON 'Content-Type' in the response since an empty
-          // 'Accept' header results in all media types considered acceptable.
-          ContentType acceptType = ContentType::JSON;
-
-          if (request.acceptsMediaType(APPLICATION_JSON)) {
-            acceptType = ContentType::JSON;
-          } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
-            acceptType = ContentType::PROTOBUF;
-          } else {
-            return NotAcceptable(
-                string("Expecting 'Accept' to allow ") + "'" +
-                APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
-          }
-
-          if (request.headers.contains("Mesos-Stream-Id")) {
-            return BadRequest(
-                "Subscribe calls should not include the 'Mesos-Stream-Id' "
-                "header");
-          }
-
-          Pipe pipe;
-          OK ok;
-
-          ok.headers["Content-Type"] = stringify(acceptType);
-          ok.type = http::Response::PIPE;
-          ok.reader = pipe.reader();
-
-          // Generate a stream ID and return it in the response.
-          id::UUID streamId = id::UUID::random();
-          ok.headers["Mesos-Stream-Id"] = streamId.toString();
-
-          HttpConnection http(pipe.writer(), acceptType, streamId);
-          this->subscribe(http, call.subscribe());
-
-          return std::move(ok);
-        }
-
-        if (!this->resourceProviders.subscribed.contains(
-                call.resource_provider_id())) {
-          return BadRequest("Resource provider is not subscribed");
-        }
-
-        ResourceProvider* resourceProvider =
-          this->resourceProviders.subscribed.at(call.resource_provider_id())
-            .get();
-
-        // This isn't a `SUBSCRIBE` call, so the request should include a 
stream
-        // ID.
-        if (!request.headers.contains("Mesos-Stream-Id")) {
-          return BadRequest(
-              "All non-subscribe calls should include to 'Mesos-Stream-Id' "
-              "header");
-        }
-
-        const string& streamId = request.headers.at("Mesos-Stream-Id");
-        if (streamId != resourceProvider->http.streamId.toString()) {
-          return BadRequest(
-              "The stream ID '" + streamId +
-              "' included in this request "
-              "didn't match the stream ID currently associated with "
-              " resource provider ID " +
-              resourceProvider->info.id().value());
-        }
-
-        switch (call.type()) {
-          case Call::UNKNOWN: {
-            return NotImplemented();
-          }
-
-          case Call::SUBSCRIBE: {
-            // `SUBSCRIBE` call should have been handled above.
-            LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
-          }
-
-          case Call::UPDATE_OPERATION_STATUS: {
-            this->updateOperationStatus(
-                resourceProvider, call.update_operation_status());
-
-            return Accepted();
-          }
-
-          case Call::UPDATE_STATE: {
-            this->updateState(resourceProvider, call.update_state());
-            return Accepted();
-          }
-
-          case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
-            this->updatePublishResourcesStatus(
-                resourceProvider, call.update_publish_resources_status());
-            return Accepted();
-          }
-        }
-
-        UNREACHABLE();
-      }));
+    Try<v1::resource_provider::Call> parse =
+      ::protobuf::parse<v1::resource_provider::Call>(value.get());
+
+    if (parse.isError()) {
+      return BadRequest("Failed to convert JSON into Call protobuf: " +
+                        parse.error());
+    }
+
+    v1Call = parse.get();
+  } else {
+    return UnsupportedMediaType(
+        string("Expecting 'Content-Type' of ") +
+        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
+  }
+
+  Call call = devolve(v1Call);
+
+  Option<Error> error = validate(call);
+  if (error.isSome()) {
+    return BadRequest(
+        "Failed to validate resource_provider::Call: " + error->message);
+  }
+
+  if (call.type() == Call::SUBSCRIBE) {
+    // We default to JSON 'Content-Type' in the response since an empty
+    // 'Accept' header results in all media types considered acceptable.
+    ContentType acceptType = ContentType::JSON;
+
+    if (request.acceptsMediaType(APPLICATION_JSON)) {
+      acceptType = ContentType::JSON;
+    } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+      acceptType = ContentType::PROTOBUF;
+    } else {
+      return NotAcceptable(
+          string("Expecting 'Accept' to allow ") +
+          "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+    }
+
+    if (request.headers.contains("Mesos-Stream-Id")) {
+      return BadRequest(
+          "Subscribe calls should not include the 'Mesos-Stream-Id' header");
+    }
+
+    Pipe pipe;
+    OK ok;
+
+    ok.headers["Content-Type"] = stringify(acceptType);
+    ok.type = http::Response::PIPE;
+    ok.reader = pipe.reader();
+
+    // Generate a stream ID and return it in the response.
+    id::UUID streamId = id::UUID::random();
+    ok.headers["Mesos-Stream-Id"] = streamId.toString();
+
+    HttpConnection http(pipe.writer(), acceptType, streamId);
+    subscribe(http, call.subscribe());
+
+    return ok;
+  }
+
+  if (!resourceProviders.subscribed.contains(call.resource_provider_id())) {
+    return BadRequest("Resource provider is not subscribed");
+  }
+
+  ResourceProvider* resourceProvider =
+    resourceProviders.subscribed.at(call.resource_provider_id()).get();
+
+  // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
+  if (!request.headers.contains("Mesos-Stream-Id")) {
+    return BadRequest(
+        "All non-subscribe calls should include to 'Mesos-Stream-Id' header");
+  }
+
+  const string& streamId = request.headers.at("Mesos-Stream-Id");
+  if (streamId != resourceProvider->http.streamId.toString()) {
+    return BadRequest(
+        "The stream ID '" + streamId + "' included in this request "
+        "didn't match the stream ID currently associated with "
+        " resource provider ID " + resourceProvider->info.id().value());
+  }
+
+  switch(call.type()) {
+    case Call::UNKNOWN: {
+      return NotImplemented();
+    }
+
+    case Call::SUBSCRIBE: {
+      // `SUBSCRIBE` call should have been handled above.
+      LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
+    }
+
+    case Call::UPDATE_OPERATION_STATUS: {
+      updateOperationStatus(
+          resourceProvider,
+          call.update_operation_status());
+
+      return Accepted();
+    }
+
+    case Call::UPDATE_STATE: {
+      updateState(resourceProvider, call.update_state());
+      return Accepted();
+    }
+
+    case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
+      updatePublishResourcesStatus(
+          resourceProvider,
+          call.update_publish_resources_status());
+      return Accepted();
+    }
+  }
+
+  UNREACHABLE();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp 
b/src/resource_provider/registrar.cpp
index d7ec6a6..b151e2b 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -92,11 +92,9 @@ Try<Owned<Registrar>> Registrar::create(Owned<Storage> 
storage)
 }
 
 
-Try<Owned<Registrar>> Registrar::create(
-    master::Registrar* registrar,
-    Registry registry)
+Try<Owned<Registrar>> Registrar::create(master::Registrar* registrar)
 {
-  return new MasterRegistrar(registrar, std::move(registry));
+  return new MasterRegistrar(registrar);
 }
 
 
@@ -152,29 +150,28 @@ class GenericRegistrarProcess : public 
Process<GenericRegistrarProcess>
 public:
   GenericRegistrarProcess(Owned<Storage> storage);
 
-  Future<Registry> recover();
+  Future<Nothing> recover();
 
   Future<bool> apply(Owned<Registrar::Operation> operation);
 
-  void update();
-
-  void initialize() override;
-
-private:
   Future<bool> _apply(Owned<Registrar::Operation> operation);
 
+  void update();
+
   void _update(
       const Future<Option<Variable<Registry>>>& store,
+      const Registry& updatedRegistry,
       deque<Owned<Registrar::Operation>> applied);
 
-
+private:
   Owned<Storage> storage;
 
   // Use fully qualified type for `State` to disambiguate with `State`
   // enumeration in `ProcessBase`.
   mesos::state::protobuf::State state;
 
-  Promise<Nothing> recovered;
+  Option<Future<Nothing>> recovered;
+  Option<Registry> registry;
   Option<Variable<Registry>> variable;
 
   Option<Error> error;
@@ -194,36 +191,32 @@ 
GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
 }
 
 
-void GenericRegistrarProcess::initialize()
+Future<Nothing> GenericRegistrarProcess::recover()
 {
   constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
 
-  CHECK_NONE(variable);
-
-  recovered.associate(state.fetch<Registry>(NAME).then(
-      defer(self(), [this](const Variable<Registry>& recovery) {
-        variable = recovery;
-        return Nothing();
-      })));
-}
+  if (recovered.isNone()) {
+    recovered = state.fetch<Registry>(NAME).then(
+        defer(self(), [this](const Variable<Registry>& recovery) {
+          registry = recovery.get();
+          variable = recovery;
 
+          return Nothing();
+        }));
+  }
 
-Future<Registry> GenericRegistrarProcess::recover()
-{
-  // Prevent discards on the returned `Future` by marking the result as
-  // `undiscardable` so that we control the lifetime of the recovering 
registry.
-  return undiscardable(recovered.future()).then([this](const Nothing&) {
-    CHECK_SOME(this->variable);
-    return this->variable->get();
-  });
+  return recovered.get();
 }
 
 
 Future<bool> GenericRegistrarProcess::apply(
     Owned<Registrar::Operation> operation)
 {
-  return undiscardable(recovered.future()).then(
-      defer(self(), &Self::_apply, std::move(operation)));
+  if (recovered.isNone()) {
+    return Failure("Attempted to apply the operation before recovering");
+  }
+
+  return recovered->then(defer(self(), &Self::_apply, std::move(operation)));
 }
 
 
@@ -256,9 +249,8 @@ void GenericRegistrarProcess::update()
 
   updating = true;
 
-  CHECK_SOME(variable);
-
-  Registry updatedRegistry = variable->get();
+  CHECK_SOME(registry);
+  Registry updatedRegistry = registry.get();
 
   foreach (Owned<Registrar::Operation>& operation, operations) {
     Try<bool> operationResult = (*operation)(&updatedRegistry);
@@ -280,6 +272,7 @@ void GenericRegistrarProcess::update()
       self(),
       &Self::_update,
       lambda::_1,
+      updatedRegistry,
       std::move(operations)));
 
   operations.clear();
@@ -288,6 +281,7 @@ void GenericRegistrarProcess::update()
 
 void GenericRegistrarProcess::_update(
     const Future<Option<Variable<Registry>>>& store,
+    const Registry& updatedRegistry,
     deque<Owned<Registrar::Operation>> applied)
 {
   updating = false;
@@ -316,6 +310,7 @@ void GenericRegistrarProcess::_update(
   }
 
   variable = store->get();
+  registry = updatedRegistry;
 
   // Remove the operations.
   while (!applied.empty()) {
@@ -345,7 +340,7 @@ GenericRegistrar::~GenericRegistrar()
 }
 
 
-Future<Registry> GenericRegistrar::recover()
+Future<Nothing> GenericRegistrar::recover()
 {
   return dispatch(process.get(), &GenericRegistrarProcess::recover);
 }
@@ -369,8 +364,6 @@ class MasterRegistrarProcess : public 
Process<MasterRegistrarProcess>
   public:
     AdaptedOperation(Owned<Registrar::Operation> operation);
 
-    Future<registry::Registry> recover();
-
   private:
     Try<bool> perform(internal::Registry* registry, hashset<SlaveID>*) 
override;
 
@@ -383,17 +376,12 @@ class MasterRegistrarProcess : public 
Process<MasterRegistrarProcess>
   };
 
 public:
-  explicit MasterRegistrarProcess(
-      master::Registrar* registrar,
-      Registry registry);
+  explicit MasterRegistrarProcess(master::Registrar* registrar);
 
   Future<bool> apply(Owned<Registrar::Operation> operation);
 
-  Future<registry::Registry> recover() { return registry; }
-
 private:
   master::Registrar* registrar = nullptr;
-  Registry registry;
 };
 
 
@@ -410,12 +398,9 @@ Try<bool> 
MasterRegistrarProcess::AdaptedOperation::perform(
 }
 
 
-MasterRegistrarProcess::MasterRegistrarProcess(
-    master::Registrar* _registrar,
-    registry::Registry _registry)
+MasterRegistrarProcess::MasterRegistrarProcess(master::Registrar* _registrar)
   : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
-    registrar(_registrar),
-    registry(std::move(_registry)) {}
+    registrar(_registrar) {}
 
 
 Future<bool> MasterRegistrarProcess::apply(
@@ -428,10 +413,8 @@ Future<bool> MasterRegistrarProcess::apply(
 }
 
 
-MasterRegistrar::MasterRegistrar(
-    master::Registrar* registrar,
-    registry::Registry registry)
-  : process(new MasterRegistrarProcess(registrar, std::move(registry)))
+MasterRegistrar::MasterRegistrar(master::Registrar* registrar)
+  : process(new MasterRegistrarProcess(registrar))
 {
   spawn(process.get(), false);
 }
@@ -444,9 +427,9 @@ MasterRegistrar::~MasterRegistrar()
 }
 
 
-Future<Registry> MasterRegistrar::recover()
+Future<Nothing> MasterRegistrar::recover()
 {
-  return dispatch(process.get(), &MasterRegistrarProcess::recover);
+  return Nothing();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp 
b/src/resource_provider/registrar.hpp
index ded56e1..3c10785 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -71,16 +71,12 @@ public:
       process::Owned<state::Storage> storage);
 
   // Create a registry on top of a master's persistent state.
-  //
-  // The created registrar does not take ownership of the passed registrar
-  // which needs to be valid as long as the created registrar is alive.
   static Try<process::Owned<Registrar>> create(
-      mesos::internal::master::Registrar* registrar,
-      registry::Registry registry);
+      mesos::internal::master::Registrar* registrar);
 
   virtual ~Registrar() = default;
 
-  virtual process::Future<registry::Registry> recover() = 0;
+  virtual process::Future<Nothing> recover() = 0;
   virtual process::Future<bool> apply(process::Owned<Operation> operation) = 0;
 };
 
@@ -119,7 +115,7 @@ public:
 
   ~GenericRegistrar() override;
 
-  process::Future<registry::Registry> recover() override;
+  process::Future<Nothing> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 
@@ -134,17 +130,13 @@ class MasterRegistrarProcess;
 class MasterRegistrar : public Registrar
 {
 public:
-  // The created registrar does not take ownership of the passed registrar
-  // which needs to be valid as long as the created registrar is alive.
-  explicit MasterRegistrar(
-      mesos::internal::master::Registrar* registrar,
-      registry::Registry registry);
+  explicit MasterRegistrar(mesos::internal::master::Registrar* Registrar);
 
   ~MasterRegistrar() override;
 
   // This registrar performs no recovery; instead to recover
   // the underlying master registrar needs to be recovered.
-  process::Future<registry::Registry> recover() override;
+  process::Future<Nothing> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp 
b/src/tests/resource_provider_manager_tests.cpp
index eb8e4fc..1664073 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -841,6 +841,10 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
+  // Applying operations on a not yet recovered registrar fails.
+  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId))));
+
   AWAIT_READY(registrar.get()->recover());
 
   Future<bool> admitResourceProvider =
@@ -869,16 +873,15 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
 
   const MasterInfo masterInfo = protobuf::createMasterInfo({});
 
-  Future<Registry> registry = masterRegistrar.recover(masterInfo);
-  AWAIT_READY(registry);
-
-  Try<Owned<Registrar>> registrar = Registrar::create(
-      &masterRegistrar,
-      registry->resource_provider_registry());
+  Try<Owned<Registrar>> registrar = Registrar::create(&masterRegistrar);
 
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
+  // Applying operations on a not yet recovered registrar fails.
+  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId))));
+
   AWAIT_READY(masterRegistrar.recover(masterInfo));
 
   Future<bool> admitResourceProvider =

Reply via email to