Repository: mesos Updated Branches: refs/heads/master 3bcce8fb3 -> 82b6112ca
Fixed race between coordinator election and recovery in replicated log. MESOS-3280. The basic problem is that replicas silently ignore inbound Promise and Write requests if they have not finished the recovery protocol yet (because they can't safely vote on such requests). Hence, if we try to do a Paxos round while a quorum of nodes has not finished recovering, the Paxos round will never complete. In particular, this might happen during coordinator election: coordinator election (which is implemented as performing a full Paxos round) starts as soon as the candidate coordinator replica has finished the recovery protocol. If several nodes start concurrently, a quorum of those nodes might still be executing the recovery protocol, and hence the coordinator will never be elected. To address this, add "ignored" responses to the Promise and Write sub-protocols: if a proposer sees a quorum of "ignored" responses to a promise or write request it has issued, it knows the request will never succeed. When used for coordinator election, the current code retries immediately (without a backoff). Note that replicas will still silently drop promise/write requests if another kind of problem occurs (e.g., an I/O error prevents reading/writing log data). We might consider changing this, although it will require some thought: e.g., if a replica's disk is broken, sending an "ignored" message on every request might flood the network. REVIEWER NOTES: * GMock code is ugly, but see notes below. * Should we add exponential backoff when retrying coordinator election? 
Review: https://reviews.apache.org/r/39325 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/82b6112c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/82b6112c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/82b6112c Branch: refs/heads/master Commit: 82b6112cabc838f9bfa69a052f22c31784653dab Parents: 3bcce8f Author: Neil Conway <[email protected]> Authored: Tue Nov 3 11:17:11 2015 -0800 Committer: Jie Yu <[email protected]> Committed: Tue Nov 3 11:35:05 2015 -0800 ---------------------------------------------------------------------- src/log/consensus.cpp | 105 ++++++++++++++++++++++++-- src/log/coordinator.cpp | 15 +++- src/log/replica.cpp | 62 ++++++++++++---- src/log/replica.hpp | 8 +- src/messages/log.proto | 76 ++++++++++++++----- src/tests/log_tests.cpp | 172 ++++++++++++++++++++++++++++++++++++++----- 6 files changed, 373 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/82b6112c/src/log/consensus.cpp ---------------------------------------------------------------------- diff --git a/src/log/consensus.cpp b/src/log/consensus.cpp index 71cd5f3..80569be 100644 --- a/src/log/consensus.cpp +++ b/src/log/consensus.cpp @@ -43,6 +43,28 @@ namespace mesos { namespace internal { namespace log { +static bool isRejectedPromise(const PromiseResponse& response) { + if (response.has_type()) { + // New format (Mesos >= 0.26). + return response.type() == PromiseResponse::REJECT; + } else { + // Old format (Mesos < 0.26). + return !response.okay(); + } +} + + +static bool isRejectedWrite(const WriteResponse& response) { + if (response.has_type()) { + // New format (Mesos >= 0.26). + return response.type() == WriteResponse::REJECT; + } else { + // Old format (Mesos < 0.26). 
+ return !response.okay(); + } +} + + class ExplicitPromiseProcess : public Process<ExplicitPromiseProcess> { public: @@ -56,7 +78,8 @@ public: network(_network), proposal(_proposal), position(_position), - responsesReceived(0) {} + responsesReceived(0), + ignoresReceived(0) {} virtual ~ExplicitPromiseProcess() {} @@ -127,9 +150,29 @@ private: void received(const PromiseResponse& response) { + if (response.has_type() && response.type() == PromiseResponse::IGNORE) { + ignoresReceived++; + + // A quorum of replicas have ignored the request. + if (ignoresReceived >= quorum) { + LOG(INFO) << "Aborting explicit promise request because " + << ignoresReceived << " ignores received"; + + // If the "type" is PromiseResponse::IGNORE, the rest of the + // fields don't matter. + PromiseResponse result; + result.set_type(PromiseResponse::IGNORE); + + promise.set(result); + terminate(self()); + } + + return; + } + responsesReceived++; - if (!response.okay()) { + if (isRejectedPromise(response)) { // Failed to get the promise from a replica for this position // because it has been promised to a proposer with a higher // proposal number. 
The 'proposal' field in the response @@ -196,9 +239,11 @@ private: PromiseResponse result; if (highestNackProposal.isSome()) { + result.set_type(PromiseResponse::REJECT); result.set_okay(false); result.set_proposal(highestNackProposal.get()); } else { + result.set_type(PromiseResponse::ACCEPT); result.set_okay(true); if (highestAckAction.isSome()) { result.mutable_action()->CopyFrom(highestAckAction.get()); @@ -218,6 +263,7 @@ private: PromiseRequest request; set<Future<PromiseResponse> > responses; size_t responsesReceived; + size_t ignoresReceived; Option<uint64_t> highestNackProposal; Option<Action> highestAckAction; @@ -236,7 +282,8 @@ public: quorum(_quorum), network(_network), proposal(_proposal), - responsesReceived(0) {} + responsesReceived(0), + ignoresReceived(0) {} virtual ~ImplicitPromiseProcess() {} @@ -306,9 +353,29 @@ private: void received(const PromiseResponse& response) { + if (response.has_type() && response.type() == PromiseResponse::IGNORE) { + ignoresReceived++; + + // A quorum of replicas have ignored the request. + if (ignoresReceived >= quorum) { + LOG(INFO) << "Aborting implicit promise request because " + << ignoresReceived << " ignores received"; + + // If the "type" is PromiseResponse::IGNORE, the rest of the + // fields don't matter. + PromiseResponse result; + result.set_type(PromiseResponse::IGNORE); + + promise.set(result); + terminate(self()); + } + + return; + } + responsesReceived++; - if (!response.okay()) { + if (isRejectedPromise(response)) { // Failed to get the promise from a replica because it has // promised a proposer with a higher proposal number. 
The // 'proposal' field in the response specifies the proposal @@ -335,11 +402,13 @@ private: PromiseResponse result; if (highestNackProposal.isSome()) { + result.set_type(PromiseResponse::REJECT); result.set_okay(false); result.set_proposal(highestNackProposal.get()); } else { CHECK_SOME(highestEndPosition); + result.set_type(PromiseResponse::ACCEPT); result.set_okay(true); result.set_position(highestEndPosition.get()); } @@ -356,6 +425,7 @@ private: PromiseRequest request; set<Future<PromiseResponse> > responses; size_t responsesReceived; + size_t ignoresReceived; Option<uint64_t> highestNackProposal; Option<uint64_t> highestEndPosition; @@ -376,7 +446,8 @@ public: network(_network), proposal(_proposal), action(_action), - responsesReceived(0) {} + responsesReceived(0), + ignoresReceived(0) {} virtual ~WriteProcess() {} @@ -466,9 +537,28 @@ private: { CHECK_EQ(response.position(), request.position()); + if (response.has_type() && response.type() == WriteResponse::IGNORE) { + ignoresReceived++; + + if (ignoresReceived >= quorum) { + LOG(INFO) << "Aborting write request because " + << ignoresReceived << " ignores received"; + + // If the "type" is WriteResponse::IGNORE, the rest of the + // fields don't matter. + WriteResponse result; + result.set_type(WriteResponse::IGNORE); + + promise.set(result); + terminate(self()); + } + + return; + } + responsesReceived++; - if (!response.okay()) { + if (isRejectedWrite(response)) { // A replica rejects the write request because this position has // been promised to a proposer with a higher proposal number. 
// The 'proposal' field in the response specifies the proposal @@ -485,9 +575,11 @@ private: WriteResponse result; if (highestNackProposal.isSome()) { + result.set_type(WriteResponse::REJECT); result.set_okay(false); result.set_proposal(highestNackProposal.get()); } else { + result.set_type(WriteResponse::ACCEPT); result.set_okay(true); } @@ -504,6 +596,7 @@ private: WriteRequest request; set<Future<WriteResponse> > responses; size_t responsesReceived; + size_t ignoresReceived; Option<uint64_t> highestNackProposal; process::Promise<WriteResponse> promise; http://git-wip-us.apache.org/repos/asf/mesos/blob/82b6112c/src/log/coordinator.cpp ---------------------------------------------------------------------- diff --git a/src/log/coordinator.cpp b/src/log/coordinator.cpp index e1df8b0..62438ff 100644 --- a/src/log/coordinator.cpp +++ b/src/log/coordinator.cpp @@ -194,15 +194,24 @@ Future<PromiseResponse> CoordinatorProcess::runPromisePhase() Future<Option<uint64_t> > CoordinatorProcess::checkPromisePhase( const PromiseResponse& response) { - if (!response.okay()) { + CHECK(response.has_type()); + + switch (response.type()) { + case PromiseResponse::IGNORE: + // A quorum of replicas ignored the request, but it can be + // retried. + return None(); + + case PromiseResponse::REJECT: // Lost an election, but can be retried. We save the proposal // number here so that most likely we will have a high enough // proposal number when we retry. 
CHECK_LE(proposal, response.proposal()); proposal = response.proposal(); - return None(); - } else { + + default: + CHECK(response.type() == PromiseResponse::ACCEPT); CHECK(response.has_position()); index = response.position(); http://git-wip-us.apache.org/repos/asf/mesos/blob/82b6112c/src/log/replica.cpp ---------------------------------------------------------------------- diff --git a/src/log/replica.cpp b/src/log/replica.cpp index 414e116..ac0f223 100644 --- a/src/log/replica.cpp +++ b/src/log/replica.cpp @@ -350,27 +350,40 @@ bool ReplicaProcess::updatePromised(uint64_t promised) return true; } - -// Note that certain failures that occur result in returning from the -// current function but *NOT* sending a NACK back to the proposer -// because that implies a proposer has been demoted. Not sending -// anything is equivalent to pretending like the request never made it -// here. TODO(benh): At some point, however, we might want to actually -// "fail" more dramatically because there could be something rather -// seriously wrong on this box that we are ignoring (like a bad disk). -// This could be accomplished by changing most LOG(ERROR) statements -// to LOG(FATAL), or by counting the number of errors and after -// reaching some threshold aborting. In addition, sending the error -// information back to the proposer "might" help the debugging -// procedure. +// When handling replicated log protocol requests, we handle errors in +// three different ways: +// +// 1. If we've accepted a conflicting request with a higher proposal +// number, we return a REJECT response. +// 2. If we can't vote on the request because we're in the wrong +// state (e.g., not finished the recovery or catchup protocols), +// we return an IGNORE response. +// 3. If we encounter an error (e.g., I/O failure) handling the +// request, we log the error and silently ignore the request. 
+// +// TODO(benh): At some point, however, we might want to actually +// "fail" more dramatically for case three, because there could be +// something rather seriously wrong on this box that we are ignoring +// (like a bad disk). This could be accomplished by changing most +// LOG(ERROR) statements to LOG(FATAL), or by counting the number of +// errors and after reaching some threshold aborting. In addition, +// sending the error information back to the proposer "might" help the +// debugging procedure. void ReplicaProcess::promise(const UPID& from, const PromiseRequest& request) { - // Ignore promise requests if this replica is not in VOTING status. + // Ignore promise requests if this replica is not in VOTING status; + // we also inform the requester, so that they can retry promptly. if (status() != Metadata::VOTING) { LOG(INFO) << "Replica ignoring promise request from " << from << " as it is in " << status() << " status"; + + PromiseResponse response; + response.set_type(PromiseResponse::IGNORE); + response.set_okay(false); + response.set_proposal(request.proposal()); + reply(response); return; } @@ -402,6 +415,7 @@ void ReplicaProcess::promise(const UPID& from, const PromiseRequest& request) action.mutable_nop()->MergeFrom(Action::Nop()); PromiseResponse response; + response.set_type(PromiseResponse::ACCEPT); response.set_okay(true); response.set_proposal(request.proposal()); response.mutable_action()->MergeFrom(action); @@ -435,6 +449,7 @@ void ReplicaProcess::promise(const UPID& from, const PromiseRequest& request) // number so that the proposer can bump its proposal number // and retry if needed to ensure liveness. 
PromiseResponse response; + response.set_type(PromiseResponse::REJECT); response.set_okay(false); response.set_proposal(promised()); reply(response); @@ -445,6 +460,7 @@ void ReplicaProcess::promise(const UPID& from, const PromiseRequest& request) if (persist(action)) { PromiseResponse response; + response.set_type(PromiseResponse::ACCEPT); response.set_okay(true); response.set_proposal(request.proposal()); response.set_position(request.position()); @@ -458,6 +474,7 @@ void ReplicaProcess::promise(const UPID& from, const PromiseRequest& request) if (request.proposal() <= action.promised()) { PromiseResponse response; + response.set_type(PromiseResponse::REJECT); response.set_okay(false); response.set_proposal(action.promised()); reply(response); @@ -467,6 +484,7 @@ void ReplicaProcess::promise(const UPID& from, const PromiseRequest& request) if (persist(action)) { PromiseResponse response; + response.set_type(PromiseResponse::ACCEPT); response.set_okay(true); response.set_proposal(request.proposal()); response.mutable_action()->MergeFrom(original); @@ -483,6 +501,7 @@ void ReplicaProcess::promise(const UPID& from, const PromiseRequest& request) LOG(INFO) << "Replica denying promise request with proposal " << request.proposal(); PromiseResponse response; + response.set_type(PromiseResponse::REJECT); response.set_okay(false); response.set_proposal(promised()); reply(response); @@ -490,6 +509,7 @@ void ReplicaProcess::promise(const UPID& from, const PromiseRequest& request) if (updatePromised(request.proposal())) { // Return the last position written. PromiseResponse response; + response.set_type(PromiseResponse::ACCEPT); response.set_okay(true); response.set_proposal(request.proposal()); response.set_position(end); @@ -502,10 +522,18 @@ void ReplicaProcess::promise(const UPID& from, const PromiseRequest& request) void ReplicaProcess::write(const UPID& from, const WriteRequest& request) { - // Ignore write requests if this replica is not in VOTING status. 
+ // Ignore write requests if this replica is not in VOTING status; we + // also inform the requester, so that they can retry promptly. if (status() != Metadata::VOTING) { LOG(INFO) << "Replica ignoring write request from " << from << " as it is in " << status() << " status"; + + WriteResponse response; + response.set_type(WriteResponse::IGNORE); + response.set_okay(false); + response.set_proposal(request.proposal()); + response.set_position(request.position()); + reply(response); return; } @@ -520,6 +548,7 @@ void ReplicaProcess::write(const UPID& from, const WriteRequest& request) } else if (result.isNone()) { if (request.proposal() < promised()) { WriteResponse response; + response.set_type(WriteResponse::REJECT); response.set_okay(false); response.set_proposal(promised()); response.set_position(request.position()); @@ -551,6 +580,7 @@ void ReplicaProcess::write(const UPID& from, const WriteRequest& request) if (persist(action)) { WriteResponse response; + response.set_type(WriteResponse::ACCEPT); response.set_okay(true); response.set_proposal(request.proposal()); response.set_position(request.position()); @@ -563,6 +593,7 @@ void ReplicaProcess::write(const UPID& from, const WriteRequest& request) if (request.proposal() < action.promised()) { WriteResponse response; + response.set_type(WriteResponse::REJECT); response.set_okay(false); response.set_proposal(action.promised()); response.set_position(request.position()); @@ -628,6 +659,7 @@ void ReplicaProcess::write(const UPID& from, const WriteRequest& request) if (persist(action)) { WriteResponse response; + response.set_type(WriteResponse::ACCEPT); response.set_okay(true); response.set_proposal(request.proposal()); response.set_position(request.position()); http://git-wip-us.apache.org/repos/asf/mesos/blob/82b6112c/src/log/replica.hpp ---------------------------------------------------------------------- diff --git a/src/log/replica.hpp b/src/log/replica.hpp index 70f415f..00a7636 100644 --- 
a/src/log/replica.hpp +++ b/src/log/replica.hpp @@ -60,7 +60,7 @@ public: // process will later decide if this replica can be re-allowed to // vote depending on the status of other replicas. explicit Replica(const std::string& path); - ~Replica(); + virtual ~Replica(); // Returns all the actions between the specified positions, unless // those positions are invalid, in which case returns an error. @@ -91,8 +91,10 @@ public: // Returns the highest implicit promise this replica has given. process::Future<uint64_t> promised() const; - // Updates the status of this replica. - process::Future<bool> update(const Metadata::Status& status); + // Updates the status of this replica. Returns true if status was + // updated successfully, false otherwise. Made "virtual" for + // mocking in tests. + virtual process::Future<bool> update(const Metadata::Status& status); // Returns the PID associated with this replica. process::PID<ReplicaProcess> pid() const; http://git-wip-us.apache.org/repos/asf/mesos/blob/82b6112c/src/messages/log.proto ---------------------------------------------------------------------- diff --git a/src/messages/log.proto b/src/messages/log.proto index d73b33f..514f403 100644 --- a/src/messages/log.proto +++ b/src/messages/log.proto @@ -126,18 +126,38 @@ message PromiseRequest { optional uint64 position = 2; } - -// Represents a "promise" response from a replica back to a proposer. -// A replica represents a NACK (because it has promised a proposer -// with a higher proposal number) by setting the okay field to false. +// Represents a promise response corresponding to a promise request. +// The kind of the response is given by the "type" field: +// +// 1. IGNORE: The recipient of the promise request was not in VOTING +// state, so it ignored the request. +// 2. REJECT: The recipient of the proposal has already promised a +// proposer with a higher proposal number. This is called a +// "NACK" in the code. +// 3. ACCEPT: The promise request was accepted. 
+// +// Before 0.26, we only sent responses for cases 2 and 3, so the +// 'okay' field was used to distinguish these responses. For backward +// compatibility, we continue setting 'okay' to false for both cases 1 +// and 2; this means old masters will treat IGNORE as a NACK: this +// might result in demoting the current coordinator, but that should +// be tolerable. TODO(neilc): Remove 'okay' in 0.27. +// // The 'proposal' is either the aforementioned higher proposal number -// when the response is a NACK, or the corresponding request's -// proposal number if it is an ACK. The replica either sends back the -// highest position it has recorded in the log (using the 'position' -// field) or the specific action (if any) it has at the position -// requested in PromiseRequest (using the 'action' field). +// (for case 2), or the corresponding request's proposal number (for +// cases 1 and 3). The replica either sends back the highest position +// it has recorded in the log (using the 'position' field) or the +// specific action (if any) it has at the position requested in +// PromiseRequest (using the 'action' field). message PromiseResponse { - required bool okay = 1; + enum Type { + ACCEPT = 1; + REJECT = 2; + IGNORE = 3; + } + + required bool okay = 1; // DEPRECATED + optional Type type = 5; required uint64 proposal = 2; optional uint64 position = 4; optional Action action = 3; @@ -159,16 +179,36 @@ message WriteRequest { } -// Represents a write response corresponding to a write request. A -// replica represents a NACK (because it has promised a proposer with -// a higher proposal number) by setting the okay field to false. If -// the proposer is a coordinator, then it has been demoted. The -// 'position' should always correspond to the position set in the +// Represents a write response corresponding to a write request. The +// kind of the response is given by the "type" field: +// +// 1. 
IGNORE: The recipient of the write request was not in VOTING +// state, so it ignored the request. +// 2. REJECT: The recipient of the proposal has already promised a +// proposer with a higher proposal number. This is called a +// "NACK" in the code. +// 3. ACCEPT: The promise request was accepted. +// +// Before 0.26, we only sent responses for cases 2 and 3, so the +// 'okay' field was used to distinguish these responses. For backward +// compatibility, we continue setting 'okay' to false for both cases 1 +// and 2; this means old masters will treat IGNORE as a NACK: this +// might result in demoting the current coordinator, but that should +// be tolerable. TODO(neilc): Remove 'okay' in 0.27. +// +// The 'position' should always correspond to the position set in the // request. The 'proposal' is either the same proposal number set in -// the request in the case of an ACK, or the higher proposal number -// this position has been promised to in the case of a NACK. +// the request (cases 1 and 3), or the higher proposal number +// this position has been promised to (case 2). message WriteResponse { - required bool okay = 1; + enum Type { + ACCEPT = 1; + REJECT = 2; + IGNORE = 3; + } + + required bool okay = 1; // DEPRECATED + optional Type type = 4; required uint64 proposal = 2; required uint64 position = 3; } http://git-wip-us.apache.org/repos/asf/mesos/blob/82b6112c/src/tests/log_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/log_tests.cpp b/src/tests/log_tests.cpp index 222a12e..bbf7a7e 100644 --- a/src/tests/log_tests.cpp +++ b/src/tests/log_tests.cpp @@ -69,6 +69,7 @@ using std::string; using testing::_; using testing::Eq; +using testing::Invoke; using testing::Return; namespace mesos { @@ -522,8 +523,8 @@ TEST_F(ReplicaTest, Restore) } -// This test verifies that a non-VOTING replica does not reply to -// promise or write requests. 
+// This test verifies that a non-VOTING replica replies to promise and +// write requests with an "ignored" response. TEST_F(ReplicaTest, NonVoting) { const string path = os::getcwd() + "/.log"; @@ -533,17 +534,14 @@ TEST_F(ReplicaTest, NonVoting) PromiseRequest promiseRequest; promiseRequest.set_proposal(2); - Future<PromiseResponse> promiseResponse = + Future<PromiseResponse> promiseResponse_ = protocol::promise(replica.pid(), promiseRequest); - // Flush the event queue to make sure that if the replica could - // reply to the promise request, the future 'promiseResponse' would - // be satisfied before the pending check below. - Clock::pause(); - Clock::settle(); - Clock::resume(); - - EXPECT_TRUE(promiseResponse.isPending()); + AWAIT_READY(promiseResponse_); + PromiseResponse promiseResponse = promiseResponse_.get(); + EXPECT_EQ(PromiseResponse::IGNORE, promiseResponse.type()); + EXPECT_FALSE(promiseResponse.okay()); + EXPECT_EQ(2u, promiseResponse.proposal()); WriteRequest writeRequest; writeRequest.set_proposal(3); @@ -551,17 +549,15 @@ TEST_F(ReplicaTest, NonVoting) writeRequest.set_type(Action::APPEND); writeRequest.mutable_append()->set_bytes("hello world"); - Future<WriteResponse> writeResponse = + Future<WriteResponse> writeResponse_ = protocol::write(replica.pid(), writeRequest); - // Flush the event queue to make sure that if the replica could - // reply to the write request, the future 'writeResponse' would be - // satisfied before the pending check below. 
- Clock::pause(); - Clock::settle(); - Clock::resume(); - - EXPECT_TRUE(writeResponse.isPending()); + AWAIT_READY(writeResponse_); + WriteResponse writeResponse = writeResponse_.get(); + EXPECT_EQ(WriteResponse::IGNORE, writeResponse.type()); + EXPECT_FALSE(writeResponse.okay()); + EXPECT_EQ(3u, writeResponse.proposal()); + EXPECT_EQ(1u, writeResponse.position()); } @@ -1507,6 +1503,142 @@ TEST_F(CoordinatorTest, TruncateLearnedFill) } +class MockReplica : public Replica +{ +public: + explicit MockReplica(const string& path) : + Replica(path) {} + + virtual ~MockReplica() {} + + MOCK_METHOD1(update, Future<bool>(const Metadata::Status& status)); + + Future<bool> _update(const Metadata::Status& status) + { + return Replica::update(status); + } +}; + + +// If a coordinator tries to get elected while there is not a quorum +// of replicas in VOTING state, the non-VOTING replicas should +// instruct the coordinator that they have ignored the coordinator's +// request, so the coordinator can promptly retry. MESOS-3280. +TEST_F(CoordinatorTest, RecoveryRace) +{ + const string path1 = os::getcwd() + "/.log1"; + const string path2 = os::getcwd() + "/.log2"; + const string path3 = os::getcwd() + "/.log3"; + + MockReplica* replica1(new MockReplica(path1)); + MockReplica* replica2(new MockReplica(path2)); + MockReplica* replica3(new MockReplica(path3)); + + set<UPID> pids{replica1->pid(), replica2->pid(), replica3->pid()}; + Shared<Network> network(new Network(pids)); + + // Set when each replica transitions from EMPTY -> STARTING; the + // replica will then block until the associated "continue" promise + // is set. + Future<Nothing> replica1Starting; + Future<Nothing> replica2Starting; + Future<Nothing> replica3Starting; + process::Promise<bool> replica1ContinueStarting; + process::Promise<bool> replica2ContinueStarting; + process::Promise<bool> replica3ContinueStarting; + + // Set when each replica transitions from STARTING -> VOTING. 
+ Future<Nothing> replica1Voting; + Future<Nothing> replica2Voting; + process::Promise<bool> replica1ContinueVoting; + process::Promise<bool> replica2ContinueVoting; + + // Arrange mocks to allow us to block and unblock each replica when + // it changes state. + // TODO(neilc): Refactor this to reduce duplicated code. + EXPECT_CALL(*replica1, update(_)) + .WillOnce(DoAll(IgnoreResult(Invoke(replica1, &MockReplica::_update)), + FutureSatisfy(&replica1Starting), + Return(replica1ContinueStarting.future()))) + .WillOnce(DoAll(IgnoreResult(Invoke(replica1, &MockReplica::_update)), + FutureSatisfy(&replica1Voting), + Return(replica1ContinueVoting.future()))); + EXPECT_CALL(*replica2, update(_)) + .WillOnce(DoAll(IgnoreResult(Invoke(replica2, &MockReplica::_update)), + FutureSatisfy(&replica2Starting), + Return(replica2ContinueStarting.future()))) + .WillOnce(DoAll(IgnoreResult(Invoke(replica2, &MockReplica::_update)), + FutureSatisfy(&replica2Voting), + Return(replica2ContinueVoting.future()))); + EXPECT_CALL(*replica3, update(_)) + .WillOnce(DoAll(IgnoreResult(Invoke(replica3, &MockReplica::_update)), + FutureSatisfy(&replica3Starting), + Return(replica3ContinueStarting.future()))) + .WillRepeatedly(Invoke(replica3, &MockReplica::_update)); + + Future<Owned<Replica>> recovering1 = + recover(2, Owned<Replica>(replica1), network, true); + Future<Owned<Replica>> recovering2 = + recover(2, Owned<Replica>(replica2), network, true); + Future<Owned<Replica>> recovering3 = + recover(2, Owned<Replica>(replica3), network, true); + + AWAIT_READY(replica1Starting); + AWAIT_READY(replica2Starting); + AWAIT_READY(replica3Starting); + + // Allow replica1 to advance from STARTING -> VOTING. + // TODO(neilc): Due to an apparent bug in FutureResult (MESOS-3812), + // we can't save the return value of _update() when setting up the + // mocks above. Hence, we have to assume that _update() returned + // true, which we then use to unblock the process. 
+ replica1ContinueStarting.set(true); + AWAIT_READY(replica1Voting); + replica1ContinueVoting.set(true); + + AWAIT_READY(recovering1); + Owned<Replica> shared_ = recovering1.get(); + Shared<Replica> shared = shared_.share(); + + // Electing a coordinator should fail because we don't have a quorum + // of replicas in VOTING status. + { + Coordinator coord1(2, shared, network); + Future<Option<uint64_t>> electing = coord1.elect(); + AWAIT_READY(electing); + ASSERT_NONE(electing.get()); + } + + // Allow replica2 to advance from STARTING -> VOTING. + replica2ContinueStarting.set(true); + AWAIT_READY(replica2Voting); + replica2ContinueVoting.set(true); + + AWAIT_READY(recovering2); + + // Electing a coordinator should now succeed. + Coordinator coord2(2, shared, network); + { + Future<Option<uint64_t>> electing = coord2.elect(); + AWAIT_READY(electing); + EXPECT_SOME_EQ(0u, electing.get()); + } + + // Allow replica3 to advance from STARTING -> RECOVERING -> VOTING. + // TODO(neilc): Transition to RECOVERING is dubious and should + // probably be omitted. + replica3ContinueStarting.set(true); + + AWAIT_READY(recovering3); + + { + Future<Option<uint64_t>> appending = coord2.append("hello world"); + AWAIT_READY(appending); + EXPECT_SOME_EQ(1u, appending.get()); + } +} + + class RecoverTest : public TemporaryDirectoryTest { protected:
