Repository: mesos
Updated Branches:
  refs/heads/master b979d03c5 -> 3facf2009


Cleaned up log recovery code to use continuation style.

Review: https://reviews.apache.org/r/18584
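
For readers unfamiliar with the term: "continuation style" here means
chaining the steps of an asynchronous operation with Future::then()
and funneling all error handling into a single terminal callback,
instead of having every intermediate callback check the future's
state and terminate the process itself. A minimal sketch of the two
patterns, abridged from the diff below (not a complete program):

    // Before: callback style. Every step repeats the same
    // error-handling boilerplate.
    void watched(const Future<size_t>& future)
    {
      if (!future.isReady()) {
        promise.fail(
            future.isFailed() ?
            future.failure() :
            "Not expecting discarded future");
        terminate(self());
        return;
      }
      // ... proceed to the next step ...
    }

    // After: continuation style. Failures and discards propagate
    // through the chain and are handled once, in 'finished'.
    chain = network->watch(quorum, Network::GREATER_THAN_OR_EQUAL_TO)
      .then(defer(self(), &Self::broadcast))
      .then(defer(self(), &Self::receive))
      .onAny(defer(self(), &Self::finished, lambda::_1));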


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3facf200
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3facf200
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3facf200

Branch: refs/heads/master
Commit: 3facf200981541459336068d39e11b4f05486315
Parents: b979d03
Author: Jie Yu <[email protected]>
Authored: Thu Feb 27 11:22:11 2014 -0800
Committer: Jie Yu <[email protected]>
Committed: Fri Mar 7 12:28:01 2014 -0800

----------------------------------------------------------------------
 src/log/recover.cpp | 466 +++++++++++++++++++++++++----------------------
 1 file changed, 250 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3facf200/src/log/recover.cpp
----------------------------------------------------------------------
diff --git a/src/log/recover.cpp b/src/log/recover.cpp
index e611a4e..688da5f 100644
--- a/src/log/recover.cpp
+++ b/src/log/recover.cpp
@@ -26,6 +26,7 @@
 #include <process/id.hpp>
 #include <process/process.hpp>
 
+#include <stout/check.hpp>
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/lambda.hpp>
@@ -47,169 +48,99 @@ namespace mesos {
 namespace internal {
 namespace log {
 
-// This process is used to recover a replica. The flow of the recover
-// process is described as follows:
-// A) Check the status of the local replica.
-//    A1) If it is VOTING, exit.
-//    A2) If it is not VOTING, goto (B).
-// B) Broadcast a RecoverRequest to all replicas in the network.
-//    B1) <<< Catch-up >>> If a quorum of replicas are found in VOTING
-//        status (no matter what the status of the local replica is),
-//        set the status of the local replica to RECOVERING, and start
-//        doing catch-up. If the local replica has been caught-up, set
-//        the status of the local replica to VOTING and exit.
-//    B2) If a quorum is not found, goto (B).
-//
-// In the following, we list a few scenarios and show how the recover
-// process will respond in those scenarios. All the examples assume a
-// quorum size of 2. Remember that a new replica is always put in
-// EMPTY status initially.
-//
-// 1) Replica A, B and C are all in VOTING status. The operator adds
-//    replica D. In that case, D will go into RECOVERING status and
-//    then go into VOTING status. Therefore, we should avoid adding a
-//    new replica unless we know that one replica has been removed.
-//
-// 2) Replica A and B are in VOTING status. The operator adds replica
-//    C. In that case, C will go into RECOVERING status and then go
-//    into VOTING status, which is expected.
+
+// This class is responsible for executing the log recover protocol.
+// Any time a replica in non-VOTING status starts, we will run this
+// protocol. We first broadcast a recover request to all the replicas
+// in the network, and then collect recover responses to decide what
+// status the local replica should be in next. The details of the
+// recover protocol are as follows:
 //
-// 3) Replica A is in VOTING status. The operator adds replica B. In
-//    that case, B will stay in EMPTY status forever. This is expected
-//    because we cannot make progress if VOTING replicas are not
-//    enough (i.e., less than quorum).
+// A) Broadcast a RecoverRequest to all replicas in the network.
+// B) Collect a RecoverResponse from each replica:
+//   B1) If a quorum of replicas are found in VOTING status, the local
+//       replica will be in RECOVERING status next.
+//   B2) Otherwise, goto (A).
 //
-// 4) Replica A is in VOTING status and B is in EMPTY status. The
-//    operator adds replica C. In that case, C will stay in EMPTY
-//    status forever similar to case 3).
-class RecoverProcess : public Process<RecoverProcess>
+// We re-use RecoverResponse to specify the return value. The 'status'
+// field specifies the next status of the local replica. If the next
+// status is RECOVERING, we set the fields 'begin' and 'end' to be the
+// lowest begin and highest end position seen in these responses.
+class RecoverProtocolProcess : public Process<RecoverProtocolProcess>
 {
 public:
-  RecoverProcess(
+  RecoverProtocolProcess(
       size_t _quorum,
-      const Owned<Replica>& _replica,
       const Shared<Network>& _network)
-    : ProcessBase(ID::generate("log-recover")),
+    : ProcessBase(ID::generate("log-recover-protocol")),
       quorum(_quorum),
-      replica(_replica),
       network(_network) {}
 
-  Future<Owned<Replica> > future() { return promise.future(); }
+  Future<RecoverResponse> future() { return promise.future(); }
 
 protected:
   virtual void initialize()
   {
-    LOG(INFO) << "Start recovering a replica";
-
-    // Stop when no one cares.
-    promise.future().onDiscard(lambda::bind(
-          static_cast<void(*)(const UPID&, bool)>(terminate), self(), true));
-
-    // Check the current status of the local replica and decide if
-    // recovery is needed. Recovery is needed if the local replica is
-    // not in VOTING status.
-    replica->status().onAny(defer(self(), &Self::checked, lambda::_1));
-  }
-
-  virtual void finalize()
-  {
-    LOG(INFO) << "Recover process terminated";
-
-    // Cancel all operations if they are still pending.
-    discard(responses);
-    catching.discard();
+    // Register a callback to handle user-initiated discard.
+    promise.future().onDiscard(defer(self(), &Self::discard));
 
-    // TODO(benh): Discard our promise only after 'catching' has
-    // completed (ready, failed, or discarded).
-    promise.discard();
+    start();
   }
 
 private:
-  void checked(const Future<Metadata::Status>& future)
+  void discard()
   {
-    if (!future.isReady()) {
-      promise.fail(
-          future.isFailed() ?
-          "Failed to get replica status: " + future.failure() :
-          "Not expecting discarded future");
-
-      terminate(self());
-      return;
-    }
-
-    status = future.get();
-
-    LOG(INFO) << "Replica is in " << status << " status";
-
-    if (status == Metadata::VOTING) {
-      promise.set(replica);
-      terminate(self());
-    } else {
-      recover();
-    }
+    chain.discard();
   }
 
-  void recover()
+  void start()
   {
-    CHECK_NE(status, Metadata::VOTING);
-
     // Wait until there are enough (i.e., quorum of) replicas in the
     // network to avoid unnecessary retries.
-    network->watch(quorum, Network::GREATER_THAN_OR_EQUAL_TO)
-      .onAny(defer(self(), &Self::watched, lambda::_1));
+    chain = network->watch(quorum, Network::GREATER_THAN_OR_EQUAL_TO)
+      .then(defer(self(), &Self::broadcast))
+      .then(defer(self(), &Self::receive))
+      .onAny(defer(self(), &Self::finished, lambda::_1));
   }
 
-  void watched(const Future<size_t>& future)
+  Future<Nothing> broadcast()
   {
-    if (!future.isReady()) {
-      promise.fail(
-          future.isFailed() ?
-          future.failure() :
-          "Not expecting discarded future");
-
-      terminate(self());
-      return;
-    }
-
-    CHECK_GE(future.get(), quorum);
-
     // Broadcast recover request to all replicas.
-    network->broadcast(protocol::recover, RecoverRequest())
-      .onAny(defer(self(), &Self::broadcasted, lambda::_1));
+    return network->broadcast(protocol::recover, RecoverRequest())
+      .then(defer(self(), &Self::broadcasted, lambda::_1));
   }
 
-  void broadcasted(const Future<set<Future<RecoverResponse> > >& future)
+  Future<Nothing> broadcasted(const set<Future<RecoverResponse> >& _responses)
   {
-    if (!future.isReady()) {
-      promise.fail(
-          future.isFailed() ?
-          "Failed to broadcast the recover request: " + future.failure() :
-          "Not expecting discarded future");
+    responses = _responses;
 
-      terminate(self());
-      return;
-    }
+    // Reset the counters.
+    responsesReceived.clear();
+    lowestBeginPosition = None();
+    highestEndPosition = None();
 
-    responses = future.get();
+    return Nothing();
+  }
 
+  // Returns None if we need to re-run the protocol.
+  Future<Option<RecoverResponse> > receive()
+  {
     if (responses.empty()) {
-      // Retry if no replica is currently in the network.
-      retry();
-    } else {
-      // Instead of using a for loop here, we use select to process
-      // responses one after another so that we can ignore the rest if
-      // we have collected enough responses.
-      select(responses)
-        .onReady(defer(self(), &Self::received, lambda::_1));
-
-      // Reset the counters.
-      responsesReceived.clear();
-      lowestBeginPosition = None();
-      highestEndPosition = None();
+      // All responses have been received but we haven't received
+      // enough (i.e., a quorum of) responses from VOTING replicas to
+      // start the catch-up. We will re-run the recovery protocol.
+      return None();
     }
+
+    // Instead of using a for loop here, we use select to process
+    // responses one after another so that we can ignore the rest if
+    // we have collected enough responses.
+    return select(responses)
+      .then(defer(self(), &Self::received, lambda::_1));
   }
 
-  void received(const Future<RecoverResponse>& future)
+  Future<Option<RecoverResponse> > received(
+      const Future<RecoverResponse>& future)
   {
     // Enforced by the select semantics.
     CHECK_READY(future);
@@ -242,69 +173,173 @@ private:
     // begin position and the highest end position since we haven't
     // persisted this information on disk.
     if (responsesReceived[Metadata::VOTING] >= quorum) {
-      discard(responses);
-      update(Metadata::RECOVERING);
-      return;
+      process::discard(responses);
+
+      CHECK_SOME(lowestBeginPosition);
+      CHECK_SOME(highestEndPosition);
+      CHECK_LE(lowestBeginPosition.get(), highestEndPosition.get());
+
+      RecoverResponse result;
+      result.set_status(Metadata::RECOVERING);
+      result.set_begin(lowestBeginPosition.get());
+      result.set_end(highestEndPosition.get());
+
+      return result;
     }
 
-    if (responses.empty()) {
-      // All responses have been received but neither have we received
-      // enough responses from VOTING replicas to do catch-up, nor are
-      // we in start-up case. This is either because we don't have
-      // enough replicas in the network (e.g. ZooKeeper blip), or we
-      // don't have enough VOTING replicas to proceed. We will retry
-      // the recovery in both cases.
-      retry();
+    // Handle the next response.
+    return receive();
+  }
+
+  void finished(const Future<Option<RecoverResponse> >& future)
+  {
+    if (future.isDiscarded()) {
+      promise.discard();
+      terminate(self());
+    } else if (future.isFailed()) {
+      promise.fail(future.failure());
+      terminate(self());
+    } else if (future.get().isNone()) {
+      // Re-run the protocol. We add a random delay before each retry
+      // because we do not want to saturate the network/disk IO in
+      // some cases (e.g., the network size is less than the quorum).
+      // The delay is chosen randomly to reduce the likelihood of
+      // conflicts (i.e., a replica receives a recover request while
+      // it is changing its status).
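+      // Note: (double) ::random() / RAND_MAX falls in [0.0, 1.0], so
+      // 'd' below is drawn uniformly from [T, 2T], i.e., between
+      // 500ms and 1 second.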
+      static const Duration T = Milliseconds(500);
+      Duration d = T * (1.0 + (double) ::random() / RAND_MAX);
+      delay(d, self(), &Self::start);
     } else {
-      // Wait for the next response.
-      select(responses)
-        .onReady(defer(self(), &Self::received, lambda::_1));
+      promise.set(future.get().get());
+      terminate(self());
     }
   }
 
-  void update(const Metadata::Status& _status)
+  const size_t quorum;
+  const Shared<Network> network;
+
+  set<Future<RecoverResponse> > responses;
+  hashmap<Metadata::Status, size_t> responsesReceived;
+  Option<uint64_t> lowestBeginPosition;
+  Option<uint64_t> highestEndPosition;
+  Future<Option<RecoverResponse> > chain;
+
+  process::Promise<RecoverResponse> promise;
+};
+
+
+// The wrapper for running the recover protocol.
+static Future<RecoverResponse> runRecoverProtocol(
+    size_t quorum,
+    const Shared<Network>& network)
+{
+  RecoverProtocolProcess* process =
+    new RecoverProtocolProcess(quorum, network);
+
+  Future<RecoverResponse> future = process->future();
+  spawn(process, true);
+  return future;
+}
+
+
+// This process is used to recover a replica. We first check the
+// status of the local replica. If it is in VOTING status, the recover
+// process will terminate immediately. If the local replica is in
+// non-VOTING status, we will run the log recover protocol described
+// above to decide what status the local replica should be in next. If
+// the next status is determined to be RECOVERING, we will start doing
+// catch-up. Later, once the local replica has caught up, we will set
+// the status of the local replica to VOTING and terminate the
+// process, indicating that the recovery has completed.
+//
+// Here, we list a few scenarios and show how the recover process will
+// respond in those scenarios. All the examples assume a quorum size
+// of 2. Remember that a new replica is always put in EMPTY status
+// initially.
+//
+// 1) Replica A, B and C are all in VOTING status. The operator adds
+//    replica D. In that case, D will go into RECOVERING status and
+//    then go into VOTING status. Therefore, we should avoid adding a
+//    new replica unless we know that one replica has been removed.
+//
+// 2) Replica A and B are in VOTING status. The operator adds replica
+//    C. In that case, C will go into RECOVERING status and then go
+//    into VOTING status, which is expected.
+//
+// 3) Replica A is in VOTING status. The operator adds replica B. In
+//    that case, B will stay in EMPTY status forever. This is expected
+//    because we cannot make progress if there are not enough VOTING
+//    replicas (i.e., fewer than a quorum).
+//
+// 4) Replica A is in VOTING status and B is in EMPTY status. The
+//    operator adds replica C. In that case, C will stay in EMPTY
+//    status forever, similar to case 3).
+class RecoverProcess : public Process<RecoverProcess>
+{
+public:
+  RecoverProcess(
+      size_t _quorum,
+      const Owned<Replica>& _replica,
+      const Shared<Network>& _network)
+    : ProcessBase(ID::generate("log-recover")),
+      quorum(_quorum),
+      replica(_replica),
+      network(_network) {}
+
+  Future<Owned<Replica> > future() { return promise.future(); }
+
+protected:
+  virtual void initialize()
   {
-    LOG(INFO) << "Updating replica status from "
-              << status << " to " << _status;
+    LOG(INFO) << "Starting replica recovery";
 
-    replica->update(_status)
-      .onAny(defer(self(), &Self::updated, _status, lambda::_1));
+    // Register a callback to handle user-initiated discard.
+    promise.future().onDiscard(defer(self(), &Self::discard));
+
+    // Check the current status of the local replica and decide if
+    // recovery is needed. Recovery is needed only if the local
+    // replica is not in VOTING status.
+    chain = replica->status()
+      .then(defer(self(), &Self::recover, lambda::_1))
+      .onAny(defer(self(), &Self::finished, lambda::_1));
   }
 
-  void updated(const Metadata::Status& _status, const Future<bool>& future)
+  virtual void finalize()
   {
-    if (!future.isReady()) {
-      promise.fail(
-          future.isFailed() ?
-          "Failed to update replica status: " + future.failure() :
-          "Not expecting discarded future");
+    LOG(INFO) << "Recover process terminated";
+  }
 
-      terminate(self());
-      return;
-    } else if (!future.get()) {
-      promise.fail("Failed to update replica status");
-      terminate(self());
-      return;
-    }
+private:
+  void discard()
+  {
+    chain.discard();
+  }
 
-    // The replica status has been updated successfully. Depending on
-    // the new status, we decide what the next action should be.
-    status = _status;
+  Future<Nothing> recover(const Metadata::Status& status)
+  {
+    LOG(INFO) << "Replica is in " << status << " status";
 
     if (status == Metadata::VOTING) {
-      LOG(INFO) << "Successfully joined the Paxos group";
+      // No need to do recovery.
+      return Nothing();
+    } else {
+      return runRecoverProtocol(quorum, network)
+        .then(defer(self(), &Self::_recover, lambda::_1));
+    }
+  }
 
-      promise.set(replica);
-      terminate(self());
-    } else if (status == Metadata::RECOVERING) {
-      catchup();
+  Future<Nothing> _recover(const RecoverResponse& result)
+  {
+    if (result.status() == Metadata::RECOVERING) {
+      CHECK(result.has_begin() && result.has_end());
+
+      return updateReplicaStatus(Metadata::RECOVERING)
+        .then(defer(self(), &Self::catchup, result.begin(), result.end()));
     } else {
-      // The replica should not be in any other status.
-      LOG(FATAL) << "Unexpected replica status";
+      return Failure("Unexpected status returned from the recover protocol");
     }
   }
 
-  void catchup()
+  Future<Nothing> catchup(uint64_t begin, uint64_t end)
   {
     // We reach here either because the log is empty (uninitialized),
     // or the log is not empty but a previous unfinished catch-up
@@ -330,17 +365,13 @@ private:
     // _begin_, it should have already been truncated and the
     // truncation should have already been agreed. Therefore, allowing
     // the local replica to vote for that position is safe.
-    CHECK(lowestBeginPosition.isSome());
-    CHECK(highestEndPosition.isSome());
-    CHECK_LE(lowestBeginPosition.get(), highestEndPosition.get());
+    CHECK_LE(begin, end);
 
-    LOG(INFO) << "Starting catch-up from position "
-              << lowestBeginPosition.get() << " to "
-              << highestEndPosition.get();
+    LOG(INFO) << "Starting catch-up from position " << begin << " to " << end;
 
     IntervalSet<uint64_t> positions(
-        Bound<uint64_t>::closed(lowestBeginPosition.get()),
-        Bound<uint64_t>::closed(highestEndPosition.get()));
+        Bound<uint64_t>::closed(begin),
+        Bound<uint64_t>::closed(end));
 
     // Share the ownership of the replica. From this point until the
     // point where the ownership of the replica is regained, we should
@@ -350,63 +381,66 @@ private:
     // Since we do not know what proposal number to use (the log is
     // empty), we use none and leave log::catchup to automatically
     // bump the proposal number.
-    catching = log::catchup(quorum, shared, network, None(), positions);
-    catching.onAny(defer(self(), &Self::caughtup, shared, lambda::_1));
+    return log::catchup(quorum, shared, network, None(), positions)
+      .then(defer(self(), &Self::getReplicaOwnership, shared))
+      .then(defer(self(), &Self::updateReplicaStatus, Metadata::VOTING));
   }
 
-  void caughtup(Shared<Replica> shared, const Future<Nothing>& future)
+  Future<Nothing> updateReplicaStatus(const Metadata::Status& status)
   {
-    if (!future.isReady()) {
-      promise.fail(
-          future.isFailed() ?
-          "Failed to catch-up: " + future.failure() :
-          "Not expecting discarded future");
+    LOG(INFO) << "Updating replica status to " << status;
 
-      terminate(self());
-    } else {
-      // Try to regain the ownership of the replica.
-      shared.own().onAny(defer(self(), &Self::owned, lambda::_1));
-    }
+    return replica->update(status)
+      .then(defer(self(), &Self::_updateReplicaStatus, lambda::_1, status));
   }
 
-  void owned(const Future<Owned<Replica> >& future)
+  Future<Nothing> _updateReplicaStatus(
+      bool updated, const Metadata::Status& status)
   {
-    if (!future.isReady()) {
-      promise.fail(
-          future.isFailed() ?
-          "Failed to own the replica: " + future.failure() :
-          "Not expecting discarded future");
+    if (!updated) {
+      return Failure("Failed to update replica status");
+    }
 
-      terminate(self());
-    } else {
-      // Allow the replica to vote once the catch-up is done.
-      replica = future.get();
-      update(Metadata::VOTING);
+    if (status == Metadata::VOTING) {
+      LOG(INFO) << "Successfully joined the Paxos group";
     }
+
+    return Nothing();
   }
 
-  void retry()
+  Future<Nothing> getReplicaOwnership(Shared<Replica> shared)
   {
-    // We add a random delay before each retry because we do not want
-    // to saturate the network/disk IO in some cases (e.g., network
-    // size is less than quorum). The delay is chosen randomly to
-    // reduce the likelihood of conflicts (i.e., a replica receives a
-    // recover request while it is changing its status).
-    static const Duration T = Milliseconds(500);
-    Duration d = T * (1.0 + (double) ::random() / RAND_MAX);
-    delay(d, self(), &Self::recover);
+    // Try to regain the ownership of the replica.
+    return shared.own()
+      .then(defer(self(), &Self::_getReplicaOwnership, lambda::_1));
+  }
+
+  Future<Nothing> _getReplicaOwnership(Owned<Replica> owned)
+  {
+    replica = owned;
+
+    return Nothing();
+  }
+
+  void finished(const Future<Nothing>& future)
+  {
+    if (future.isDiscarded()) {
+      promise.discard();
+      terminate(self());
+    } else if (future.isFailed()) {
+      promise.fail(future.failure());
+      terminate(self());
+    } else {
+      promise.set(replica);
+      terminate(self());
+    }
   }
 
   const size_t quorum;
   Owned<Replica> replica;
   const Shared<Network> network;
 
-  Metadata::Status status;
-  set<Future<RecoverResponse> > responses;
-  hashmap<Metadata::Status, size_t> responsesReceived;
-  Option<uint64_t> lowestBeginPosition;
-  Option<uint64_t> highestEndPosition;
-  Future<Nothing> catching;
+  Future<Nothing> chain;
 
   process::Promise<Owned<Replica> > promise;
 };
