consensus: Fix NON_VOTER ack-counting bug. This patch fixes an issue where the consensus queue did not differentiate between replicating to voters and replicating to non-voters, so acks from non-voters were counted toward the majority.
This enables the test written by Alexey and also makes some changes to it. The test fails without this patch and passes with the patch. Tests added: * Added a unit test in consensus_queue-test * Updated and enabled the system test in raft_consensus_nonvoter-itest Change-Id: I13143e9bb4b76af3fd6dada28fcec05b27d24476 Reviewed-on: http://gerrit.cloudera.org:8080/8868 Tested-by: Kudu Jenkins Reviewed-by: Alexey Serbin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1277f69a Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1277f69a Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1277f69a Branch: refs/heads/master Commit: 1277f69a1feb3715750552991bc19444282f652e Parents: 710b238 Author: Mike Percy <[email protected]> Authored: Mon Dec 18 15:13:16 2017 -0800 Committer: Mike Percy <[email protected]> Committed: Mon Jan 8 19:38:17 2018 +0000 ---------------------------------------------------------------------- src/kudu/consensus/consensus-test-util.h | 13 ++- src/kudu/consensus/consensus_peers-test.cc | 1 + src/kudu/consensus/consensus_peers.cc | 2 +- src/kudu/consensus/consensus_queue-test.cc | 91 ++++++++++++++-- src/kudu/consensus/consensus_queue.cc | 105 +++++++++++++------ src/kudu/consensus/consensus_queue.h | 34 ++++-- src/kudu/consensus/raft_consensus.cc | 8 +- .../raft_consensus_nonvoter-itest.cc | 63 ++++++----- 8 files changed, 231 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus-test-util.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h index 409841b..9cb0473 100644 --- a/src/kudu/consensus/consensus-test-util.h +++ b/src/kudu/consensus/consensus-test-util.h @@ -80,6 +80,7 @@ inline 
gscoped_ptr<ReplicateMsg> CreateDummyReplicate(int64_t term, inline RaftPeerPB FakeRaftPeerPB(const std::string& uuid) { RaftPeerPB peer_pb; peer_pb.set_permanent_uuid(uuid); + peer_pb.set_member_type(RaftPeerPB::VOTER); peer_pb.mutable_last_known_addr()->set_host(strings::Substitute( "$0-fake-hostname", CURRENT_TEST_NAME())); peer_pb.mutable_last_known_addr()->set_port(0); @@ -107,9 +108,9 @@ inline void AppendReplicateMessagesToQueue( } // Builds a configuration of 'num' voters. -inline RaftConfigPB BuildRaftConfigPBForTests(int num) { +inline RaftConfigPB BuildRaftConfigPBForTests(int num_voters, int num_non_voters = 0) { RaftConfigPB raft_config; - for (int i = 0; i < num; i++) { + for (int i = 0; i < num_voters; i++) { RaftPeerPB* peer_pb = raft_config.add_peers(); peer_pb->set_member_type(RaftPeerPB::VOTER); peer_pb->set_permanent_uuid(strings::Substitute("peer-$0", i)); @@ -117,6 +118,14 @@ inline RaftConfigPB BuildRaftConfigPBForTests(int num) { hp->set_host(strings::Substitute("peer-$0.fake-domain-for-tests", i)); hp->set_port(0); } + for (int i = 0; i < num_non_voters; i++) { + RaftPeerPB* peer_pb = raft_config.add_peers(); + peer_pb->set_member_type(RaftPeerPB::NON_VOTER); + peer_pb->set_permanent_uuid(strings::Substitute("non-voter-peer-$0", i)); + HostPortPB* hp = peer_pb->mutable_last_known_addr(); + hp->set_host(strings::Substitute("non-voter-peer-$0.fake-domain-for-tests", i)); + hp->set_port(0); + } return raft_config; } http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_peers-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc index 7f4eb57..f10cb34 100644 --- a/src/kudu/consensus/consensus_peers-test.cc +++ b/src/kudu/consensus/consensus_peers-test.cc @@ -126,6 +126,7 @@ class ConsensusPeersTest : public KuduTest { shared_ptr<Peer>* peer) { RaftPeerPB peer_pb; 
peer_pb.set_permanent_uuid(peer_name); + peer_pb.set_member_type(RaftPeerPB::VOTER); auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>( raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb)); gscoped_ptr<PeerProxy> proxy(proxy_ptr); http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_peers.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc index ffa373d..6d68528 100644 --- a/src/kudu/consensus/consensus_peers.cc +++ b/src/kudu/consensus/consensus_peers.cc @@ -142,7 +142,7 @@ Peer::Peer(RaftPeerPB peer_pb, Status Peer::Init() { { std::lock_guard<simple_spinlock> l(peer_lock_); - queue_->TrackPeer(peer_pb_.permanent_uuid()); + queue_->TrackPeer(peer_pb_); } // Capture a weak_ptr reference into the functor so it can safely handle http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc index 62a2345..c8dc086 100644 --- a/src/kudu/consensus/consensus_queue-test.cc +++ b/src/kudu/consensus/consensus_queue-test.cc @@ -126,6 +126,14 @@ class ConsensusQueueTest : public KuduTest { payload_size).release())); } + RaftPeerPB MakePeer(const std::string& peer_uuid, + RaftPeerPB::MemberType member_type) { + RaftPeerPB peer_pb; + *peer_pb.mutable_permanent_uuid() = peer_uuid; + peer_pb.set_member_type(member_type); + return peer_pb; + } + // Updates the peer's watermark in the queue so that it matches // the operation we want, since the queue always assumes that // when a peer gets tracked it's always tracked starting at the @@ -137,7 +145,7 @@ class ConsensusQueueTest : public KuduTest { int last_committed_idx, bool* more_pending) { - queue_->TrackPeer(kPeerUuid); + 
queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER)); response->set_responder_uuid(kPeerUuid); // Ask for a request. The queue assumes the peer is up-to-date so @@ -406,10 +414,10 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) { TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) { queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(5)); // Track 4 additional peers (in addition to the local peer) - queue_->TrackPeer("peer-1"); - queue_->TrackPeer("peer-2"); - queue_->TrackPeer("peer-3"); - queue_->TrackPeer("peer-4"); + queue_->TrackPeer(MakePeer("peer-1", RaftPeerPB::VOTER)); + queue_->TrackPeer(MakePeer("peer-2", RaftPeerPB::VOTER)); + queue_->TrackPeer(MakePeer("peer-3", RaftPeerPB::VOTER)); + queue_->TrackPeer(MakePeer("peer-4", RaftPeerPB::VOTER)); // Append 10 messages to the queue. // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue. @@ -479,6 +487,68 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) { ASSERT_EQ(queue_->GetAllReplicatedIndex(), 5); } +// Ensure that the acks for a non-voter don't count toward the majority. +TEST_F(ConsensusQueueTest, TestNonVoterAcksDontCountTowardMajority) { + const auto kOtherVoterPeer = "peer-1"; + const auto kNonVoterPeer = "non-voter-peer-0"; + + // 1. Add a non-voter to the config where there are 2 voters. + queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, + BuildRaftConfigPBForTests(/*num_voters=*/ 2, + /*num_non_voters=*/ 1)); + // Track 2 additional peers (in addition to the local peer) + queue_->TrackPeer(MakePeer(kOtherVoterPeer, RaftPeerPB::VOTER)); + queue_->TrackPeer(MakePeer(kNonVoterPeer, RaftPeerPB::NON_VOTER)); + + // 2. Add some writes. Only the local leader immediately acks them, which is + // not enough to commit in a 2-voter + 1 non-voter config. + // + // Append 10 messages to the queue. + // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue. 
+ const int kNumMessages = 10; + AppendReplicateMessagesToQueue(queue_.get(), clock_, + /*first=*/ 1, /*count=*/ kNumMessages); + WaitForLocalPeerToAckIndex(kNumMessages); + + // Since only the local log has acked at this point, the committed_index + // should be 0. + const int64_t kNoneCommittedIndex = 0; + ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex()); + + // 3. Ack the operations from the NON_VOTER peer. The writes will not have + // been committed yet, because the 2nd VOTER has not yet acked them. + ConsensusResponsePB response; + response.set_responder_uuid(kNonVoterPeer); + const int64_t kCurrentTerm = 1; + response.set_responder_term(kCurrentTerm); + SetLastReceivedAndLastCommitted(&response, + /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages), + /*last_committed_idx=*/ kNoneCommittedIndex); + + bool more_pending; + queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending); + ASSERT_FALSE(more_pending); + + // Committed index should be the same. + ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex()); + + // 4. Send an identical ack from the 2nd VOTER peer. This should cause the + // operation to be committed. + response.set_responder_uuid(kOtherVoterPeer); + queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending); + ASSERT_TRUE(more_pending); // The committed index has increased. + + // The committed index should include the full set of ops now. + ASSERT_EQ(kNumMessages, queue_->GetCommittedIndex()); + + SetLastReceivedAndLastCommitted(&response, + /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages), + /*last_committed_idx=*/ kNumMessages); + + queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending); + ASSERT_FALSE(more_pending); +} + // In this test we append a sequence of operations to a log // and then start tracking a peer whose first required operation // is before the first operation in the queue. 
@@ -588,7 +658,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) { response.set_responder_uuid(kPeerUuid); bool more_pending = false; - queue_->TrackPeer(kPeerUuid); + queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER)); // Ask for a request. The queue assumes the peer is up-to-date so // this should contain no operations. @@ -653,7 +723,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) { // operations, which would cause a check failure on the write immediately // following the overwriting write. TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) { - queue_->SetNonLeaderMode(); + queue_->SetNonLeaderMode(BuildRaftConfigPBForTests(3)); // Append a bunch of messages and update as if they were also appeneded to the leader. queue_->UpdateLastIndexAppendedToLeader(10); AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10); @@ -820,7 +890,7 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) { ConsensusRequestPB request; ConsensusResponsePB response; response.set_responder_uuid(kPeerUuid); - queue_->TrackPeer(kPeerUuid); + queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER)); // Create request for new peer. vector<ReplicateRefPtr> refs; @@ -848,7 +918,7 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) { } TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) { - queue_->SetNonLeaderMode(); + queue_->SetNonLeaderMode(BuildRaftConfigPBForTests(3)); // Emulate a follower sending a request to replicate 10 messages. queue_->UpdateLastIndexAppendedToLeader(10); @@ -861,7 +931,8 @@ TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) { // Update the committed index. In real life, this would be done by the consensus // implementation when it receives an updated committed index from the leader. 
- queue_->UpdateFollowerWatermarks(10, 10); + queue_->UpdateFollowerWatermarks(/*committed_index=*/ 10, + /*all_replicated_index=*/ 10); ASSERT_EQ(10, queue_->GetCommittedIndex()); // Check the metrics have the right values based on the updated committed index. http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc index 6d09b33..f6d0db1 100644 --- a/src/kudu/consensus/consensus_queue.cc +++ b/src/kudu/consensus/consensus_queue.cc @@ -123,7 +123,7 @@ const char* PeerStatusToString(PeerStatus p) { std::string PeerMessageQueue::TrackedPeer::ToString() const { return Substitute("Peer: $0, Status: $1, Last received: $2, Next index: $3, " "Last known committed idx: $4, Time since last communication: $5", - uuid, + SecureShortDebugString(peer_pb), PeerStatusToString(last_exchange_status), OpIdToString(last_received), next_index, last_known_committed_index, @@ -170,7 +170,6 @@ PeerMessageQueue::PeerMessageQueue(scoped_refptr<MetricEntity> metric_entity, queue_state_.state = kQueueOpen; // TODO(mpercy): Merge LogCache::Init() with its constructor. 
log_cache_.Init(queue_state_.last_appended); - TrackPeer(local_peer_pb_.permanent_uuid()); } void PeerMessageQueue::SetLeaderMode(int64_t committed_index, @@ -186,49 +185,53 @@ void PeerMessageQueue::SetLeaderMode(int64_t committed_index, queue_state_.committed_index = committed_index; queue_state_.majority_replicated_index = committed_index; queue_state_.active_config.reset(new RaftConfigPB(active_config)); - CHECK(IsRaftConfigVoter(local_peer_pb_.permanent_uuid(), *queue_state_.active_config)) - << SecureShortDebugString(local_peer_pb_) << " not a voter in config: " - << SecureShortDebugString(*queue_state_.active_config); queue_state_.majority_size_ = MajoritySize(CountVoters(*queue_state_.active_config)); queue_state_.mode = LEADER; - LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: " - << queue_state_.ToString(); + TrackLocalPeerUnlocked(); CheckPeersInActiveConfigIfLeaderUnlocked(); + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: " + << queue_state_.ToString(); + // Reset last communication time with all peers to reset the clock on the // failure timeout. - MonoTime now(MonoTime::Now()); + const auto now = MonoTime::Now(); for (const PeersMap::value_type& entry : peers_map_) { entry.second->last_communication_time = now; } time_manager_->SetLeaderMode(); } -void PeerMessageQueue::SetNonLeaderMode() { +void PeerMessageQueue::SetNonLeaderMode(const RaftConfigPB& active_config) { std::lock_guard<simple_spinlock> lock(queue_lock_); - queue_state_.active_config.reset(); + queue_state_.active_config.reset(new RaftConfigPB(active_config)); queue_state_.mode = NON_LEADER; queue_state_.majority_size_ = -1; // Update this when stepping down, since it doesn't get tracked as LEADER. queue_state_.last_idx_appended_to_leader = queue_state_.last_appended.index(); + + TrackLocalPeerUnlocked(); + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. 
State: " - << queue_state_.ToString(); + << queue_state_.ToString(); + time_manager_->SetNonLeaderMode(); } -void PeerMessageQueue::TrackPeer(const string& uuid) { +void PeerMessageQueue::TrackPeer(const RaftPeerPB& peer_pb) { std::lock_guard<simple_spinlock> lock(queue_lock_); - TrackPeerUnlocked(uuid); + TrackPeerUnlocked(peer_pb); } -void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) { - CHECK(!uuid.empty()) << "Got request to track peer with empty UUID"; +void PeerMessageQueue::TrackPeerUnlocked(const RaftPeerPB& peer_pb) { + CHECK(!peer_pb.permanent_uuid().empty()) << SecureShortDebugString(peer_pb); + CHECK(peer_pb.has_member_type()) << SecureShortDebugString(peer_pb); DCHECK(queue_lock_.is_locked()); DCHECK_EQ(queue_state_.state, kQueueOpen); - TrackedPeer* tracked_peer = new TrackedPeer(uuid); + TrackedPeer* tracked_peer = new TrackedPeer(peer_pb); // We don't know the last operation received by the peer so, following the // Raft protocol, we set next_index to one past the end of our own log. This // way, if calling this method is the result of a successful leader election @@ -238,7 +241,7 @@ void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) { // does not have a log that matches ours, the normal queue negotiation // process will eventually find the right point to resume from. 
tracked_peer->next_index = queue_state_.last_appended.index() + 1; - InsertOrDie(&peers_map_, uuid, tracked_peer); + InsertOrDie(&peers_map_, tracked_peer->uuid(), tracked_peer); CheckPeersInActiveConfigIfLeaderUnlocked(); @@ -249,10 +252,39 @@ void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) { void PeerMessageQueue::UntrackPeer(const string& uuid) { std::lock_guard<simple_spinlock> lock(queue_lock_); + UntrackPeerUnlocked(uuid); +} + +void PeerMessageQueue::UntrackPeerUnlocked(const string& uuid) { + DCHECK(queue_lock_.is_locked()); TrackedPeer* peer = EraseKeyReturnValuePtr(&peers_map_, uuid); - if (peer != nullptr) { - delete peer; + delete peer; // Deleting a nullptr is safe. +} + +void PeerMessageQueue::TrackLocalPeerUnlocked() { + DCHECK(queue_lock_.is_locked()); + RaftPeerPB* local_peer_in_config; + Status s = GetRaftConfigMember(queue_state_.active_config.get(), + local_peer_pb_.permanent_uuid(), + &local_peer_in_config); + auto local_copy = local_peer_pb_; + if (!s.ok()) { + // The local peer is not a member of the config. The queue requires the + // 'member_type' field to be set for any tracked peer, so we explicitly + // mark the local peer as a NON_VOTER. This case is only possible when the + // local peer is not the leader, so the choice is not particularly + // important, but NON_VOTER is the most reasonable option. 
+ local_copy.set_member_type(RaftPeerPB::NON_VOTER); + local_peer_in_config = &local_copy; + } + CHECK(local_peer_in_config->member_type() == RaftPeerPB::VOTER || + queue_state_.mode != LEADER) + << "local peer " << local_peer_pb_.permanent_uuid() + << " is not a voter in config: " << queue_state_.ToString(); + if (ContainsKey(peers_map_, local_peer_pb_.permanent_uuid())) { + UntrackPeerUnlocked(local_peer_pb_.permanent_uuid()); } + TrackPeerUnlocked(*local_peer_in_config); } unordered_map<string, HealthReportPB> PeerMessageQueue::ReportHealthOfPeers() const { @@ -471,15 +503,15 @@ void PeerMessageQueue::UpdatePeerHealthUnlocked(TrackedPeer* peer) { string error_msg; if (overall_health_status == HealthReportPB::FAILED) { if (peer->last_exchange_status == PeerStatus::TABLET_FAILED) { - error_msg = Substitute("The tablet replica hosted on peer $0 has failed", peer->uuid); + error_msg = Substitute("The tablet replica hosted on peer $0 has failed", peer->uuid()); } else if (!peer->wal_catchup_possible) { error_msg = Substitute("The logs necessary to catch up peer $0 have been " "garbage collected. 
The replica will never be able " - "to catch up", peer->uuid); + "to catch up", peer->uuid()); } else { error_msg = Substitute("Leader has been unable to successfully communicate " "with peer $0 for more than $1 seconds ($2)", - peer->uuid, + peer->uuid(), FLAGS_follower_unavailable_considered_failed_sec, (MonoTime::Now() - peer->last_communication_time).ToString()); } @@ -499,8 +531,8 @@ void PeerMessageQueue::UpdatePeerHealthUnlocked(TrackedPeer* peer) { } } else { if (overall_health_status == HealthReportPB::FAILED && - SafeToEvictUnlocked(peer->uuid)) { - NotifyObserversOfFailedFollower(peer->uuid, queue_state_.current_term, error_msg); + SafeToEvictUnlocked(peer->uuid())) { + NotifyObserversOfFailedFollower(peer->uuid(), queue_state_.current_term, error_msg); } } } @@ -711,6 +743,7 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type, const OpId& replicated_before, const OpId& replicated_after, int num_peers_required, + ReplicaTypes replica_types, const TrackedPeer* who_caused) { if (VLOG_IS_ON(2)) { @@ -729,7 +762,11 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type, // will be the new 'watermark'. vector<int64_t> watermarks; for (const PeersMap::value_type& peer : peers_map_) { - // TODO: The fact that we only consider peers whose last exchange was + if (replica_types == VOTER_REPLICAS && + peer.second->peer_pb.member_type() != RaftPeerPB::VOTER) { + continue; + } + // TODO(todd): The fact that we only consider peers whose last exchange was // successful can cause the "all_replicated" watermark to lag behind // farther than necessary. For example: // - local peer has replicated opid 100 @@ -922,7 +959,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid, // in the TrackedPeer data structure. The downside is that we'd end up // with multiple sources of truth that would need to be kept in sync. 
Status s = GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()), - peer->uuid, &peer_pb); + peer->uuid(), &peer_pb); if (s.ok() && peer_pb && peer_pb->member_type() == RaftPeerPB::NON_VOTER && @@ -936,7 +973,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid, // committed config's opid_index because if we're in the middle of a // config change, this requested config change will be rejected // anyway. - NotifyObserversOfPeerToPromote(peer->uuid, + NotifyObserversOfPeerToPromote(peer->uuid(), queue_state_.current_term, queue_state_.active_config->opid_index()); } @@ -1022,17 +1059,19 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid, // Advance the majority replicated index. AdvanceQueueWatermark("majority_replicated", &queue_state_.majority_replicated_index, - previous.last_received, - peer->last_received, - queue_state_.majority_size_, + /*replicated_before=*/ previous.last_received, + /*replicated_after=*/ peer->last_received, + /*num_peers_required=*/ queue_state_.majority_size_, + VOTER_REPLICAS, peer); // Advance the all replicated index. 
AdvanceQueueWatermark("all_replicated", &queue_state_.all_replicated_index, - previous.last_received, - peer->last_received, - peers_map_.size(), + /*replicated_before=*/ previous.last_received, + /*replicated_after=*/ peer->last_received, + /*num_peers_required=*/ peers_map_.size(), + ALL_REPLICAS, peer); // If the majority-replicated index is in our current term, http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h index 59d0f19..0808c69 100644 --- a/src/kudu/consensus/consensus_queue.h +++ b/src/kudu/consensus/consensus_queue.h @@ -113,8 +113,8 @@ const char* PeerStatusToString(PeerStatus p); class PeerMessageQueue { public: struct TrackedPeer { - explicit TrackedPeer(std::string uuid) - : uuid(std::move(uuid)), + explicit TrackedPeer(RaftPeerPB peer_pb) + : peer_pb(std::move(peer_pb)), next_index(kInvalidOpIdIndex), last_received(MinimumOpId()), last_known_committed_index(MinimumOpId().index()), @@ -136,10 +136,13 @@ class PeerMessageQueue { last_seen_term_ = term; } + const std::string& uuid() const { + return peer_pb.permanent_uuid(); + } + std::string ToString() const; - // UUID of the peer. - std::string uuid; + RaftPeerPB peer_pb; // Next index to send to the peer. // This corresponds to "nextIndex" as specified in Raft. @@ -212,10 +215,10 @@ class PeerMessageQueue { // be tracked so that the cache is only evicted when the peers no longer need // the operations but the queue will no longer advance the majority replicated // index or notify observers of its advancement. - void SetNonLeaderMode(); + void SetNonLeaderMode(const RaftConfigPB& active_config); // Makes the queue track this peer. - void TrackPeer(const std::string& uuid); + void TrackPeer(const RaftPeerPB& peer_pb); // Makes the queue untrack this peer. 
void UntrackPeer(const std::string& uuid); @@ -378,6 +381,12 @@ class PeerMessageQueue { kQueueClosed }; + // Types of replicas to count when advancing a queue watermark. + enum ReplicaTypes { + ALL_REPLICAS, + VOTER_REPLICAS, + }; + struct QueueState { // The first operation that has been replicated to all currently @@ -485,7 +494,13 @@ class PeerMessageQueue { // 'preceding_first_op_in_queue_' if the queue is empty. const OpId& GetLastOp() const; - void TrackPeerUnlocked(const std::string& uuid); + void TrackPeerUnlocked(const RaftPeerPB& peer_pb); + + void UntrackPeerUnlocked(const std::string& uuid); + + // We need the local peer in the config because it contains the current + // 'member_type' of the local node while 'local_peer_pb_' does not. + void TrackLocalPeerUnlocked(); // Checks that if the queue is in LEADER mode then all registered peers are // in the active config. Crashes with a FATAL log message if this invariant @@ -498,11 +513,16 @@ class PeerMessageQueue { const Status& status); // Advances 'watermark' to the smallest op that 'num_peers_required' have. + // If 'replica_types' is set to VOTER_REPLICAS, the 'num_peers_required' is + // interpreted as "number of voters required". If 'replica_types' is set to + // ALL_REPLICAS, 'num_peers_required' counts any peer, regardless of its + // voting status. 
void AdvanceQueueWatermark(const char* type, int64_t* watermark, const OpId& replicated_before, const OpId& replicated_after, int num_peers_required, + ReplicaTypes replica_types, const TrackedPeer* who_caused); std::vector<PeerMessageQueueObserver*> observers_; http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/raft_consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index cb3118e..0892f9f 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -604,12 +604,12 @@ Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta) // Now that we're a replica, we can allow voting for other nodes. withhold_votes_until_ = MonoTime::Min(); + // Deregister ourselves from the queue. We no longer need to track what gets + // replicated since we're stepping down. queue_->UnRegisterObserver(this); - // Deregister ourselves from the queue. We don't care what get's replicated, since - // we're stepping down. 
- queue_->SetNonLeaderMode(); - + queue_->SetNonLeaderMode(cmeta_->ActiveConfig()); peer_manager_->Close(); + return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc index 740c559..f0350dc 100644 --- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc +++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc @@ -20,6 +20,7 @@ #include <numeric> #include <ostream> #include <string> +#include <thread> #include <unordered_map> #include <utility> #include <vector> @@ -1724,7 +1725,7 @@ TEST_F(RaftConsensusNonVoterITest, RestartClusterWithNonVoter) { // The test scenario is simple: try to make a configuration change in a 3 voter // Raft cluster, adding a new non-voter replica, when a majority of voters // is not online. Make sure the configuration change is not committed. -TEST_F(RaftConsensusNonVoterITest, DISABLED_NonVoterReplicasInConsensusQueue) { +TEST_F(RaftConsensusNonVoterITest, NonVoterReplicasInConsensusQueue) { if (!AllowSlowTests()) { LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run"; return; @@ -1775,50 +1776,54 @@ TEST_F(RaftConsensusNonVoterITest, DISABLED_NonVoterReplicasInConsensusQueue) { // Pause all but the leader replica and try to add a new non-voter into the // configuration. It should not pass. 
+ LOG(INFO) << "Getting leader replica..."; TServerDetails* leader; ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader)); - const auto do_resume = [&] { - for (auto& e : replica_servers) { - const auto& uuid = e.first; - if (uuid == leader->uuid()) { - continue; - } - ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(uuid); - ASSERT_OK(ts->Resume()); - } - }; - auto resumer = MakeScopedCleanup([&] { - do_resume(); - }); - + LOG(INFO) << "Shutting down non-leader replicas..."; for (auto& e : replica_servers) { const auto& uuid = e.first; - if (uuid == leader->uuid()) { - continue; - } - ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(uuid); - ASSERT_OK(ts->Pause()); + if (uuid == leader->uuid()) continue; + cluster_->tablet_server_by_uuid(uuid)->Shutdown(); } - const Status s = AddServer(leader, tablet_id, new_replica, - RaftPeerPB::NON_VOTER, kTimeout); - EXPECT_FALSE(s.ok()) << s.ToString(); - - NO_FATALS(do_resume()); - resumer.cancel(); + LOG(INFO) << "Adding NON_VOTER replica..."; + std::thread t([&] { + AddServer(leader, tablet_id, new_replica, RaftPeerPB::NON_VOTER, kTimeout); + }); + SCOPED_CLEANUP({ t.join(); }); // Verify that the configuration hasn't changed. + LOG(INFO) << "Waiting for pending config..."; consensus::ConsensusStatePB cstate; ASSERT_EVENTUALLY([&] { ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate)); - ASSERT_FALSE(cstate.has_pending_config()); + ASSERT_TRUE(cstate.has_pending_config()); }); + + // Ensure it does not commit. 
+ SleepFor(MonoDelta::FromSeconds(5)); + ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate)); + ASSERT_TRUE(cstate.has_pending_config()); + const auto& new_replica_uuid = new_replica->uuid(); - EXPECT_FALSE(IsRaftConfigMember(new_replica_uuid, cstate.committed_config())) + ASSERT_FALSE(IsRaftConfigMember(new_replica_uuid, cstate.committed_config())) << pb_util::SecureDebugString(cstate.committed_config()) << "new non-voter replica UUID: " << new_replica_uuid; - EXPECT_EQ(kOriginalReplicasNum, cstate.committed_config().peers_size()); + ASSERT_EQ(kOriginalReplicasNum, cstate.committed_config().peers_size()); + + // Restart the tablet servers. + for (auto& e : replica_servers) { + const auto& uuid = e.first; + if (uuid == leader->uuid()) continue; + ASSERT_OK(cluster_->tablet_server_by_uuid(uuid)->Restart()); + } + + // Once the new replicas come back online, this should be committed. + ASSERT_EVENTUALLY([&] { + ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate)); + ASSERT_FALSE(cstate.has_pending_config()); + }); NO_FATALS(cluster_->AssertNoCrashes()); }
