Repository: mesos
Updated Branches:
  refs/heads/master 3facf2009 -> 49daa6deb
Added a Recovery Operation to the Registrar to force a Registry version change during recovery. Review: https://reviews.apache.org/r/18675 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/49d3e57f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/49d3e57f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/49d3e57f Branch: refs/heads/master Commit: 49d3e57f4fb1f2878bdaa57cf9ef30e7758ee9e2 Parents: 3facf20 Author: Benjamin Mahler <[email protected]> Authored: Sun Mar 2 18:55:13 2014 -0800 Committer: Benjamin Mahler <[email protected]> Committed: Fri Mar 7 13:25:50 2014 -0800 ---------------------------------------------------------------------- src/master/registrar.cpp | 111 ++++++++++++++++++++++++++++--------- src/master/registrar.hpp | 19 ++++++- src/tests/registrar_tests.cpp | 43 ++++++++++++++ 3 files changed, 145 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/49d3e57f/src/master/registrar.cpp ---------------------------------------------------------------------- diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp index 37337c0..e453f3f 100644 --- a/src/master/registrar.cpp +++ b/src/master/registrar.cpp @@ -22,6 +22,7 @@ #include <process/defer.hpp> #include <process/dispatch.hpp> #include <process/future.hpp> +#include <process/owned.hpp> #include <process/process.hpp> #include <stout/lambda.hpp> @@ -42,6 +43,7 @@ using mesos::internal::state::protobuf::Variable; using process::dispatch; using process::Failure; using process::Future; +using process::Owned; using process::Process; using process::Promise; using process::spawn; @@ -66,6 +68,7 @@ public: virtual ~RegistrarProcess() {} // Registrar implementation. 
+ Future<Registry> recover(const MasterInfo& info); Future<bool> admit(const SlaveInfo& info); Future<bool> readmit(const SlaveInfo& info); Future<bool> remove(const SlaveInfo& info); @@ -99,6 +102,22 @@ private: bool success; }; + // The 'Recover' operation adds the latest MasterInfo. + struct Recover : Operation<Registry> + { + Recover(const MasterInfo& _info) : info(_info) {} + + protected: + virtual Result<Registry> perform(Registry registry) + { + registry.mutable_master()->mutable_info()->CopyFrom(info); + return registry; + } + + const MasterInfo info; + }; + + // Slave Admission. struct Admit : Operation<Registry> { Admit(const SlaveInfo& _info) : info(_info) {} @@ -121,6 +140,7 @@ private: const SlaveInfo info; }; + // Slave Readmission. struct Readmit : Operation<Registry> { Readmit(const SlaveInfo& _info) : info(_info) {} @@ -140,6 +160,7 @@ private: const SlaveInfo info; }; + // Slave Removal. struct Remove : Operation<Registry> { Remove(const SlaveInfo& _info) : info(_info) {} @@ -169,14 +190,14 @@ private: bool updating; // Used to signify fetching (recovering) or storing. // Continuations. + void _recover( + const MasterInfo& info, + const Future<Variable<Registry> >& recovery); + void __recover(const Future<bool>& recover); Future<bool> _admit(const SlaveInfo& info); Future<bool> _readmit(const SlaveInfo& info); Future<bool> _remove(const SlaveInfo& info); - // Helper for recovering state (performing fetch). - Future<Nothing> recover(); - void _recover(const Future<Variable<Registry> >& recovery); - // Helper for updating state (performing store). void update(); void _update( @@ -186,44 +207,71 @@ private: State* state; // Used to compose our operations with recovery. 
- Promise<Nothing> recovered; + Option<Owned<Promise<Registry> > > recovered; }; -Future<Nothing> RegistrarProcess::recover() +Future<Registry> RegistrarProcess::recover(const MasterInfo& info) { LOG(INFO) << "Recovering registrar"; - if (variable.isNone() && !updating) { + if (recovered.isNone()) { // TODO(benh): Don't wait forever to recover? state->fetch<Registry>("registry") - .onAny(defer(self(), &Self::_recover, lambda::_1)); + .onAny(defer(self(), &Self::_recover, info, lambda::_1)); updating = true; + recovered = Owned<Promise<Registry> >(new Promise<Registry>()); } - return recovered.future(); + return recovered.get()->future(); } void RegistrarProcess::_recover( + const MasterInfo& info, const Future<Variable<Registry> >& recovery) { updating = false; CHECK(!recovery.isPending()); - if (recovery.isFailed() || recovery.isDiscarded()) { - LOG(WARNING) << "Failed to recover registrar: " - << (recovery.isFailed() ? recovery.failure() : "discarded"); - recover(); // Retry! TODO(benh): Don't retry forever? + if (!recovery.isReady()) { + recovered.get()->fail("Failed to recover registrar: " + + (recovery.isFailed() ? recovery.failure() : "discarded")); } else { LOG(INFO) << "Successfully recovered registrar"; // Save the registry. variable = recovery.get(); - // Signal the recovery is complete. - recovered.set(Nothing()); + // Perform the Recover operation to add the new MasterInfo. + Operation<Registry>* operation = new Recover(info); + operations.push_back(operation); + operation->future() + .onAny(defer(self(), &Self::__recover, lambda::_1)); + + update(); + } +} + + +void RegistrarProcess::__recover(const Future<bool>& recover) +{ + CHECK(!recover.isPending()); + + if (!recover.isReady()) { + recovered.get()->fail("Failed to recover registrar: " + "Failed to persist MasterInfo: " + + (recover.isFailed() ? 
recover.failure() : "discarded")); + } else if (!recover.get()) { + recovered.get()->fail("Failed to recover registrar: " + "Failed to persist MasterInfo: version mismatch"); + } else { + // At this point _update() has updated 'variable' to contain + // the Registry with the latest MasterInfo. + // Set the promise and un-gate any pending operations. + CHECK_SOME(variable); + recovered.get()->set(variable.get().get()); } } @@ -234,7 +282,11 @@ Future<bool> RegistrarProcess::admit(const SlaveInfo& info) return Failure("SlaveInfo is missing the 'id' field"); } - return recover() + if (recovered.isNone()) { + return Failure("Attempted to admit slave before recovering"); + } + + return recovered.get()->future() .then(defer(self(), &Self::_admit, info)); } @@ -242,6 +294,7 @@ Future<bool> RegistrarProcess::admit(const SlaveInfo& info) Future<bool> RegistrarProcess::_admit(const SlaveInfo& info) { CHECK_SOME(variable); + Operation<Registry>* operation = new Admit(info); operations.push_back(operation); Future<bool> future = operation->future(); @@ -258,7 +311,11 @@ Future<bool> RegistrarProcess::readmit(const SlaveInfo& info) return Failure("SlaveInfo is missing the 'id' field"); } - return recover() + if (recovered.isNone()) { + return Failure("Attempted to readmit slave before recovering"); + } + + return recovered.get()->future() .then(defer(self(), &Self::_readmit, info)); } @@ -268,10 +325,6 @@ Future<bool> RegistrarProcess::_readmit( { CHECK_SOME(variable); - if (!info.has_id()) { - return Failure("Expecting SlaveInfo to have a SlaveID"); - } - Operation<Registry>* operation = new Readmit(info); operations.push_back(operation); Future<bool> future = operation->future(); @@ -288,7 +341,11 @@ Future<bool> RegistrarProcess::remove(const SlaveInfo& info) return Failure("SlaveInfo is missing the 'id' field"); } - return recover() + if (recovered.isNone()) { + return Failure("Attempted to remove slave before recovering"); + } + + return recovered.get()->future() 
.then(defer(self(), &Self::_remove, info)); } @@ -298,10 +355,6 @@ Future<bool> RegistrarProcess::_remove( { CHECK_SOME(variable); - if (!info.has_id()) { - return Failure("Expecting SlaveInfo to have a SlaveID"); - } - Operation<Registry>* operation = new Remove(info); operations.push_back(operation); Future<bool> future = operation->future(); @@ -404,6 +457,12 @@ Registrar::~Registrar() } +Future<Registry> Registrar::recover(const MasterInfo& info) +{ + return dispatch(process, &RegistrarProcess::recover, info); +} + + Future<bool> Registrar::admit(const SlaveInfo& info) { return dispatch(process, &RegistrarProcess::admit, info); http://git-wip-us.apache.org/repos/asf/mesos/blob/49d3e57f/src/master/registrar.hpp ---------------------------------------------------------------------- diff --git a/src/master/registrar.hpp b/src/master/registrar.hpp index 20734af..987a63b 100644 --- a/src/master/registrar.hpp +++ b/src/master/registrar.hpp @@ -23,6 +23,8 @@ #include <process/future.hpp> +#include "master/registry.hpp" + #include "state/protobuf.hpp" namespace mesos { @@ -38,8 +40,21 @@ public: Registrar(state::protobuf::State* state); ~Registrar(); - // Returns the future for slave admission into the Registry. The - // SlaveInfo must contain an 'id', otherwise a Failure will result. + // Recovers the Registry, persisting the new Master information. + // The Registrar must be recovered to allow other operations to + // proceed. + // TODO(bmahler): Consider a "factory" for constructing the + // Registrar, to eliminate the need for passing 'MasterInfo'. + // This is required as the Registrar is injected into the Master, + // and therefore MasterInfo is unknown during construction. + process::Future<Registry> recover(const MasterInfo& info); + + // The following are operations that can be performed on the + // Registry for a slave. Returns: + // true if the operation is permitted. + // false if the operation is not permitted. 
+ // Failure if the operation fails (possibly lost log leadership), + // recovery failed, or if 'info' is missing an ID. process::Future<bool> admit(const SlaveInfo& info); process::Future<bool> readmit(const SlaveInfo& info); process::Future<bool> remove(const SlaveInfo& info); http://git-wip-us.apache.org/repos/asf/mesos/blob/49d3e57f/src/tests/registrar_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp index 3bf42bd..8620e8a 100644 --- a/src/tests/registrar_tests.cpp +++ b/src/tests/registrar_tests.cpp @@ -24,6 +24,8 @@ #include <process/pid.hpp> #include <process/process.hpp> +#include "common/type_utils.hpp" + #include "master/registrar.hpp" #include "state/leveldb.hpp" @@ -76,9 +78,48 @@ private: }; +TEST_F(RegistrarTest, recover) +{ + Registrar registrar(state); + + SlaveInfo slave; + slave.set_hostname("localhost"); + SlaveID id; + id.set_value("1"); + slave.mutable_id()->CopyFrom(id); + + // Operations preceding recovery will fail. + AWAIT_EXPECT_FAILED(registrar.admit(slave)); + AWAIT_EXPECT_FAILED(registrar.readmit(slave)); + AWAIT_EXPECT_FAILED(registrar.remove(slave)); + + MasterInfo info; + info.set_id("foobar"); + info.set_ip(0); + info.set_port(5050); + info.set_pid("0:5050"); + + Future<Registry> registry = registrar.recover(info); + + // Before waiting for the recovery to complete, invoke some + // operations to ensure they do not fail. 
+ Future<bool> admit = registrar.admit(slave); + Future<bool> readmit = registrar.readmit(slave); + Future<bool> remove = registrar.remove(slave); + + AWAIT_READY(registry); + EXPECT_EQ(info, registry.get().master().info()); + + AWAIT_EQ(true, admit); + AWAIT_EQ(true, readmit); + AWAIT_EQ(true, remove); +} + + TEST_F(RegistrarTest, admit) { Registrar registrar(state); + AWAIT_READY(registrar.recover(MasterInfo())); SlaveInfo info1; info1.set_hostname("localhost"); @@ -98,6 +139,7 @@ TEST_F(RegistrarTest, admit) TEST_F(RegistrarTest, readmit) { Registrar registrar(state); + AWAIT_READY(registrar.recover(MasterInfo())); SlaveInfo info1; info1.set_hostname("localhost"); @@ -127,6 +169,7 @@ TEST_F(RegistrarTest, readmit) TEST_F(RegistrarTest, remove) { Registrar registrar(state); + AWAIT_READY(registrar.recover(MasterInfo())); SlaveInfo info1; info1.set_hostname("localhost");
