Revert "Delayed construction of the agent's resource provider manager."
This reverts commit 0424a6623d08440d8dbe5aff5ec2f18df7b93e24. Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a1c6a7a3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a1c6a7a3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a1c6a7a3 Branch: refs/heads/master Commit: a1c6a7a3c54518f97a34f28b1792885b928b948c Parents: ed92ee4 Author: Alexander Rukletsov <[email protected]> Authored: Wed Apr 25 17:09:33 2018 +0200 Committer: Alexander Rukletsov <[email protected]> Committed: Wed Apr 25 17:09:33 2018 +0200 ---------------------------------------------------------------------- src/slave/slave.cpp | 94 +++++----------------- src/slave/slave.hpp | 6 +- src/tests/resource_provider_manager_tests.cpp | 9 +-- 3 files changed, 22 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index d313777..d0ff5f8 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -30,8 +30,6 @@ #include <utility> #include <vector> -#include <glog/logging.h> - #include <mesos/type_utils.hpp> #include <mesos/authentication/secret_generator.hpp> @@ -774,20 +772,15 @@ void Slave::initialize() logRequest(request); return http.executor(request, principal); }); - route( - "/api/v1/resource_provider", - READWRITE_HTTP_AUTHENTICATION_REALM, - Http::RESOURCE_PROVIDER_HELP(), - [this](const http::Request& request, const Option<Principal>& principal) - -> Future<http::Response> { - logRequest(request); - - if (resourceProviderManager.get() == nullptr) { - return http::ServiceUnavailable(); - } - return resourceProviderManager->api(request, principal); - }); + route("/api/v1/resource_provider", + READWRITE_HTTP_AUTHENTICATION_REALM, + Http::RESOURCE_PROVIDER_HELP(), + 
[this](const http::Request& request, + const Option<Principal>& principal) { + logRequest(request); + return resourceProviderManager.api(request, principal); + }); // TODO(ijimenez): Remove this endpoint at the end of the // deprecation cycle on 0.26. @@ -1509,8 +1502,6 @@ void Slave::registered( CHECK_SOME(state::checkpoint(path, info)); - initializeResourceProviderManager(flags, info.id()); - // We start the local resource providers daemon once the agent is // running, so the resource providers can use the agent API. localResourceProviderDaemon->start(info.id()); @@ -4353,7 +4344,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message) } if (resourceProviderId.isSome()) { - CHECK_NOTNULL(resourceProviderManager.get())->applyOperation(message); + resourceProviderManager.applyOperation(message); return; } @@ -4426,7 +4417,7 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message) } if (containsResourceProviderOperations) { - CHECK_NOTNULL(resourceProviderManager.get())->reconcileOperations(message); + resourceProviderManager.reconcileOperations(message); } } @@ -4558,19 +4549,7 @@ void Slave::operationStatusAcknowledgement( { Operation* operation = getOperation(acknowledgement.operation_uuid()); if (operation != nullptr) { - // If the operation was on resource provider resources forward the - // acknowledgement to the resource provider manager as well. 
- Result<ResourceProviderID> resourceProviderId = - getResourceProviderId(operation->info()); - - CHECK(!resourceProviderId.isError()) - << "Could not determine resource provider of operation " << operation - << ": " << resourceProviderId.error(); - - if (resourceProviderId.isSome()) { - CHECK_NOTNULL(resourceProviderManager.get()) - ->acknowledgeOperationStatus(acknowledgement); - } + resourceProviderManager.acknowledgeOperationStatus(acknowledgement); CHECK(operation->statuses_size() > 0); if (protobuf::isTerminalState( @@ -7340,8 +7319,10 @@ void Slave::__recover(const Future<Nothing>& future) detection = detector->detect() .onAny(defer(self(), &Slave::detected, lambda::_1)); - if (info.has_id()) { - initializeResourceProviderManager(flags, info.id()); + if (capabilities.resourceProvider) { + // Start listening for messages from the resource provider manager. + resourceProviderManager.messages().get().onAny( + defer(self(), &Self::handleResourceProviderMessage, lambda::_1)); } // Forward oversubscribed resources. @@ -7619,7 +7600,7 @@ void Slave::handleResourceProviderMessage( << (message.isFailed() ? message.failure() : "future discarded"); // Wait for the next message. - CHECK_NOTNULL(resourceProviderManager.get())->messages().get() + resourceProviderManager.messages().get() .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1)); return; @@ -7878,7 +7859,7 @@ void Slave::handleResourceProviderMessage( } // Wait for the next message. - CHECK_NOTNULL(resourceProviderManager.get())->messages().get() + resourceProviderManager.messages().get() .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1)); } @@ -8133,24 +8114,6 @@ void Slave::apply(Operation* operation) Future<Nothing> Slave::publishResources( const Option<Resources>& additionalResources) { - // If the resource provider manager has not been created yet no resource - // providers have been added and we do not need to publish anything. 
- if (resourceProviderManager == nullptr) { - // We check whether the passed additional resources are compatible - // with the expectation that no resource provider resources are in - // use, yet. This is not an exhaustive consistency check. - if (additionalResources.isSome()) { - foreach (const Resource& resource, additionalResources.get()) { - CHECK(!resource.has_provider_id()) - << "Cannot publish resource provider resources " - << additionalResources.get() - << " until resource providers have subscribed"; - } - } - - return Nothing(); - } - Resources resources; // NOTE: For resources providers that serve quantity-based resources @@ -8171,8 +8134,7 @@ Future<Nothing> Slave::publishResources( resources += additionalResources.get(); } - return CHECK_NOTNULL(resourceProviderManager.get()) - ->publishResources(resources); + return resourceProviderManager.publishResources(resources); } @@ -8792,26 +8754,6 @@ double Slave::_resources_revocable_percent(const string& name) } -void Slave::initializeResourceProviderManager( - const Flags& flags, - const SlaveID& slaveId) -{ - // To simplify reasoning about lifetimes we do not allow - // reinitialization of the resource provider manager. - if (resourceProviderManager.get() != nullptr) { - return; - } - - resourceProviderManager.reset(new ResourceProviderManager()); - - if (capabilities.resourceProvider) { - // Start listening for messages from the resource provider manager. 
- resourceProviderManager->messages().get().onAny( - defer(self(), &Self::handleResourceProviderMessage, lambda::_1)); - } -} - - Framework::Framework( Slave* _slave, const Flags& slaveFlags, http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index c3866c6..c35996b 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -711,10 +711,6 @@ private: const SlaveInfo& previous, const SlaveInfo& current) const; - void initializeResourceProviderManager( - const Flags& flags, - const SlaveID& slaveId); - protobuf::master::Capabilities requiredMasterCapabilities; const Flags flags; @@ -816,7 +812,7 @@ private: // (allocated and oversubscribable) resources. Option<Resources> oversubscribedResources; - process::Owned<ResourceProviderManager> resourceProviderManager; + ResourceProviderManager resourceProviderManager; process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon; // Local resource providers known by the agent. http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/tests/resource_provider_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index 0de4e79..c52541b 100644 --- a/src/tests/resource_provider_manager_tests.cpp +++ b/src/tests/resource_provider_manager_tests.cpp @@ -755,17 +755,14 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint) Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); - Owned<MasterDetector> detector = master.get()->createDetector(); + Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); - // For the agent's resource provider manager to start, - // the agent needs to have been assigned an agent ID. 
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+  Owned<MasterDetector> detector = master.get()->createDetector();

   Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
   ASSERT_SOME(agent);

-  AWAIT_READY(slaveRegisteredMessage);
+  AWAIT_READY(__recover);

   // Wait for recovery to be complete.
   Clock::pause();
