Repository: mesos Updated Branches: refs/heads/master d6f8620fd -> e40379ce6
Refactored the Registrar to push the operations to the callers. Review: https://reviews.apache.org/r/19372 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e40379ce Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e40379ce Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e40379ce Branch: refs/heads/master Commit: e40379ce6963d2fc53a788c043f390f11c698199 Parents: d6f8620 Author: Jiang Yan Xu <[email protected]> Authored: Fri Mar 21 15:54:54 2014 -0700 Committer: Benjamin Mahler <[email protected]> Committed: Fri Mar 21 15:54:54 2014 -0700 ---------------------------------------------------------------------- src/master/master.hpp | 101 +++++++++++++++ src/master/registrar.cpp | 253 ++++--------------------------------- src/master/registrar.hpp | 41 +++++- src/tests/registrar_tests.cpp | 74 +++++------ 4 files changed, 195 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/e40379ce/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index a37a2a2..a8ed5ec 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -713,6 +713,107 @@ struct Role hashmap<FrameworkID, Framework*> frameworks; }; + +// Implementation of slave admission Registrar operation. +class AdmitSlave : public Operation +{ +public: + AdmitSlave(const SlaveInfo& _info) : info(_info) + { + CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; + } + +protected: + virtual Try<bool> perform(Registry* registry, bool strict) + { + // Check and see if this slave already exists. + foreach (const Registry::Slave& slave, registry->slaves().slaves()) { + if (slave.info().id() == info.id()) { + if (strict) { + return Error("Slave already admitted"); + } else { + return false; // No mutation. + } + } + } + + Registry::Slave* slave = registry->mutable_slaves()->add_slaves(); + slave->mutable_info()->CopyFrom(info); + return true; // Mutation. + } + +private: + const SlaveInfo info; +}; + + +// Implementation of slave readmission Registrar operation. +class ReadmitSlave : public Operation +{ +public: + ReadmitSlave(const SlaveInfo& _info) : info(_info) + { + CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; + } + +protected: + virtual Try<bool> perform(Registry* registry, bool strict) + { + foreach (const Registry::Slave& slave, registry->slaves().slaves()) { + if (slave.info().id() == info.id()) { + return false; // No mutation. + } + } + + if (strict) { + return Error("Slave not yet admitted"); + } else { + Registry::Slave* slave = registry->mutable_slaves()->add_slaves(); + slave->mutable_info()->CopyFrom(info); + return true; // Mutation. + } + } + +private: + const SlaveInfo info; +}; + + +// Implementation of slave removal Registrar operation. +class RemoveSlave : public Operation +{ +public: + RemoveSlave(const SlaveInfo& _info) : info(_info) + { + CHECK(info.has_id()) << "SlaveInfo is missing the 'id' field"; + } + +protected: + virtual Try<bool> perform(Registry* registry, bool strict) + { + for (int i = 0; i < registry->slaves().slaves().size(); i++) { + const Registry::Slave& slave = registry->slaves().slaves(i); + if (slave.info().id() == info.id()) { + for (int j = i + 1; j < registry->slaves().slaves().size(); j++) { + registry-> + mutable_slaves()->mutable_slaves()->SwapElements(j - 1, j); + } + registry->mutable_slaves()->mutable_slaves()->RemoveLast(); + return true; // Mutation. + } + } + + if (strict) { + return Error("Slave not yet admitted"); + } else { + return false; // No mutation. + } + } + +private: + const SlaveInfo info; +}; + } // namespace master { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/e40379ce/src/master/registrar.cpp ---------------------------------------------------------------------- diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp index cbb67bd..2b41700 100644 --- a/src/master/registrar.cpp +++ b/src/master/registrar.cpp @@ -57,26 +57,6 @@ namespace mesos { namespace internal { namespace master { -// TODO(bmahler): Consider an implementation that pushes the -// operations to the caller to simplify the interface: -// -// In this design, operations can be maintained in a separate -// header and added independently of the Registrar logic. However, -// we would need to ensure that all Operations can be generalized -// to result in a Future<bool>. -// -// Registrar -// { -// Future<Registrar> recover(const MasterInfo&); -// Future<bool> apply(const Operation&); -// } -// -// Registrar registrar(flags, state); -// -// Future<bool> admit = registrar.apply(Admit(slaveInfo)); -// Future<bool> readmit = registrar.apply(Redmit(slaveInfo)); -// Future<bool> remove = registrar.apply(Remove(slaveInfo)); - class RegistrarProcess : public Process<RegistrarProcess> { public: @@ -90,138 +70,28 @@ public: // Registrar implementation. Future<Registry> recover(const MasterInfo& info); - Future<bool> admit(const SlaveInfo& info); - Future<bool> readmit(const SlaveInfo& info); - Future<bool> remove(const SlaveInfo& info); + Future<bool> apply(Owned<Operation> operation); private: - template <typename T> - struct Operation : process::Promise<bool> - { - Operation() : success(false) {} - - // Attempts to invoke the operation on 't'. - // Returns whether the operation mutates 't', or an error if the - // operation cannot be applied successfully. - Try<bool> operator () (T* t, bool strict) - { - const Try<bool>& result = perform(t, strict); - - success = !result.isError(); - - return result; - } - - // Sets the promise based on whether the operation was successful. - bool set() { return Promise<bool>::set(success); } - - protected: - virtual Try<bool> perform(T* t, bool strict) = 0; - - private: - bool success; - }; - // The 'Recover' operation adds the latest MasterInfo. - struct Recover : Operation<Registry> + class Recover : public Operation { + public: Recover(const MasterInfo& _info) : info(_info) {} protected: virtual Try<bool> perform(Registry* registry, bool strict) { registry->mutable_master()->mutable_info()->CopyFrom(info); - return true; - } - - const MasterInfo info; - }; - - // Slave Admission. - struct Admit : Operation<Registry> - { - Admit(const SlaveInfo& _info) : info(_info) {} - - protected: - virtual Try<bool> perform(Registry* registry, bool strict) - { - // Check and see if this slave already exists. - foreach (const Registry::Slave& slave, registry->slaves().slaves()) { - if (slave.info().id() == info.id()) { - if (strict) { - return Error("Slave already admitted"); - } else { - return false; // No mutation. - } - } - } - - Registry::Slave* slave = registry->mutable_slaves()->add_slaves(); - slave->mutable_info()->CopyFrom(info); return true; // Mutation. } - const SlaveInfo info; - }; - - // Slave Readmission. - struct Readmit : Operation<Registry> - { - Readmit(const SlaveInfo& _info) : info(_info) {} - - protected: - virtual Try<bool> perform(Registry* registry, bool strict) - { - foreach (const Registry::Slave& slave, registry->slaves().slaves()) { - if (slave.info().id() == info.id()) { - return false; // No mutation. - } - } - - if (strict) { - return Error("Slave not yet admitted"); - } else { - Registry::Slave* slave = registry->mutable_slaves()->add_slaves(); - slave->mutable_info()->CopyFrom(info); - return true; // Mutation. - } - } - - const SlaveInfo info; - }; - - // Slave Removal. - struct Remove : Operation<Registry> - { - Remove(const SlaveInfo& _info) : info(_info) {} - - protected: - virtual Try<bool> perform(Registry* registry, bool strict) - { - for (int i = 0; i < registry->slaves().slaves().size(); i++) { - const Registry::Slave& slave = registry->slaves().slaves(i); - if (slave.info().id() == info.id()) { - for (int j = i + 1; j < registry->slaves().slaves().size(); j++) { - registry-> - mutable_slaves()->mutable_slaves()->SwapElements(j - 1, j); - } - registry->mutable_slaves()->mutable_slaves()->RemoveLast(); - return true; - } - } - - if (strict) { - return Error("Slave not yet admitted"); - } else { - return false; // No mutation. - } - } - - const SlaveInfo info; + private: + const MasterInfo info; }; Option<Variable<Registry> > variable; - deque<Operation<Registry>*> operations; + deque<Owned<Operation> > operations; bool updating; // Used to signify fetching (recovering) or storing. // Continuations. @@ -229,15 +99,13 @@ private: const MasterInfo& info, const Future<Variable<Registry> >& recovery); void __recover(const Future<bool>& recover); - Future<bool> _admit(const SlaveInfo& info); - Future<bool> _readmit(const SlaveInfo& info); - Future<bool> _remove(const SlaveInfo& info); + Future<bool> _apply(Owned<Operation> operation); // Helper for updating state (performing store). void update(); void _update( const Future<Option<Variable<Registry> > >& store, - deque<Operation<Registry>*> operations); + deque<Owned<Operation> > operations); const Flags flags; State* state; @@ -281,7 +149,7 @@ void RegistrarProcess::_recover( variable = recovery.get(); // Perform the Recover operation to add the new MasterInfo. - Operation<Registry>* operation = new Recover(info); + Owned<Operation> operation(new Recover(info)); operations.push_back(operation); operation->future() .onAny(defer(self(), &Self::__recover, lambda::_1)); @@ -312,84 +180,21 @@ void RegistrarProcess::__recover(const Future<bool>& recover) } -Future<bool> RegistrarProcess::admit(const SlaveInfo& info) -{ - if (!info.has_id()) { - return Failure("SlaveInfo is missing the 'id' field"); - } - - if (recovered.isNone()) { - return Failure("Attempted to admit slave before recovering"); - } - - return recovered.get()->future() - .then(defer(self(), &Self::_admit, info)); -} - - -Future<bool> RegistrarProcess::_admit(const SlaveInfo& info) -{ - CHECK_SOME(variable); - - Operation<Registry>* operation = new Admit(info); - operations.push_back(operation); - Future<bool> future = operation->future(); - if (!updating) { - update(); - } - return future; -} - - -Future<bool> RegistrarProcess::readmit(const SlaveInfo& info) +Future<bool> RegistrarProcess::apply(Owned<Operation> operation) { - if (!info.has_id()) { - return Failure("SlaveInfo is missing the 'id' field"); - } - if (recovered.isNone()) { - return Failure("Attempted to readmit slave before recovering"); + return Failure("Attempted to apply the operation before recovering"); } return recovered.get()->future() - .then(defer(self(), &Self::_readmit, info)); + .then(defer(self(), &Self::_apply, operation)); } -Future<bool> RegistrarProcess::_readmit(const SlaveInfo& info) +Future<bool> RegistrarProcess::_apply(Owned<Operation> operation) { CHECK_SOME(variable); - Operation<Registry>* operation = new Readmit(info); - operations.push_back(operation); - Future<bool> future = operation->future(); - if (!updating) { - update(); - } - return future; -} - - -Future<bool> RegistrarProcess::remove(const SlaveInfo& info) -{ - if (!info.has_id()) { - return Failure("SlaveInfo is missing the 'id' field"); - } - - if (recovered.isNone()) { - return Failure("Attempted to remove slave before recovering"); - } - - return recovered.get()->future() - .then(defer(self(), &Self::_remove, info)); -} - - -Future<bool> RegistrarProcess::_remove(const SlaveInfo& info) -{ - CHECK_SOME(variable); - - Operation<Registry>* operation = new Remove(info); operations.push_back(operation); Future<bool> future = operation->future(); if (!updating) { @@ -415,7 +220,7 @@ void RegistrarProcess::update() Registry registry = variable.get().get(); - foreach (Operation<Registry>* operation, operations) { + foreach (Owned<Operation> operation, operations) { // No need to process the result of the operation. (*operation)(®istry, flags.registry_strict); } @@ -433,7 +238,7 @@ void RegistrarProcess::update() void RegistrarProcess::_update( const Future<Option<Variable<Registry> > >& store, - deque<Operation<Registry>*> operations) + deque<Owned<Operation> > applied) { updating = false; @@ -449,9 +254,9 @@ void RegistrarProcess::_update( } // Remove the operations. - while (!operations.empty()) { - Operation<Registry>* operation = operations.front(); - operations.pop_front(); + while (!applied.empty()) { + Owned<Operation> operation = applied.front(); + applied.pop_front(); if (!store.isReady()) { operation->fail("Failed to update 'registry': " + @@ -463,13 +268,11 @@ void RegistrarProcess::_update( operation->set(); } } - - delete operation; } - operations.clear(); + applied.clear(); - if (!this->operations.empty()) { + if (!operations.empty()) { update(); } } @@ -495,21 +298,9 @@ Future<Registry> Registrar::recover(const MasterInfo& info) } -Future<bool> Registrar::admit(const SlaveInfo& info) -{ - return dispatch(process, &RegistrarProcess::admit, info); -} - - -Future<bool> Registrar::readmit(const SlaveInfo& info) -{ - return dispatch(process, &RegistrarProcess::readmit, info); -} - - -Future<bool> Registrar::remove(const SlaveInfo& info) +Future<bool> Registrar::apply(Owned<Operation> operation) { - return dispatch(process, &RegistrarProcess::remove, info); + return dispatch(process, &RegistrarProcess::apply, operation); } } // namespace master { http://git-wip-us.apache.org/repos/asf/mesos/blob/e40379ce/src/master/registrar.hpp ---------------------------------------------------------------------- diff --git a/src/master/registrar.hpp b/src/master/registrar.hpp index 98bfa1e..0d659c5 100644 --- a/src/master/registrar.hpp +++ b/src/master/registrar.hpp @@ -22,6 +22,7 @@ #include <mesos/mesos.hpp> #include <process/future.hpp> +#include <process/owned.hpp> #include "master/flags.hpp" #include "master/registry.hpp" @@ -35,6 +36,36 @@ namespace master { // Forward declaration. class RegistrarProcess; +// Defines an abstraction for operations that can be applied on the +// Registry. +class Operation : public process::Promise<bool> +{ +public: + Operation() : success(false) {} + + // Attempts to invoke the operation on 't'. + // Returns whether the operation mutates 't', or an error if the + // operation cannot be applied successfully. + Try<bool> operator () (Registry* registry, bool strict) + { + const Try<bool>& result = perform(registry, strict); + + success = !result.isError(); + + return result; + } + + // Sets the promise based on whether the operation was successful. + bool set() { return process::Promise<bool>::set(success); } + +protected: + virtual Try<bool> perform(Registry* registry, bool strict) = 0; + +private: + bool success; +}; + + class Registrar { public: @@ -52,15 +83,13 @@ public: // and therefore MasterInfo is unknown during construction. process::Future<Registry> recover(const MasterInfo& info); - // The following are operations that can be performed on the - // Registry for a slave. Returns: + // Applies an operation on the Registry. + // Returns: // true if the operation is permitted. // false if the operation is not permitted. // Failure if the operation fails (possibly lost log leadership), - // recovery failed, or if 'info' is missing an ID. - process::Future<bool> admit(const SlaveInfo& info); - process::Future<bool> readmit(const SlaveInfo& info); - process::Future<bool> remove(const SlaveInfo& info); + // or recovery failed. + process::Future<bool> apply(process::Owned<Operation> operation); private: RegistrarProcess* process; http://git-wip-us.apache.org/repos/asf/mesos/blob/e40379ce/src/tests/registrar_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp index 41836ae..67c26aa 100644 --- a/src/tests/registrar_tests.cpp +++ b/src/tests/registrar_tests.cpp @@ -28,6 +28,7 @@ #include "common/type_utils.hpp" #include "master/flags.hpp" +#include "master/master.hpp" #include "master/registrar.hpp" #include "state/leveldb.hpp" @@ -101,17 +102,23 @@ TEST_P(RegistrarTest, recover) slave.mutable_id()->CopyFrom(id); // Operations preceding recovery will fail. - AWAIT_EXPECT_FAILED(registrar.admit(slave)); - AWAIT_EXPECT_FAILED(registrar.readmit(slave)); - AWAIT_EXPECT_FAILED(registrar.remove(slave)); + AWAIT_EXPECT_FAILED( + registrar.apply(Owned<Operation>(new AdmitSlave(slave)))); + AWAIT_EXPECT_FAILED( + registrar.apply(Owned<Operation>(new ReadmitSlave(slave)))); + AWAIT_EXPECT_FAILED( + registrar.apply(Owned<Operation>(new RemoveSlave(slave)))); Future<Registry> registry = registrar.recover(master); // Before waiting for the recovery to complete, invoke some // operations to ensure they do not fail. - Future<bool> admit = registrar.admit(slave); - Future<bool> readmit = registrar.readmit(slave); - Future<bool> remove = registrar.remove(slave); + Future<bool> admit = registrar.apply( + Owned<Operation>(new AdmitSlave(slave))); + Future<bool> readmit = registrar.apply( + Owned<Operation>(new ReadmitSlave(slave))); + Future<bool> remove = registrar.apply( + Owned<Operation>(new RemoveSlave(slave))); AWAIT_READY(registry); EXPECT_EQ(master, registry.get().master().info()); @@ -130,19 +137,16 @@ TEST_P(RegistrarTest, admit) SlaveInfo info1; info1.set_hostname("localhost"); - // Missing ID results in a Failure. - AWAIT_EXPECT_FAILED(registrar.admit(info1)); - SlaveID id1; id1.set_value("1"); info1.mutable_id()->CopyFrom(id1); - AWAIT_EQ(true, registrar.admit(info1)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new AdmitSlave(info1)))); if (flags.registry_strict) { - AWAIT_EQ(false, registrar.admit(info1)); + AWAIT_EQ(false, registrar.apply(Owned<Operation>(new AdmitSlave(info1)))); } else { - AWAIT_EQ(true, registrar.admit(info1)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new AdmitSlave(info1)))); } } @@ -155,9 +159,6 @@ TEST_P(RegistrarTest, readmit) SlaveInfo info1; info1.set_hostname("localhost"); - // Missing ID results in a failure. - AWAIT_EXPECT_FAILED(registrar.readmit(info1)); - SlaveID id1; id1.set_value("1"); info1.mutable_id()->CopyFrom(id1); @@ -169,14 +170,14 @@ TEST_P(RegistrarTest, readmit) info2.set_hostname("localhost"); info2.mutable_id()->CopyFrom(id2); - AWAIT_EQ(true, registrar.admit(info1)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new AdmitSlave(info1)))); - AWAIT_EQ(true, registrar.readmit(info1)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new ReadmitSlave(info1)))); if (flags.registry_strict) { - AWAIT_EQ(false, registrar.readmit(info2)); + AWAIT_EQ(false, registrar.apply(Owned<Operation>(new ReadmitSlave(info2)))); } else { - AWAIT_EQ(true, registrar.readmit(info2)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new ReadmitSlave(info2)))); } } @@ -189,9 +190,6 @@ TEST_P(RegistrarTest, remove) SlaveInfo info1; info1.set_hostname("localhost"); - // Missing ID results in a Failure. - AWAIT_EXPECT_FAILED(registrar.remove(info1)); - SlaveID id1; id1.set_value("1"); info1.mutable_id()->CopyFrom(id1); @@ -210,34 +208,34 @@ TEST_P(RegistrarTest, remove) info3.set_hostname("localhost"); info3.mutable_id()->CopyFrom(id3); - AWAIT_EQ(true, registrar.admit(info1)); - AWAIT_EQ(true, registrar.admit(info2)); - AWAIT_EQ(true, registrar.admit(info3)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new AdmitSlave(info1)))); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new AdmitSlave(info2)))); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new AdmitSlave(info3)))); - AWAIT_EQ(true, registrar.remove(info1)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new RemoveSlave(info1)))); if (flags.registry_strict) { - AWAIT_EQ(false, registrar.remove(info1)); + AWAIT_EQ(false, registrar.apply(Owned<Operation>(new RemoveSlave(info1)))); } else { - AWAIT_EQ(true, registrar.remove(info1)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new RemoveSlave(info1)))); } - AWAIT_EQ(true, registrar.admit(info1)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new AdmitSlave(info1)))); - AWAIT_EQ(true, registrar.remove(info2)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new RemoveSlave(info2)))); if (flags.registry_strict) { - AWAIT_EQ(false, registrar.remove(info2)); + AWAIT_EQ(false, registrar.apply(Owned<Operation>(new RemoveSlave(info2)))); } else { - AWAIT_EQ(true, registrar.remove(info2)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new RemoveSlave(info2)))); } - AWAIT_EQ(true, registrar.remove(info3)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new RemoveSlave(info3)))); if (flags.registry_strict) { - AWAIT_EQ(false, registrar.remove(info3)); + AWAIT_EQ(false, registrar.apply(Owned<Operation>(new RemoveSlave(info3)))); } else { - AWAIT_EQ(true, registrar.remove(info3)); + AWAIT_EQ(true, registrar.apply(Owned<Operation>(new RemoveSlave(info3)))); } } @@ -258,9 +256,11 @@ TEST_P(RegistrarTest, bootstrap) // If not strict, we should be allowed to readmit the slave. if (flags.registry_strict) { - AWAIT_EQ(false, registrar.readmit(info)); + AWAIT_EQ(false, + registrar.apply(Owned<Operation>(new ReadmitSlave(info)))); } else { - AWAIT_EQ(true, registrar.readmit(info)); + AWAIT_EQ(true, + registrar.apply(Owned<Operation>(new ReadmitSlave(info)))); } }
