Repository: mesos
Updated Branches:
  refs/heads/master 3facf2009 -> 49daa6deb


Added a 'Recover' operation to the Registrar to force a Registry version change
during recovery.

Review: https://reviews.apache.org/r/18675


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/49d3e57f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/49d3e57f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/49d3e57f

Branch: refs/heads/master
Commit: 49d3e57f4fb1f2878bdaa57cf9ef30e7758ee9e2
Parents: 3facf20
Author: Benjamin Mahler <[email protected]>
Authored: Sun Mar 2 18:55:13 2014 -0800
Committer: Benjamin Mahler <[email protected]>
Committed: Fri Mar 7 13:25:50 2014 -0800

----------------------------------------------------------------------
 src/master/registrar.cpp      | 111 ++++++++++++++++++++++++++++---------
 src/master/registrar.hpp      |  19 ++++++-
 src/tests/registrar_tests.cpp |  43 ++++++++++++++
 3 files changed, 145 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/49d3e57f/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
index 37337c0..e453f3f 100644
--- a/src/master/registrar.cpp
+++ b/src/master/registrar.cpp
@@ -22,6 +22,7 @@
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
+#include <process/owned.hpp>
 #include <process/process.hpp>
 
 #include <stout/lambda.hpp>
@@ -42,6 +43,7 @@ using mesos::internal::state::protobuf::Variable;
 using process::dispatch;
 using process::Failure;
 using process::Future;
+using process::Owned;
 using process::Process;
 using process::Promise;
 using process::spawn;
@@ -66,6 +68,7 @@ public:
   virtual ~RegistrarProcess() {}
 
   // Registrar implementation.
+  Future<Registry> recover(const MasterInfo& info);
   Future<bool> admit(const SlaveInfo& info);
   Future<bool> readmit(const SlaveInfo& info);
   Future<bool> remove(const SlaveInfo& info);
@@ -99,6 +102,22 @@ private:
     bool success;
   };
 
+  // The 'Recover' operation overwrites the Registry's MasterInfo with 'info'.
+  struct Recover : Operation<Registry>
+  {
+    Recover(const MasterInfo& _info) : info(_info) {}
+
+  protected:
+    virtual Result<Registry> perform(Registry registry)
+    {
+      registry.mutable_master()->mutable_info()->CopyFrom(info);
+      return registry;
+    }
+
+    const MasterInfo info;
+  };
+
+  // Slave Admission.
   struct Admit : Operation<Registry>
   {
     Admit(const SlaveInfo& _info) : info(_info) {}
@@ -121,6 +140,7 @@ private:
     const SlaveInfo info;
   };
 
+  // Slave Readmission.
   struct Readmit : Operation<Registry>
   {
     Readmit(const SlaveInfo& _info) : info(_info) {}
@@ -140,6 +160,7 @@ private:
     const SlaveInfo info;
   };
 
+  // Slave Removal.
   struct Remove : Operation<Registry>
   {
     Remove(const SlaveInfo& _info) : info(_info) {}
@@ -169,14 +190,14 @@ private:
   bool updating; // Used to signify fetching (recovering) or storing.
 
   // Continuations.
+  void _recover(
+      const MasterInfo& info,
+      const Future<Variable<Registry> >& recovery);
+  void __recover(const Future<bool>& recover);
   Future<bool> _admit(const SlaveInfo& info);
   Future<bool> _readmit(const SlaveInfo& info);
   Future<bool> _remove(const SlaveInfo& info);
 
-  // Helper for recovering state (performing fetch).
-  Future<Nothing> recover();
-  void _recover(const Future<Variable<Registry> >& recovery);
-
   // Helper for updating state (performing store).
   void update();
   void _update(
@@ -186,44 +207,71 @@ private:
   State* state;
 
   // Used to compose our operations with recovery.
-  Promise<Nothing> recovered;
+  Option<Owned<Promise<Registry> > > recovered;
 };
 
 
-Future<Nothing> RegistrarProcess::recover()
+Future<Registry> RegistrarProcess::recover(const MasterInfo& info)
 {
   LOG(INFO) << "Recovering registrar";
 
-  if (variable.isNone() && !updating) {
+  if (recovered.isNone()) {
     // TODO(benh): Don't wait forever to recover?
     state->fetch<Registry>("registry")
-      .onAny(defer(self(), &Self::_recover, lambda::_1));
+      .onAny(defer(self(), &Self::_recover, info, lambda::_1));
     updating = true;
+    recovered = Owned<Promise<Registry> >(new Promise<Registry>());
   }
 
-  return recovered.future();
+  return recovered.get()->future();
 }
 
 
 void RegistrarProcess::_recover(
+    const MasterInfo& info,
     const Future<Variable<Registry> >& recovery)
 {
   updating = false;
 
   CHECK(!recovery.isPending());
 
-  if (recovery.isFailed() || recovery.isDiscarded()) {
-    LOG(WARNING) << "Failed to recover registrar: "
-                 << (recovery.isFailed() ? recovery.failure() : "discarded");
-    recover(); // Retry! TODO(benh): Don't retry forever?
+  if (!recovery.isReady()) {
+    recovered.get()->fail("Failed to recover registrar: " +
+        (recovery.isFailed() ? recovery.failure() : "discarded"));
   } else {
     LOG(INFO) << "Successfully recovered registrar";
 
     // Save the registry.
     variable = recovery.get();
 
-    // Signal the recovery is complete.
-    recovered.set(Nothing());
+    // Perform the Recover operation to add the new MasterInfo.
+    Operation<Registry>* operation = new Recover(info);
+    operations.push_back(operation);
+    operation->future()
+      .onAny(defer(self(), &Self::__recover, lambda::_1));
+
+    update();
+  }
+}
+
+
+void RegistrarProcess::__recover(const Future<bool>& result)
+{
+  CHECK(!result.isPending());
+
+  if (!result.isReady()) {
+    recovered.get()->fail("Failed to recover registrar: "
+        "Failed to persist MasterInfo: " +
+        (result.isFailed() ? result.failure() : "discarded"));
+  } else if (!result.get()) {
+    recovered.get()->fail("Failed to recover registrar: "
+        "Failed to persist MasterInfo: version mismatch");
+  } else {
+    // _update() has already stored the new Registry (with the latest
+    // MasterInfo) into 'variable'; fulfill the promise so operations
+    // gated on recovery can proceed.
+    CHECK_SOME(variable);
+    recovered.get()->set(variable.get().get());
   }
 }
 
@@ -234,7 +282,11 @@ Future<bool> RegistrarProcess::admit(const SlaveInfo& info)
     return Failure("SlaveInfo is missing the 'id' field");
   }
 
-  return recover()
+  if (recovered.isNone()) {
+    return Failure("Attempted to admit slave before recovering");
+  }
+
+  return recovered.get()->future()
     .then(defer(self(), &Self::_admit, info));
 }
 
@@ -242,6 +294,7 @@ Future<bool> RegistrarProcess::admit(const SlaveInfo& info)
 Future<bool> RegistrarProcess::_admit(const SlaveInfo& info)
 {
   CHECK_SOME(variable);
+
   Operation<Registry>* operation = new Admit(info);
   operations.push_back(operation);
   Future<bool> future = operation->future();
@@ -258,7 +311,11 @@ Future<bool> RegistrarProcess::readmit(const SlaveInfo& info)
     return Failure("SlaveInfo is missing the 'id' field");
   }
 
-  return recover()
+  if (recovered.isNone()) {
+    return Failure("Attempted to readmit slave before recovering");
+  }
+
+  return recovered.get()->future()
     .then(defer(self(), &Self::_readmit, info));
 }
 
@@ -268,10 +325,6 @@ Future<bool> RegistrarProcess::_readmit(
 {
   CHECK_SOME(variable);
 
-  if (!info.has_id()) {
-    return Failure("Expecting SlaveInfo to have a SlaveID");
-  }
-
   Operation<Registry>* operation = new Readmit(info);
   operations.push_back(operation);
   Future<bool> future = operation->future();
@@ -288,7 +341,11 @@ Future<bool> RegistrarProcess::remove(const SlaveInfo& info)
     return Failure("SlaveInfo is missing the 'id' field");
   }
 
-  return recover()
+  if (recovered.isNone()) {
+    return Failure("Attempted to remove slave before recovering");
+  }
+
+  return recovered.get()->future()
     .then(defer(self(), &Self::_remove, info));
 }
 
@@ -298,10 +355,6 @@ Future<bool> RegistrarProcess::_remove(
 {
   CHECK_SOME(variable);
 
-  if (!info.has_id()) {
-    return Failure("Expecting SlaveInfo to have a SlaveID");
-  }
-
   Operation<Registry>* operation = new Remove(info);
   operations.push_back(operation);
   Future<bool> future = operation->future();
@@ -404,6 +457,12 @@ Registrar::~Registrar()
 }
 
 
+Future<Registry> Registrar::recover(const MasterInfo& info)
+{
+  return dispatch(process, &RegistrarProcess::recover, info);
+}
+
+
 Future<bool> Registrar::admit(const SlaveInfo& info)
 {
   return dispatch(process, &RegistrarProcess::admit, info);

http://git-wip-us.apache.org/repos/asf/mesos/blob/49d3e57f/src/master/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.hpp b/src/master/registrar.hpp
index 20734af..987a63b 100644
--- a/src/master/registrar.hpp
+++ b/src/master/registrar.hpp
@@ -23,6 +23,8 @@
 
 #include <process/future.hpp>
 
+#include "master/registry.hpp"
+
 #include "state/protobuf.hpp"
 
 namespace mesos {
@@ -38,8 +40,21 @@ public:
   Registrar(state::protobuf::State* state);
   ~Registrar();
 
-  // Returns the future for slave admission into the Registry. The
-  // SlaveInfo must contain an 'id', otherwise a Failure will result.
+  // Recovers the Registry, persisting the new Master information.
+  // The Registrar must be recovered to allow other operations to
+  // proceed.
+  // TODO(bmahler): Consider a "factory" for constructing the
+  // Registrar, to eliminate the need for passing 'MasterInfo'.
+  // This is required as the Registrar is injected into the Master,
+  // and therefore MasterInfo is unknown during construction.
+  process::Future<Registry> recover(const MasterInfo& info);
+
+  // The following are operations that can be performed on the
+  // Registry for a slave. Returns:
+  //   true if the operation is permitted.
+  //   false if the operation is not permitted.
+  //   Failure if the operation fails (possibly lost log leadership),
+  //     recovery failed, or if 'info' is missing an ID.
   process::Future<bool> admit(const SlaveInfo& info);
   process::Future<bool> readmit(const SlaveInfo& info);
   process::Future<bool> remove(const SlaveInfo& info);

http://git-wip-us.apache.org/repos/asf/mesos/blob/49d3e57f/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index 3bf42bd..8620e8a 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -24,6 +24,8 @@
 #include <process/pid.hpp>
 #include <process/process.hpp>
 
+#include "common/type_utils.hpp"
+
 #include "master/registrar.hpp"
 
 #include "state/leveldb.hpp"
@@ -76,9 +78,48 @@ private:
 };
 
 
+TEST_F(RegistrarTest, recover)
+{
+  Registrar registrar(state);
+
+  SlaveInfo slave;
+  slave.set_hostname("localhost");
+  SlaveID id;
+  id.set_value("1");
+  slave.mutable_id()->CopyFrom(id);
+
+  // Operations issued before recover() is called will fail.
+  AWAIT_EXPECT_FAILED(registrar.admit(slave));
+  AWAIT_EXPECT_FAILED(registrar.readmit(slave));
+  AWAIT_EXPECT_FAILED(registrar.remove(slave));
+
+  MasterInfo info;
+  info.set_id("foobar");
+  info.set_ip(0);
+  info.set_port(5050);
+  info.set_pid("0:5050");
+
+  Future<Registry> registry = registrar.recover(info);
+
+  // Operations issued after recover() but before it completes are queued
+  // behind recovery and should succeed once it does.
+  Future<bool> admit = registrar.admit(slave);
+  Future<bool> readmit = registrar.readmit(slave);
+  Future<bool> remove = registrar.remove(slave);
+
+  AWAIT_READY(registry);
+  EXPECT_EQ(info, registry.get().master().info());
+
+  AWAIT_EQ(true, admit);
+  AWAIT_EQ(true, readmit);
+  AWAIT_EQ(true, remove);
+}
+
+
 TEST_F(RegistrarTest, admit)
 {
   Registrar registrar(state);
+  AWAIT_READY(registrar.recover(MasterInfo()));
 
   SlaveInfo info1;
   info1.set_hostname("localhost");
@@ -98,6 +139,7 @@ TEST_F(RegistrarTest, admit)
 TEST_F(RegistrarTest, readmit)
 {
   Registrar registrar(state);
+  AWAIT_READY(registrar.recover(MasterInfo()));
 
   SlaveInfo info1;
   info1.set_hostname("localhost");
@@ -127,6 +169,7 @@ TEST_F(RegistrarTest, readmit)
 TEST_F(RegistrarTest, remove)
 {
   Registrar registrar(state);
+  AWAIT_READY(registrar.recover(MasterInfo()));
 
   SlaveInfo info1;
   info1.set_hostname("localhost");

Reply via email to