Revert "Set up recovery code paths of resource provider manager."
This reverts commit bfcf5571869598a2e6d75550013fdaefa57dd6cb. Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f670e2b3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f670e2b3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f670e2b3 Branch: refs/heads/master Commit: f670e2b329514282515a5df2b52d0ddbcbb92a8b Parents: 26626f4 Author: Alexander Rukletsov <[email protected]> Authored: Wed Apr 25 17:09:03 2018 +0200 Committer: Alexander Rukletsov <[email protected]> Committed: Wed Apr 25 17:09:03 2018 +0200 ---------------------------------------------------------------------- src/resource_provider/manager.cpp | 323 +++++++++------------ src/resource_provider/registrar.cpp | 91 +++--- src/resource_provider/registrar.hpp | 18 +- src/tests/resource_provider_manager_tests.cpp | 15 +- 4 files changed, 187 insertions(+), 260 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/manager.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index 6393e7a..259a810 100644 --- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -36,7 +36,6 @@ #include <process/metrics/metrics.hpp> #include <stout/hashmap.hpp> -#include <stout/nothing.hpp> #include <stout/protobuf.hpp> #include <stout/uuid.hpp> @@ -48,7 +47,6 @@ #include "internal/devolve.hpp" #include "internal/evolve.hpp" -#include "resource_provider/registry.hpp" #include "resource_provider/validation.hpp" namespace http = process::http; @@ -62,8 +60,6 @@ using mesos::resource_provider::Call; using mesos::resource_provider::Event; using mesos::resource_provider::Registrar; -using mesos::resource_provider::registry::Registry; - using process::Failure; using process::Future; using process::Owned; @@ -80,10 +76,10 @@ using process::wait; using process::http::Accepted; using process::http::BadRequest; +using process::http::OK; using process::http::MethodNotAllowed; using process::http::NotAcceptable; using process::http::NotImplemented; -using process::http::OK; using process::http::Pipe; using process::http::UnsupportedMediaType; @@ -196,11 +192,6 @@ private: ResourceProvider* resourceProvider, const Call::UpdatePublishResourcesStatus& update); - Future<Nothing> recover( - const mesos::resource_provider::registry::Registry& registry); - - void initialize() override; - ResourceProviderID newResourceProviderId(); double gaugeSubscribed(); @@ -218,9 +209,6 @@ private: Gauge subscribed; }; - Owned<Registrar> registrar; - Promise<Nothing> recovered; - Metrics metrics; }; @@ -228,191 +216,152 @@ private: ResourceProviderManagerProcess::ResourceProviderManagerProcess( Owned<Registrar> _registrar) : ProcessBase(process::ID::generate("resource-provider-manager")), - registrar(std::move(_registrar)), metrics(*this) { - CHECK_NOTNULL(registrar.get()); + CHECK_NOTNULL(_registrar.get()); } -void ResourceProviderManagerProcess::initialize() +Future<http::Response> ResourceProviderManagerProcess::api( + const http::Request& request, + const Option<Principal>& principal) { - // Recover the registrar. - registrar->recover() - .then(defer(self(), &ResourceProviderManagerProcess::recover, lambda::_1)) - .onAny([](const Future<Nothing>& recovered) { - if (!recovered.isReady()) { - LOG(FATAL) - << "Failed to recover resource provider manager registry: " - << recovered; - } - }); -} + if (request.method != "POST") { + return MethodNotAllowed({"POST"}, request.method); + } + v1::resource_provider::Call v1Call; -Future<Nothing> ResourceProviderManagerProcess::recover( - const mesos::resource_provider::registry::Registry& registry) -{ - recovered.set(Nothing()); + // TODO(anand): Content type values are case-insensitive. + Option<string> contentType = request.headers.get("Content-Type"); - return Nothing(); -} + if (contentType.isNone()) { + return BadRequest("Expecting 'Content-Type' to be present"); + } + if (contentType.get() == APPLICATION_PROTOBUF) { + if (!v1Call.ParseFromString(request.body)) { + return BadRequest("Failed to parse body into Call protobuf"); + } + } else if (contentType.get() == APPLICATION_JSON) { + Try<JSON::Value> value = JSON::parse(request.body); + if (value.isError()) { + return BadRequest("Failed to parse body into JSON: " + value.error()); + } -Future<http::Response> ResourceProviderManagerProcess::api( - const http::Request& request, - const Option<Principal>& principal) -{ - // TODO(bbannier): This implementation does not limit the number of messages - // in the actor's inbox which could become large should a big number of - // resource providers attempt to subscribe before recovery completed. Consider - // rejecting requests until the resource provider manager has recovered. This - // would likely require implementing retry logic in resource providers. - return recovered.future().then(defer( - self(), [this, request, principal](const Nothing&) -> http::Response { - if (request.method != "POST") { - return MethodNotAllowed({"POST"}, request.method); - } - - v1::resource_provider::Call v1Call; - - // TODO(anand): Content type values are case-insensitive. - Option<string> contentType = request.headers.get("Content-Type"); - - if (contentType.isNone()) { - return BadRequest("Expecting 'Content-Type' to be present"); - } - - if (contentType.get() == APPLICATION_PROTOBUF) { - if (!v1Call.ParseFromString(request.body)) { - return BadRequest("Failed to parse body into Call protobuf"); - } - } else if (contentType.get() == APPLICATION_JSON) { - Try<JSON::Value> value = JSON::parse(request.body); - if (value.isError()) { - return BadRequest( - "Failed to parse body into JSON: " + value.error()); - } - - Try<v1::resource_provider::Call> parse = - ::protobuf::parse<v1::resource_provider::Call>(value.get()); - - if (parse.isError()) { - return BadRequest( - "Failed to convert JSON into Call protobuf: " + parse.error()); - } - - v1Call = parse.get(); - } else { - return UnsupportedMediaType( - string("Expecting 'Content-Type' of ") + APPLICATION_JSON + - " or " + APPLICATION_PROTOBUF); - } - - Call call = devolve(v1Call); - - Option<Error> error = validate(call); - if (error.isSome()) { - return BadRequest( - "Failed to validate resource_provider::Call: " + error->message); - } - - if (call.type() == Call::SUBSCRIBE) { - // We default to JSON 'Content-Type' in the response since an empty - // 'Accept' header results in all media types considered acceptable. - ContentType acceptType = ContentType::JSON; - - if (request.acceptsMediaType(APPLICATION_JSON)) { - acceptType = ContentType::JSON; - } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) { - acceptType = ContentType::PROTOBUF; - } else { - return NotAcceptable( - string("Expecting 'Accept' to allow ") + "'" + - APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'"); - } - - if (request.headers.contains("Mesos-Stream-Id")) { - return BadRequest( - "Subscribe calls should not include the 'Mesos-Stream-Id' " - "header"); - } - - Pipe pipe; - OK ok; - - ok.headers["Content-Type"] = stringify(acceptType); - ok.type = http::Response::PIPE; - ok.reader = pipe.reader(); - - // Generate a stream ID and return it in the response. - id::UUID streamId = id::UUID::random(); - ok.headers["Mesos-Stream-Id"] = streamId.toString(); - - HttpConnection http(pipe.writer(), acceptType, streamId); - this->subscribe(http, call.subscribe()); - - return std::move(ok); - } - - if (!this->resourceProviders.subscribed.contains( - call.resource_provider_id())) { - return BadRequest("Resource provider is not subscribed"); - } - - ResourceProvider* resourceProvider = - this->resourceProviders.subscribed.at(call.resource_provider_id()) - .get(); - - // This isn't a `SUBSCRIBE` call, so the request should include a stream - // ID. - if (!request.headers.contains("Mesos-Stream-Id")) { - return BadRequest( - "All non-subscribe calls should include to 'Mesos-Stream-Id' " - "header"); - } - - const string& streamId = request.headers.at("Mesos-Stream-Id"); - if (streamId != resourceProvider->http.streamId.toString()) { - return BadRequest( - "The stream ID '" + streamId + - "' included in this request " - "didn't match the stream ID currently associated with " - " resource provider ID " + - resourceProvider->info.id().value()); - } - - switch (call.type()) { - case Call::UNKNOWN: { - return NotImplemented(); - } - - case Call::SUBSCRIBE: { - // `SUBSCRIBE` call should have been handled above. - LOG(FATAL) << "Unexpected 'SUBSCRIBE' call"; - } - - case Call::UPDATE_OPERATION_STATUS: { - this->updateOperationStatus( - resourceProvider, call.update_operation_status()); - - return Accepted(); - } - - case Call::UPDATE_STATE: { - this->updateState(resourceProvider, call.update_state()); - return Accepted(); - } - - case Call::UPDATE_PUBLISH_RESOURCES_STATUS: { - this->updatePublishResourcesStatus( - resourceProvider, call.update_publish_resources_status()); - return Accepted(); - } - } - - UNREACHABLE(); - })); + Try<v1::resource_provider::Call> parse = + ::protobuf::parse<v1::resource_provider::Call>(value.get()); + + if (parse.isError()) { + return BadRequest("Failed to convert JSON into Call protobuf: " + + parse.error()); + } + + v1Call = parse.get(); + } else { + return UnsupportedMediaType( + string("Expecting 'Content-Type' of ") + + APPLICATION_JSON + " or " + APPLICATION_PROTOBUF); + } + + Call call = devolve(v1Call); + + Option<Error> error = validate(call); + if (error.isSome()) { + return BadRequest( + "Failed to validate resource_provider::Call: " + error->message); + } + + if (call.type() == Call::SUBSCRIBE) { + // We default to JSON 'Content-Type' in the response since an empty + // 'Accept' header results in all media types considered acceptable. + ContentType acceptType = ContentType::JSON; + + if (request.acceptsMediaType(APPLICATION_JSON)) { + acceptType = ContentType::JSON; + } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) { + acceptType = ContentType::PROTOBUF; + } else { + return NotAcceptable( + string("Expecting 'Accept' to allow ") + + "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'"); + } + + if (request.headers.contains("Mesos-Stream-Id")) { + return BadRequest( + "Subscribe calls should not include the 'Mesos-Stream-Id' header"); + } + + Pipe pipe; + OK ok; + + ok.headers["Content-Type"] = stringify(acceptType); + ok.type = http::Response::PIPE; + ok.reader = pipe.reader(); + + // Generate a stream ID and return it in the response. + id::UUID streamId = id::UUID::random(); + ok.headers["Mesos-Stream-Id"] = streamId.toString(); + + HttpConnection http(pipe.writer(), acceptType, streamId); + subscribe(http, call.subscribe()); + + return ok; + } + + if (!resourceProviders.subscribed.contains(call.resource_provider_id())) { + return BadRequest("Resource provider is not subscribed"); + } + + ResourceProvider* resourceProvider = + resourceProviders.subscribed.at(call.resource_provider_id()).get(); + + // This isn't a `SUBSCRIBE` call, so the request should include a stream ID. + if (!request.headers.contains("Mesos-Stream-Id")) { + return BadRequest( + "All non-subscribe calls should include to 'Mesos-Stream-Id' header"); + } + + const string& streamId = request.headers.at("Mesos-Stream-Id"); + if (streamId != resourceProvider->http.streamId.toString()) { + return BadRequest( + "The stream ID '" + streamId + "' included in this request " + "didn't match the stream ID currently associated with " + " resource provider ID " + resourceProvider->info.id().value()); + } + + switch(call.type()) { + case Call::UNKNOWN: { + return NotImplemented(); + } + + case Call::SUBSCRIBE: { + // `SUBSCRIBE` call should have been handled above. + LOG(FATAL) << "Unexpected 'SUBSCRIBE' call"; + } + + case Call::UPDATE_OPERATION_STATUS: { + updateOperationStatus( + resourceProvider, + call.update_operation_status()); + + return Accepted(); + } + + case Call::UPDATE_STATE: { + updateState(resourceProvider, call.update_state()); + return Accepted(); + } + + case Call::UPDATE_PUBLISH_RESOURCES_STATUS: { + updatePublishResourcesStatus( + resourceProvider, + call.update_publish_resources_status()); + return Accepted(); + } + } + + UNREACHABLE(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/registrar.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp index d7ec6a6..b151e2b 100644 --- a/src/resource_provider/registrar.cpp +++ b/src/resource_provider/registrar.cpp @@ -92,11 +92,9 @@ Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage) } -Try<Owned<Registrar>> Registrar::create( - master::Registrar* registrar, - Registry registry) +Try<Owned<Registrar>> Registrar::create(master::Registrar* registrar) { - return new MasterRegistrar(registrar, std::move(registry)); + return new MasterRegistrar(registrar); } @@ -152,29 +150,28 @@ class GenericRegistrarProcess : public Process<GenericRegistrarProcess> public: GenericRegistrarProcess(Owned<Storage> storage); - Future<Registry> recover(); + Future<Nothing> recover(); Future<bool> apply(Owned<Registrar::Operation> operation); - void update(); - - void initialize() override; - -private: Future<bool> _apply(Owned<Registrar::Operation> operation); + void update(); + void _update( const Future<Option<Variable<Registry>>>& store, + const Registry& updatedRegistry, deque<Owned<Registrar::Operation>> applied); - +private: Owned<Storage> storage; // Use fully qualified type for `State` to disambiguate with `State` // enumeration in `ProcessBase`. mesos::state::protobuf::State state; - Promise<Nothing> recovered; + Option<Future<Nothing>> recovered; + Option<Registry> registry; Option<Variable<Registry>> variable; Option<Error> error; @@ -194,36 +191,32 @@ GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage) } -void GenericRegistrarProcess::initialize() +Future<Nothing> GenericRegistrarProcess::recover() { constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR"; - CHECK_NONE(variable); - - recovered.associate(state.fetch<Registry>(NAME).then( - defer(self(), [this](const Variable<Registry>& recovery) { - variable = recovery; - return Nothing(); - }))); -} + if (recovered.isNone()) { + recovered = state.fetch<Registry>(NAME).then( + defer(self(), [this](const Variable<Registry>& recovery) { + registry = recovery.get(); + variable = recovery; + return Nothing(); + })); + } -Future<Registry> GenericRegistrarProcess::recover() -{ - // Prevent discards on the returned `Future` by marking the result as - // `undiscardable` so that we control the lifetime of the recovering registry. - return undiscardable(recovered.future()).then([this](const Nothing&) { - CHECK_SOME(this->variable); - return this->variable->get(); - }); + return recovered.get(); } Future<bool> GenericRegistrarProcess::apply( Owned<Registrar::Operation> operation) { - return undiscardable(recovered.future()).then( - defer(self(), &Self::_apply, std::move(operation))); + if (recovered.isNone()) { + return Failure("Attempted to apply the operation before recovering"); + } + + return recovered->then(defer(self(), &Self::_apply, std::move(operation))); } @@ -256,9 +249,8 @@ void GenericRegistrarProcess::update() updating = true; - CHECK_SOME(variable); - - Registry updatedRegistry = variable->get(); + CHECK_SOME(registry); + Registry updatedRegistry = registry.get(); foreach (Owned<Registrar::Operation>& operation, operations) { Try<bool> operationResult = (*operation)(&updatedRegistry); @@ -280,6 +272,7 @@ void GenericRegistrarProcess::update() self(), &Self::_update, lambda::_1, + updatedRegistry, std::move(operations))); operations.clear(); @@ -288,6 +281,7 @@ void GenericRegistrarProcess::update() void GenericRegistrarProcess::_update( const Future<Option<Variable<Registry>>>& store, + const Registry& updatedRegistry, deque<Owned<Registrar::Operation>> applied) { updating = false; @@ -316,6 +310,7 @@ void GenericRegistrarProcess::_update( } variable = store->get(); + registry = updatedRegistry; // Remove the operations. while (!applied.empty()) { @@ -345,7 +340,7 @@ GenericRegistrar::~GenericRegistrar() } -Future<Registry> GenericRegistrar::recover() +Future<Nothing> GenericRegistrar::recover() { return dispatch(process.get(), &GenericRegistrarProcess::recover); } @@ -369,8 +364,6 @@ class MasterRegistrarProcess : public Process<MasterRegistrarProcess> public: AdaptedOperation(Owned<Registrar::Operation> operation); - Future<registry::Registry> recover(); - private: Try<bool> perform(internal::Registry* registry, hashset<SlaveID>*) override; @@ -383,17 +376,12 @@ class MasterRegistrarProcess : public Process<MasterRegistrarProcess> }; public: - explicit MasterRegistrarProcess( - master::Registrar* registrar, - Registry registry); + explicit MasterRegistrarProcess(master::Registrar* registrar); Future<bool> apply(Owned<Registrar::Operation> operation); - Future<registry::Registry> recover() { return registry; } - private: master::Registrar* registrar = nullptr; - Registry registry; }; @@ -410,12 +398,9 @@ Try<bool> MasterRegistrarProcess::AdaptedOperation::perform( } -MasterRegistrarProcess::MasterRegistrarProcess( - master::Registrar* _registrar, - registry::Registry _registry) +MasterRegistrarProcess::MasterRegistrarProcess(master::Registrar* _registrar) : ProcessBase(process::ID::generate("resource-provider-agent-registrar")), - registrar(_registrar), - registry(std::move(_registry)) {} + registrar(_registrar) {} Future<bool> MasterRegistrarProcess::apply( @@ -428,10 +413,8 @@ Future<bool> MasterRegistrarProcess::apply( } -MasterRegistrar::MasterRegistrar( - master::Registrar* registrar, - registry::Registry registry) - : process(new MasterRegistrarProcess(registrar, std::move(registry))) +MasterRegistrar::MasterRegistrar(master::Registrar* registrar) + : process(new MasterRegistrarProcess(registrar)) { spawn(process.get(), false); } @@ -444,9 +427,9 @@ MasterRegistrar::~MasterRegistrar() } -Future<Registry> MasterRegistrar::recover() +Future<Nothing> MasterRegistrar::recover() { - return dispatch(process.get(), &MasterRegistrarProcess::recover); + return Nothing(); } http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/registrar.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp index ded56e1..3c10785 100644 --- a/src/resource_provider/registrar.hpp +++ b/src/resource_provider/registrar.hpp @@ -71,16 +71,12 @@ public: process::Owned<state::Storage> storage); // Create a registry on top of a master's persistent state. - // - // The created registrar does not take ownership of the passed registrar - // which needs to be valid as long as the created registrar is alive. static Try<process::Owned<Registrar>> create( - mesos::internal::master::Registrar* registrar, - registry::Registry registry); + mesos::internal::master::Registrar* registrar); virtual ~Registrar() = default; - virtual process::Future<registry::Registry> recover() = 0; + virtual process::Future<Nothing> recover() = 0; virtual process::Future<bool> apply(process::Owned<Operation> operation) = 0; }; @@ -119,7 +115,7 @@ public: ~GenericRegistrar() override; - process::Future<registry::Registry> recover() override; + process::Future<Nothing> recover() override; process::Future<bool> apply(process::Owned<Operation> operation) override; @@ -134,17 +130,13 @@ class MasterRegistrarProcess; class MasterRegistrar : public Registrar { public: - // The created registrar does not take ownership of the passed registrar - // which needs to be valid as long as the created registrar is alive. - explicit MasterRegistrar( - mesos::internal::master::Registrar* registrar, - registry::Registry registry); + explicit MasterRegistrar(mesos::internal::master::Registrar* Registrar); ~MasterRegistrar() override; // This registrar performs no recovery; instead to recover // the underlying master registrar needs to be recovered. - process::Future<registry::Registry> recover() override; + process::Future<Nothing> recover() override; process::Future<bool> apply(process::Owned<Operation> operation) override; http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/tests/resource_provider_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index eb8e4fc..1664073 100644 --- a/src/tests/resource_provider_manager_tests.cpp +++ b/src/tests/resource_provider_manager_tests.cpp @@ -841,6 +841,10 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar) ASSERT_SOME(registrar); ASSERT_NE(nullptr, registrar->get()); + // Applying operations on a not yet recovered registrar fails. + AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>( + new AdmitResourceProvider(resourceProviderId)))); + AWAIT_READY(registrar.get()->recover()); Future<bool> admitResourceProvider = @@ -869,16 +873,15 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar) const MasterInfo masterInfo = protobuf::createMasterInfo({}); - Future<Registry> registry = masterRegistrar.recover(masterInfo); - AWAIT_READY(registry); - - Try<Owned<Registrar>> registrar = Registrar::create( - &masterRegistrar, - registry->resource_provider_registry()); + Try<Owned<Registrar>> registrar = Registrar::create(&masterRegistrar); ASSERT_SOME(registrar); ASSERT_NE(nullptr, registrar->get()); + // Applying operations on a not yet recovered registrar fails. + AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>( + new AdmitResourceProvider(resourceProviderId)))); + AWAIT_READY(masterRegistrar.recover(masterInfo)); Future<bool> admitResourceProvider =
