Revert "Externalized creation of resource provider manager backing storage."
This reverts commit 6f6413b618b4d7aec7c8f8e6fa9e3542f1af2b9c. Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ed92ee4e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ed92ee4e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ed92ee4e Branch: refs/heads/master Commit: ed92ee4e61c44c4fe81da277bb68cee56c818fa7 Parents: 6d56382 Author: Alexander Rukletsov <[email protected]> Authored: Wed Apr 25 17:09:24 2018 +0200 Committer: Alexander Rukletsov <[email protected]> Committed: Wed Apr 25 17:09:24 2018 +0200 ---------------------------------------------------------------------- src/resource_provider/registrar.cpp | 49 ++++++++++++++++++---- src/resource_provider/registrar.hpp | 15 +++---- src/tests/resource_provider_manager_tests.cpp | 27 +++++++++++- 3 files changed, 75 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/ed92ee4e/src/resource_provider/registrar.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp index dbb55dd..92ef9ae 100644 --- a/src/resource_provider/registrar.cpp +++ b/src/resource_provider/registrar.cpp @@ -27,6 +27,10 @@ #include <mesos/state/in_memory.hpp> +#ifndef __WINDOWS__ +#include <mesos/state/leveldb.hpp> +#endif // __WINDOWS__ + #include <mesos/state/protobuf.hpp> #include <process/defer.hpp> @@ -49,6 +53,12 @@ using std::string; using mesos::resource_provider::registry::Registry; using mesos::resource_provider::registry::ResourceProvider; +using mesos::state::InMemoryStorage; + +#ifndef __WINDOWS__ +using mesos::state::LevelDBStorage; +#endif // __WINDOWS__ + using mesos::state::Storage; using mesos::state::protobuf::Variable; @@ -86,9 +96,11 @@ bool Registrar::Operation::set() } -Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage) +Try<Owned<Registrar>> Registrar::create( + const slave::Flags& slaveFlags, + const SlaveID& slaveId) { - return new AgentRegistrar(std::move(storage)); + return new AgentRegistrar(slaveFlags, slaveId); } @@ -148,7 +160,7 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry) class AgentRegistrarProcess : public Process<AgentRegistrarProcess> { public: - AgentRegistrarProcess(Owned<Storage> storage); + AgentRegistrarProcess(const slave::Flags& flags, const SlaveID& slaveId); Future<Nothing> recover(); @@ -179,12 +191,33 @@ private: deque<Owned<Registrar::Operation>> operations; bool updating = false; + + static Owned<Storage> createStorage(const std::string& path); }; -AgentRegistrarProcess::AgentRegistrarProcess(Owned<Storage> _storage) +Owned<Storage> AgentRegistrarProcess::createStorage(const std::string& path) +{ + // The registrar uses LevelDB as underlying storage. Since LevelDB + // is currently not supported on Windows (see MESOS-5932), we fall + // back to in-memory storage there. + // + // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed. +#ifndef __WINDOWS__ + return Owned<Storage>(new LevelDBStorage(path)); +#else + LOG(WARNING) + << "Persisting resource provider manager state is not supported on Windows"; + return Owned<Storage>(new InMemoryStorage()); +#endif // __WINDOWS__ +} + + +AgentRegistrarProcess::AgentRegistrarProcess( + const slave::Flags& flags, const SlaveID& slaveId) : ProcessBase(process::ID::generate("resource-provider-agent-registrar")), - storage(std::move(_storage)), + storage(createStorage(slave::paths::getResourceProviderRegistryPath( + flags.work_dir, slaveId))), state(storage.get()) {} @@ -322,8 +355,10 @@ void AgentRegistrarProcess::_update( } -AgentRegistrar::AgentRegistrar(Owned<Storage> storage) - : process(new AgentRegistrarProcess(std::move(storage))) +AgentRegistrar::AgentRegistrar( + const slave::Flags& slaveFlags, + const SlaveID& slaveId) + : process(new AgentRegistrarProcess(slaveFlags, slaveId)) { process::spawn(process.get(), false); } http://git-wip-us.apache.org/repos/asf/mesos/blob/ed92ee4e/src/resource_provider/registrar.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp index 34cb166..39f45b0 100644 --- a/src/resource_provider/registrar.hpp +++ b/src/resource_provider/registrar.hpp @@ -19,8 +19,6 @@ #include <memory> -#include <mesos/state/storage.hpp> - #include <process/future.hpp> #include <process/owned.hpp> @@ -66,14 +64,15 @@ public: bool success = false; }; - // Create a registry on top of generic storage. - static Try<process::Owned<Registrar>> create( - process::Owned<state::Storage> storage); - // Create a registry on top of a master's persistent state. static Try<process::Owned<Registrar>> create( mesos::internal::master::Registrar* registrar); + // Create a registry on top of an agent's persistent state. + static Try<process::Owned<Registrar>> create( + const mesos::internal::slave::Flags& slaveFlags, + const SlaveID& slaveId); + virtual ~Registrar() = default; virtual process::Future<Nothing> recover() = 0; @@ -111,7 +110,9 @@ class AgentRegistrarProcess; class AgentRegistrar : public Registrar { public: - AgentRegistrar(process::Owned<state::Storage> storage); + AgentRegistrar( + const mesos::internal::slave::Flags& slaveFlags, + const SlaveID& slaveId); ~AgentRegistrar() override; http://git-wip-us.apache.org/repos/asf/mesos/blob/ed92ee4e/src/tests/resource_provider_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index 72e8122..0de4e79 100644 --- a/src/tests/resource_provider_manager_tests.cpp +++ b/src/tests/resource_provider_manager_tests.cpp @@ -825,8 +825,31 @@ TEST_F(ResourceProviderRegistrarTest, AgentRegistrar) ResourceProviderID resourceProviderId; resourceProviderId.set_value("foo"); - Owned<mesos::state::Storage> storage(new mesos::state::InMemoryStorage()); - Try<Owned<Registrar>> registrar = Registrar::create(std::move(storage)); + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Owned<MasterDetector> detector = master.get()->createDetector(); + + const slave::Flags flags = CreateSlaveFlags(); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _); + + Future<UpdateSlaveMessage> updateSlaveMessage = + FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _); + + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); + ASSERT_SOME(slave); + + AWAIT_READY(slaveRegisteredMessage); + + // The agent will send `UpdateSlaveMessage` after it has created its + // meta directories. Await the message to make sure the agent + // registrar can create its store in the meta hierarchy. + AWAIT_READY(updateSlaveMessage); + + Try<Owned<Registrar>> registrar = + Registrar::create(flags, slaveRegisteredMessage->slave_id()); ASSERT_SOME(registrar); ASSERT_NE(nullptr, registrar->get());
