Repository: mesos Updated Branches: refs/heads/master b979d03c5 -> 3facf2009
Cleaned up log recovery code to use continuation style. Review: https://reviews.apache.org/r/18584 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3facf200 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3facf200 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3facf200 Branch: refs/heads/master Commit: 3facf200981541459336068d39e11b4f05486315 Parents: b979d03 Author: Jie Yu <[email protected]> Authored: Thu Feb 27 11:22:11 2014 -0800 Committer: Jie Yu <[email protected]> Committed: Fri Mar 7 12:28:01 2014 -0800 ---------------------------------------------------------------------- src/log/recover.cpp | 466 +++++++++++++++++++++++++---------------------- 1 file changed, 250 insertions(+), 216 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/3facf200/src/log/recover.cpp ---------------------------------------------------------------------- diff --git a/src/log/recover.cpp b/src/log/recover.cpp index e611a4e..688da5f 100644 --- a/src/log/recover.cpp +++ b/src/log/recover.cpp @@ -26,6 +26,7 @@ #include <process/id.hpp> #include <process/process.hpp> +#include <stout/check.hpp> #include <stout/foreach.hpp> #include <stout/hashmap.hpp> #include <stout/lambda.hpp> @@ -47,169 +48,99 @@ namespace mesos { namespace internal { namespace log { -// This process is used to recover a replica. The flow of the recover -// process is described as follows: -// A) Check the status of the local replica. -// A1) If it is VOTING, exit. -// A2) If it is not VOTING, goto (B). -// B) Broadcast a RecoverRequest to all replicas in the network. -// B1) <<< Catch-up >>> If a quorum of replicas are found in VOTING -// status (no matter what the status of the local replica is), -// set the status of the local replica to RECOVERING, and start -// doing catch-up. If the local replica has been caught-up, set -// the status of the local replica to VOTING and exit. -// B2) If a quorum is not found, goto (B). -// -// In the following, we list a few scenarios and show how the recover -// process will respond in those scenarios. All the examples assume a -// quorum size of 2. Remember that a new replica is always put in -// EMPTY status initially. -// -// 1) Replica A, B and C are all in VOTING status. The operator adds -// replica D. In that case, D will go into RECOVERING status and -// then go into VOTING status. Therefore, we should avoid adding a -// new replica unless we know that one replica has been removed. -// -// 2) Replica A and B are in VOTING status. The operator adds replica -// C. In that case, C will go into RECOVERING status and then go -// into VOTING status, which is expected. + +// This class is responsible for executing the log recover protocol. +// Any time a replica in non-VOTING status starts, we will run this +// protocol. We first broadcast a recover request to all the replicas +// in the network, and then collect recover responses to decide what +// status the local replica should be in next. The details of the +// recover protocol is shown as follows: // -// 3) Replica A is in VOTING status. The operator adds replica B. In -// that case, B will stay in EMPTY status forever. This is expected -// because we cannot make progress if VOTING replicas are not -// enough (i.e., less than quorum). +// A) Broadcast a RecoverRequest to all replicas in the network. 
+// B) Collect RecoverResponse from each replica +// B1) If a quorum of replicas are found in VOTING status, the local +// replica will be in RECOVERING status next. +// B2) Otherwise, goto (A). // -// 4) Replica A is in VOTING status and B is in EMPTY status. The -// operator adds replica C. In that case, C will stay in EMPTY -// status forever similar to case 3). -class RecoverProcess : public Process<RecoverProcess> +// We re-use RecoverResponse to specify the return value. The 'status' +// field specifies the next status of the local replica. If the next +// status is RECOVERING, we set the fields 'begin' and 'end' to be the +// lowest begin and highest end position seen in these responses. +class RecoverProtocolProcess : public Process<RecoverProtocolProcess> { public: - RecoverProcess( + RecoverProtocolProcess( size_t _quorum, - const Owned<Replica>& _replica, const Shared<Network>& _network) - : ProcessBase(ID::generate("log-recover")), + : ProcessBase(ID::generate("log-recover-protocol")), quorum(_quorum), - replica(_replica), network(_network) {} - Future<Owned<Replica> > future() { return promise.future(); } + Future<RecoverResponse> future() { return promise.future(); } protected: virtual void initialize() { - LOG(INFO) << "Start recovering a replica"; - - // Stop when no one cares. - promise.future().onDiscard(lambda::bind( - static_cast<void(*)(const UPID&, bool)>(terminate), self(), true)); - - // Check the current status of the local replica and decide if - // recovery is needed. Recovery is needed if the local replica is - // not in VOTING status. - replica->status().onAny(defer(self(), &Self::checked, lambda::_1)); - } - - virtual void finalize() - { - LOG(INFO) << "Recover process terminated"; - - // Cancel all operations if they are still pending. - discard(responses); - catching.discard(); + // Register a callback to handle user initiated discard. + promise.future().onDiscard(defer(self(), &Self::discard)); - // TODO(benh): Discard our promise only after 'catching' has - // completed (ready, failed, or discarded). - promise.discard(); + start(); } private: - void checked(const Future<Metadata::Status>& future) + void discard() { - if (!future.isReady()) { - promise.fail( - future.isFailed() ? - "Failed to get replica status: " + future.failure() : - "Not expecting discarded future"); - - terminate(self()); - return; - } - - status = future.get(); - - LOG(INFO) << "Replica is in " << status << " status"; - - if (status == Metadata::VOTING) { - promise.set(replica); - terminate(self()); - } else { - recover(); - } + chain.discard(); } - void recover() + void start() { - CHECK_NE(status, Metadata::VOTING); - // Wait until there are enough (i.e., quorum of) replicas in the // network to avoid unnecessary retries. - network->watch(quorum, Network::GREATER_THAN_OR_EQUAL_TO) - .onAny(defer(self(), &Self::watched, lambda::_1)); + chain = network->watch(quorum, Network::GREATER_THAN_OR_EQUAL_TO) + .then(defer(self(), &Self::broadcast)) + .then(defer(self(), &Self::receive)) + .onAny(defer(self(), &Self::finished, lambda::_1)); } - void watched(const Future<size_t>& future) + Future<Nothing> broadcast() { - if (!future.isReady()) { - promise.fail( - future.isFailed() ? - future.failure() : - "Not expecting discarded future"); - - terminate(self()); - return; - } - - CHECK_GE(future.get(), quorum); - // Broadcast recover request to all replicas. 
- network->broadcast(protocol::recover, RecoverRequest()) - .onAny(defer(self(), &Self::broadcasted, lambda::_1)); + return network->broadcast(protocol::recover, RecoverRequest()) + .then(defer(self(), &Self::broadcasted, lambda::_1)); } - void broadcasted(const Future<set<Future<RecoverResponse> > >& future) + Future<Nothing> broadcasted(const set<Future<RecoverResponse> >& _responses) { - if (!future.isReady()) { - promise.fail( - future.isFailed() ? - "Failed to broadcast the recover request: " + future.failure() : - "Not expecting discarded future"); + responses = _responses; - terminate(self()); - return; - } + // Reset the counters. + responsesReceived.clear(); + lowestBeginPosition = None(); + highestEndPosition = None(); - responses = future.get(); + return Nothing(); + } + // Returns None if we need to re-run the protocol. + Future<Option<RecoverResponse> > receive() + { if (responses.empty()) { - // Retry if no replica is currently in the network. - retry(); - } else { - // Instead of using a for loop here, we use select to process - // responses one after another so that we can ignore the rest if - // we have collected enough responses. - select(responses) - .onReady(defer(self(), &Self::received, lambda::_1)); - - // Reset the counters. - responsesReceived.clear(); - lowestBeginPosition = None(); - highestEndPosition = None(); + // All responses have been received but we haven't received + // enough (i.e., a quorum of) responses from VOTING replicas to + // start the catch-up. We will re-run the recovery protocol. + return None(); } + + // Instead of using a for loop here, we use select to process + // responses one after another so that we can ignore the rest if + // we have collected enough responses. + return select(responses) + .then(defer(self(), &Self::received, lambda::_1)); } - void received(const Future<RecoverResponse>& future) + Future<Option<RecoverResponse> > received( + const Future<RecoverResponse>& future) { // Enforced by the select semantics. CHECK_READY(future); @@ -242,69 +173,173 @@ private: // begin position and the highest end position since we haven't // persisted this information on disk. if (responsesReceived[Metadata::VOTING] >= quorum) { - discard(responses); - update(Metadata::RECOVERING); - return; + process::discard(responses); + + CHECK_SOME(lowestBeginPosition); + CHECK_SOME(highestEndPosition); + CHECK_LE(lowestBeginPosition.get(), highestEndPosition.get()); + + RecoverResponse result; + result.set_status(Metadata::RECOVERING); + result.set_begin(lowestBeginPosition.get()); + result.set_end(highestEndPosition.get()); + + return result; } - if (responses.empty()) { - // All responses have been received but neither have we received - // enough responses from VOTING replicas to do catch-up, nor are - // we in start-up case. This is either because we don't have - // enough replicas in the network (e.g. ZooKeeper blip), or we - // don't have enough VOTING replicas to proceed. We will retry - // the recovery in both cases. - retry(); + // Handle the next response. + return receive(); + } + + void finished(const Future<Option<RecoverResponse> >& future) + { + if (future.isDiscarded()) { + promise.discard(); + terminate(self()); + } else if (future.isFailed()) { + promise.fail(future.failure()); + terminate(self()); + } else if (future.get().isNone()) { + // Re-run the protocol. We add a random delay before each retry + // because we do not want to saturate the network/disk IO in + // some cases. 
The delay is chosen randomly to reduce the + // likelihood of conflicts (i.e., a replica receives a recover + // request while it is changing its status). + static const Duration T = Milliseconds(500); + Duration d = T * (1.0 + (double) ::random() / RAND_MAX); + delay(d, self(), &Self::start); } else { - // Wait for the next response. - select(responses) - .onReady(defer(self(), &Self::received, lambda::_1)); + promise.set(future.get().get()); + terminate(self()); } } - void update(const Metadata::Status& _status) + const size_t quorum; + const Shared<Network> network; + + set<Future<RecoverResponse> > responses; + hashmap<Metadata::Status, size_t> responsesReceived; + Option<uint64_t> lowestBeginPosition; + Option<uint64_t> highestEndPosition; + Future<Option<RecoverResponse> > chain; + + process::Promise<RecoverResponse> promise; +}; + + +// The wrapper for running the recover protocol. +static Future<RecoverResponse> runRecoverProtocol( + size_t quorum, + const Shared<Network>& network) +{ + RecoverProtocolProcess* process = + new RecoverProtocolProcess(quorum, network); + + Future<RecoverResponse> future = process->future(); + spawn(process, true); + return future; +} + + +// This process is used to recover a replica. We first check the +// status of the local replica. If it is in VOTING status, the recover +// process will terminate immediately. If the local replica is in +// non-VOTING status, we will run the log recover protocol described +// above to decide what status the local replica should be in next. If +// the next status is determined to be RECOVERING, we will start doing +// catch-up. Later, if the local replica has caught-up, we will set +// the status of the local replica to VOTING and terminate the +// process, indicating the recovery has completed. +// +// Here, we list a few scenarios and show how the recover process will +// respond in those scenarios. All the examples assume a quorum size +// of 2. Remember that a new replica is always put in EMPTY status +// initially. +// +// 1) Replica A, B and C are all in VOTING status. The operator adds +// replica D. In that case, D will go into RECOVERING status and +// then go into VOTING status. Therefore, we should avoid adding a +// new replica unless we know that one replica has been removed. +// +// 2) Replica A and B are in VOTING status. The operator adds replica +// C. In that case, C will go into RECOVERING status and then go +// into VOTING status, which is expected. +// +// 3) Replica A is in VOTING status. The operator adds replica B. In +// that case, B will stay in EMPTY status forever. This is expected +// because we cannot make progress if VOTING replicas are not +// enough (i.e., less than quorum). +// +// 4) Replica A is in VOTING status and B is in EMPTY status. The +// operator adds replica C. In that case, C will stay in EMPTY +// status forever similar to case 3). 
+class RecoverProcess : public Process<RecoverProcess> +{ +public: + RecoverProcess( + size_t _quorum, + const Owned<Replica>& _replica, + const Shared<Network>& _network) + : ProcessBase(ID::generate("log-recover")), + quorum(_quorum), + replica(_replica), + network(_network) {} + + Future<Owned<Replica> > future() { return promise.future(); } + +protected: + virtual void initialize() { - LOG(INFO) << "Updating replica status from " - << status << " to " << _status; + LOG(INFO) << "Starting replica recovery"; - replica->update(_status) - .onAny(defer(self(), &Self::updated, _status, lambda::_1)); + // Register a callback to handle user initiated discard. + promise.future().onDiscard(defer(self(), &Self::discard)); + + // Check the current status of the local replica and decide if + // recovery is needed. Recovery is needed only if the local + // replica is not in VOTING status. + chain = replica->status() + .then(defer(self(), &Self::recover, lambda::_1)) + .onAny(defer(self(), &Self::finished, lambda::_1)); } - void updated(const Metadata::Status& _status, const Future<bool>& future) + virtual void finalize() { - if (!future.isReady()) { - promise.fail( - future.isFailed() ? - "Failed to update replica status: " + future.failure() : - "Not expecting discarded future"); + LOG(INFO) << "Recover process terminated"; + } - terminate(self()); - return; - } else if (!future.get()) { - promise.fail("Failed to update replica status"); - terminate(self()); - return; - } +private: + void discard() + { + chain.discard(); + } - // The replica status has been updated successfully. Depending on - // the new status, we decide what the next action should be. - status = _status; + Future<Nothing> recover(const Metadata::Status& status) + { + LOG(INFO) << "Replica is in " << status << " status"; if (status == Metadata::VOTING) { - LOG(INFO) << "Successfully joined the Paxos group"; + // No need to do recovery. + return Nothing(); + } else { + return runRecoverProtocol(quorum, network) + .then(defer(self(), &Self::_recover, lambda::_1)); + } + } - promise.set(replica); - terminate(self()); - } else if (status == Metadata::RECOVERING) { - catchup(); + Future<Nothing> _recover(const RecoverResponse& result) + { + if (result.status() == Metadata::RECOVERING) { + CHECK(result.has_begin() && result.has_end()); + + return updateReplicaStatus(Metadata::RECOVERING) + .then(defer(self(), &Self::catchup, result.begin(), result.end())); } else { - // The replica should not be in any other status. - LOG(FATAL) << "Unexpected replica status"; + return Failure("Unexpected status returned from the recover protocol"); } } - void catchup() + Future<Nothing> catchup(uint64_t begin, uint64_t end) { // We reach here either because the log is empty (uninitialized), // or the log is not empty but a previous unfinished catch-up @@ -330,17 +365,13 @@ private: // _begin_, it should have already been truncated and the // truncation should have already been agreed. Therefore, allowing // the local replica to vote for that position is safe. 
- CHECK(lowestBeginPosition.isSome()); - CHECK(highestEndPosition.isSome()); - CHECK_LE(lowestBeginPosition.get(), highestEndPosition.get()); + CHECK_LE(begin, end); - LOG(INFO) << "Starting catch-up from position " - << lowestBeginPosition.get() << " to " - << highestEndPosition.get(); + LOG(INFO) << "Starting catch-up from position " << begin << " to " << end; IntervalSet<uint64_t> positions( - Bound<uint64_t>::closed(lowestBeginPosition.get()), - Bound<uint64_t>::closed(highestEndPosition.get())); + Bound<uint64_t>::closed(begin), + Bound<uint64_t>::closed(end)); // Share the ownership of the replica. From this point until the // point where the ownership of the replica is regained, we should @@ -350,63 +381,66 @@ private: // Since we do not know what proposal number to use (the log is // empty), we use none and leave log::catchup to automatically // bump the proposal number. - catching = log::catchup(quorum, shared, network, None(), positions); - catching.onAny(defer(self(), &Self::caughtup, shared, lambda::_1)); + return log::catchup(quorum, shared, network, None(), positions) + .then(defer(self(), &Self::getReplicaOwnership, shared)) + .then(defer(self(), &Self::updateReplicaStatus, Metadata::VOTING)); } - void caughtup(Shared<Replica> shared, const Future<Nothing>& future) + Future<Nothing> updateReplicaStatus(const Metadata::Status& status) { - if (!future.isReady()) { - promise.fail( - future.isFailed() ? - "Failed to catch-up: " + future.failure() : - "Not expecting discarded future"); + LOG(INFO) << "Updating replica status to " << status; - terminate(self()); - } else { - // Try to regain the ownership of the replica. - shared.own().onAny(defer(self(), &Self::owned, lambda::_1)); - } + return replica->update(status) + .then(defer(self(), &Self::_updateReplicaStatus, lambda::_1, status)); } - void owned(const Future<Owned<Replica> >& future) + Future<Nothing> _updateReplicaStatus( + bool updated, const Metadata::Status& status) { - if (!future.isReady()) { - promise.fail( - future.isFailed() ? - "Failed to own the replica: " + future.failure() : - "Not expecting discarded future"); + if (!updated) { + return Failure("Failed to update replica status"); + } - terminate(self()); - } else { - // Allow the replica to vote once the catch-up is done. - replica = future.get(); - update(Metadata::VOTING); + if (status == Metadata::VOTING) { + LOG(INFO) << "Successfully joined the Paxos group"; } + + return Nothing(); } - void retry() + Future<Nothing> getReplicaOwnership(Shared<Replica> shared) { - // We add a random delay before each retry because we do not want - // to saturate the network/disk IO in some cases (e.g., network - // size is less than quorum). The delay is chosen randomly to - // reduce the likelihood of conflicts (i.e., a replica receives a - // recover request while it is changing its status). - static const Duration T = Milliseconds(500); - Duration d = T * (1.0 + (double) ::random() / RAND_MAX); - delay(d, self(), &Self::recover); + // Try to regain the ownership of the replica. 
+ return shared.own() + .then(defer(self(), &Self::_getReplicaOwnership, lambda::_1)); + } + + Future<Nothing> _getReplicaOwnership(Owned<Replica> owned) + { + replica = owned; + + return Nothing(); + } + + void finished(const Future<Nothing>& future) + { + if (future.isDiscarded()) { + promise.discard(); + terminate(self()); + } else if (future.isFailed()) { + promise.fail(future.failure()); + terminate(self()); + } else { + promise.set(replica); + terminate(self()); + } } const size_t quorum; Owned<Replica> replica; const Shared<Network> network; - Metadata::Status status; - set<Future<RecoverResponse> > responses; - hashmap<Metadata::Status, size_t> responsesReceived; - Option<uint64_t> lowestBeginPosition; - Option<uint64_t> highestEndPosition; - Future<Nothing> catching; + Future<Nothing> chain; process::Promise<Owned<Replica> > promise; };
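For readers new to the continuation style adopted above: each step returns a Future, the steps are chained with then(), and the process' promise is settled in a single terminal onAny() callback, as RecoverProtocolProcess::start() and RecoverProcess::initialize() now do. The minimal sketch below is illustrative only and is not part of this commit; ExampleProcess and its members are hypothetical names, and it assumes libprocess/stout headers are available as in the diff above.

    #include <process/defer.hpp>
    #include <process/future.hpp>
    #include <process/id.hpp>
    #include <process/process.hpp>

    #include <stout/lambda.hpp>

    using namespace process;

    // Hypothetical process written in continuation style: each step
    // returns a Future, the steps are chained with then(), and a
    // single terminal callback settles the promise.
    class ExampleProcess : public Process<ExampleProcess>
    {
    public:
      ExampleProcess() : ProcessBase(ID::generate("example")) {}

      Future<int> future() { return promise.future(); }

    protected:
      virtual void initialize()
      {
        chain = first()
          .then(defer(self(), &Self::second, lambda::_1))
          .onAny(defer(self(), &Self::finished, lambda::_1));
      }

    private:
      Future<int> first() { return 1; }

      Future<int> second(int value) { return value + 1; }

      void finished(const Future<int>& result)
      {
        if (result.isDiscarded()) {
          promise.discard();
        } else if (result.isFailed()) {
          promise.fail(result.failure());
        } else {
          promise.set(result.get());
        }

        terminate(self());
      }

      Future<int> chain;
      Promise<int> promise;
    };

One would spawn such a process (e.g., spawn(process, true)) and wait on process->future(), just as runRecoverProtocol() does for RecoverProtocolProcess. The benefit over the previous callback-per-step style is that failures and discards from any step propagate through the chain automatically and are handled once, in finished().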