Repository: kudu Updated Branches: refs/heads/master 4923a74d7 -> 45c1512fc
KUDU-1365. Add leader pre-elections. This implements the "pre-election" extension to the Raft algorithm. The idea is that, before calling a leader election, a candidate first sends a pre-election vote request to all voters. The voters respond as they would have in a real vote, except they don't actually record their vote. Cluster tests verify that this substantially reduces election storms after a node has a temporarily slow disk or otherwise freezes. A new experimental flag 'raft_enable_pre_election' is introduced; it enables the feature by default but provides a safety valve to disable it in case we find a bug after release. Tested this patch (along with the following series of cleanups) with: [1] 1000 loops of RaftConsensusITest.MultiThreadedInsertWithFailovers [2] 500 loops of raft-consensus-itest overall [3] 1000 loops of exactly_once_writes-itest. The above tests have usually been pretty good about finding bugs in consensus in the past. [1] http://dist-test.cloudera.org//job?job_id=todd.1476502616.16080 [2] http://dist-test.cloudera.org//job?job_id=todd.1476503111.17428 [3] http://dist-test.cloudera.org//job?job_id=todd.1476503268.18454 Change-Id: Ifcfabd8c9ffe31f17ab768542a046426f656db43 Reviewed-on: http://gerrit.cloudera.org:8080/4694 Tested-by: Kudu Jenkins Reviewed-by: David Ribeiro Alves <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/45c1512f Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/45c1512f Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/45c1512f Branch: refs/heads/master Commit: 45c1512fcc42772173490be4e7aefcf45129fed2 Parents: 4923a74 Author: Todd Lipcon <[email protected]> Authored: Tue Oct 11 23:34:22 2016 -0700 Committer: Todd Lipcon <[email protected]> Committed: Tue Oct 18 04:45:18 2016 +0000 ---------------------------------------------------------------------- src/kudu/consensus/consensus.h | 26 +- 
src/kudu/consensus/consensus.proto | 4 + src/kudu/consensus/leader_election-test.cc | 31 ++- src/kudu/consensus/leader_election.cc | 55 ++-- src/kudu/consensus/leader_election.h | 17 +- src/kudu/consensus/raft_consensus.cc | 250 +++++++++++++------ src/kudu/consensus/raft_consensus.h | 13 +- .../consensus/raft_consensus_quorum-test.cc | 18 +- .../integration-tests/raft_consensus-itest.cc | 36 ++- src/kudu/tserver/tablet_service.cc | 3 +- 10 files changed, 309 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/45c1512f/src/kudu/consensus/consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus.h b/src/kudu/consensus/consensus.h index ba0d510..5d5447d 100644 --- a/src/kudu/consensus/consensus.h +++ b/src/kudu/consensus/consensus.h @@ -116,18 +116,40 @@ class Consensus : public RefCountedThreadSafe<Consensus> { // Emulates a leader election by simply making this peer leader. virtual Status EmulateElection() = 0; - // Triggers a leader election. + // Modes for StartElection(). enum ElectionMode { // A normal leader election. Peers will not vote for this node // if they believe that a leader is alive. NORMAL_ELECTION, + // A "pre-election". Peers will vote as they would for a normal + // election, except that the votes will not be "binding". In other + // words, they will not durably record their vote. + PRE_ELECTION, + // In this mode, peers will vote for this candidate even if they // think a leader is alive. This can be used for a faster hand-off // between a leader and one of its replicas. ELECT_EVEN_IF_LEADER_IS_ALIVE }; - virtual Status StartElection(ElectionMode mode) = 0; + + // Reasons for StartElection(). + enum ElectionReason { + // The election is being called because the Raft configuration has only + // a single node and has just started up. 
+ INITIAL_SINGLE_NODE_ELECTION, + + // The election is being called because the timeout expired. In other + // words, the previous leader probably failed (or there was no leader + // in this term) + ELECTION_TIMEOUT_EXPIRED, + + // The election is being started because of an explicit external request. + EXTERNAL_REQUEST + }; + + // Triggers a leader election. + virtual Status StartElection(ElectionMode mode, ElectionReason reason) = 0; // Wait until the node has LEADER role. // Returns Status::TimedOut if the role is not LEADER within 'timeout'. http://git-wip-us.apache.org/repos/asf/kudu/blob/45c1512f/src/kudu/consensus/consensus.proto ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto index 3841aee..c10183e 100644 --- a/src/kudu/consensus/consensus.proto +++ b/src/kudu/consensus/consensus.proto @@ -270,6 +270,10 @@ message VoteRequestPB { // for example to force a faster leader hand-off rather than waiting for // the election timer to expire. optional bool ignore_live_leader = 5 [ default = false ]; + + // In a "pre-election", voters should respond how they _would_ have voted + // but not actually record the vote. + optional bool is_pre_election = 7 [ default = false ]; } // A response from a replica to a leader election request. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/45c1512f/src/kudu/consensus/leader_election-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/leader_election-test.cc b/src/kudu/consensus/leader_election-test.cc index 3f65511..e523af0 100644 --- a/src/kudu/consensus/leader_election-test.cc +++ b/src/kudu/consensus/leader_election-test.cc @@ -285,7 +285,7 @@ TEST_F(LeaderElectionTest, TestPerfectElection) { election->Run(); latch_.Wait(); - ASSERT_EQ(election_term, result_->election_term); + ASSERT_EQ(election_term, result_->vote_request.candidate_term()); ASSERT_EQ(VOTE_GRANTED, result_->decision); pool_->Wait(); @@ -307,10 +307,9 @@ TEST_F(LeaderElectionTest, TestHigherTermBeforeDecision) { ->Respond(TestPeerProxy::kRequestVote); latch_.Wait(); - ASSERT_EQ(kElectionTerm, result_->election_term); + ASSERT_EQ(kElectionTerm, result_->vote_request.candidate_term()); ASSERT_EQ(VOTE_DENIED, result_->decision); - ASSERT_TRUE(result_->has_higher_term); - ASSERT_EQ(kElectionTerm + 1, result_->higher_term); + ASSERT_EQ(kElectionTerm + 1, result_->highest_voter_term); LOG(INFO) << "Election lost. Reason: " << result_->message; // This guy will vote "yes". @@ -332,10 +331,10 @@ TEST_F(LeaderElectionTest, TestHigherTermAfterDecision) { ->Respond(TestPeerProxy::kRequestVote); latch_.Wait(); - ASSERT_EQ(kElectionTerm, result_->election_term); + ASSERT_EQ(kElectionTerm, result_->vote_request.candidate_term()); ASSERT_EQ(VOTE_GRANTED, result_->decision); - ASSERT_FALSE(result_->has_higher_term); - ASSERT_TRUE(result_->message.empty()); + ASSERT_EQ(kElectionTerm, result_->highest_voter_term); + ASSERT_EQ("achieved majority votes", result_->message); LOG(INFO) << "Election won."; // This guy has a higher term. 
@@ -357,10 +356,10 @@ TEST_F(LeaderElectionTest, TestWithDenyVotes) { election->Run(); latch_.Wait(); - ASSERT_EQ(kElectionTerm, result_->election_term); + ASSERT_EQ(kElectionTerm, result_->vote_request.candidate_term()); ASSERT_EQ(VOTE_DENIED, result_->decision); - ASSERT_FALSE(result_->has_higher_term); - ASSERT_TRUE(result_->message.empty()); + ASSERT_EQ(kElectionTerm, result_->highest_voter_term); + ASSERT_EQ("could not achieve majority", result_->message); LOG(INFO) << "Election denied."; pool_->Wait(); // Wait for the election callbacks to finish before we destroy proxies. @@ -377,10 +376,10 @@ TEST_F(LeaderElectionTest, TestWithErrorVotes) { election->Run(); latch_.Wait(); - ASSERT_EQ(kElectionTerm, result_->election_term); + ASSERT_EQ(kElectionTerm, result_->vote_request.candidate_term()); ASSERT_EQ(VOTE_DENIED, result_->decision); - ASSERT_FALSE(result_->has_higher_term); - ASSERT_TRUE(result_->message.empty()); + ASSERT_EQ(0, result_->highest_voter_term); // no valid votes + ASSERT_EQ("could not achieve majority", result_->message); LOG(INFO) << "Election denied."; pool_->Wait(); // Wait for the election callbacks to finish before we destroy proxies. 
@@ -413,10 +412,10 @@ TEST_F(LeaderElectionTest, TestFailToCreateProxy) { Unretained(this)))); election->Run(); latch_.Wait(); - ASSERT_EQ(kElectionTerm, result_->election_term); + ASSERT_EQ(kElectionTerm, result_->vote_request.candidate_term()); ASSERT_EQ(VOTE_DENIED, result_->decision); - ASSERT_FALSE(result_->has_higher_term); - ASSERT_TRUE(result_->message.empty()); + ASSERT_EQ(0, result_->highest_voter_term); // no votes + ASSERT_EQ("could not achieve majority", result_->message); } //////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/kudu/blob/45c1512f/src/kudu/consensus/leader_election.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/leader_election.cc b/src/kudu/consensus/leader_election.cc index 1a40e6d..49eb3c8 100644 --- a/src/kudu/consensus/leader_election.cc +++ b/src/kudu/consensus/leader_election.cc @@ -17,6 +17,7 @@ #include "kudu/consensus/leader_election.h" +#include <algorithm> #include <boost/bind.hpp> #include <mutex> @@ -128,22 +129,12 @@ bool VoteCounter::AreAllVotesIn() const { // ElectionResult /////////////////////////////////////////////////// -ElectionResult::ElectionResult(ConsensusTerm election_term, ElectionVote decision) - : election_term(election_term), +ElectionResult::ElectionResult(const VoteRequestPB& vote_request, ElectionVote decision, + ConsensusTerm highest_voter_term, const std::string& message) + : vote_request(vote_request), decision(decision), - has_higher_term(false), - higher_term(kMinimumTerm) { -} - -ElectionResult::ElectionResult(ConsensusTerm election_term, ElectionVote decision, - ConsensusTerm higher_term, const std::string& message) - : election_term(election_term), - decision(decision), - has_higher_term(true), - higher_term(higher_term), + highest_voter_term(highest_voter_term), message(message) { - CHECK_EQ(VOTE_DENIED, decision); - CHECK_GT(higher_term, election_term); DCHECK(!message.empty()); } @@ -161,7 +152,8 @@ 
LeaderElection::LeaderElection(const RaftConfigPB& config, request_(request), vote_counter_(std::move(vote_counter)), timeout_(timeout), - decision_callback_(std::move(decision_callback)) { + decision_callback_(std::move(decision_callback)), + highest_voter_term_(0) { for (const RaftPeerPB& peer : config.peers()) { if (request.candidate_uuid() == peer.permanent_uuid()) continue; follower_uuids_.push_back(peer.permanent_uuid()); @@ -220,7 +212,9 @@ void LeaderElection::Run() { } // Send the RPC request. - LOG_WITH_PREFIX(INFO) << "Requesting vote from peer " << voter_uuid; + LOG_WITH_PREFIX(INFO) << "Requesting " + << (request_.is_pre_election() ? "pre-" : "") + << "vote from peer " << voter_uuid; state->rpc.set_timeout(timeout_); state->request = request_; @@ -247,7 +241,9 @@ void LeaderElection::CheckForDecision() { CHECK_OK(vote_counter_->GetDecision(&decision)); LOG_WITH_PREFIX(INFO) << "Election decided. Result: candidate " << ((decision == VOTE_GRANTED) ? "won." : "lost."); - result_.reset(new ElectionResult(election_term(), decision)); + string msg = (decision == VOTE_GRANTED) ? + "achieved majority votes" : "could not achieve majority"; + result_.reset(new ElectionResult(request_, decision, highest_voter_term_, msg)); } // Check whether to respond. This can happen as a result of either getting // a majority vote or of something invalidating the election, like @@ -290,13 +286,15 @@ void LeaderElection::VoteResponseRpcCallback(const std::string& voter_uuid) { << voter_uuid << ", but its actual UUID is " << state->response.responder_uuid(); RecordVoteUnlocked(voter_uuid, VOTE_DENIED); - // Count the granted votes. - } else if (state->response.vote_granted()) { - HandleVoteGrantedUnlocked(voter_uuid, *state); - - // Anything else is a denied vote. } else { - HandleVoteDeniedUnlocked(voter_uuid, *state); + // No error: count actual votes. 
+ + highest_voter_term_ = std::max(highest_voter_term_, state->response.responder_term()); + if (state->response.vote_granted()) { + HandleVoteGrantedUnlocked(voter_uuid, *state); + } else { + HandleVoteDeniedUnlocked(voter_uuid, *state); + } } } @@ -334,14 +332,16 @@ void LeaderElection::HandleHigherTermUnlocked(const string& voter_uuid, const Vo if (!result_) { LOG_WITH_PREFIX(INFO) << "Cancelling election due to peer responding with higher term"; - result_.reset(new ElectionResult(election_term(), VOTE_DENIED, + result_.reset(new ElectionResult(request_, VOTE_DENIED, state.response.responder_term(), msg)); } } void LeaderElection::HandleVoteGrantedUnlocked(const string& voter_uuid, const VoterState& state) { DCHECK(lock_.is_locked()); - DCHECK_EQ(state.response.responder_term(), election_term()); + if (!request_.is_pre_election()) { + DCHECK_EQ(state.response.responder_term(), election_term()); + } DCHECK(state.response.vote_granted()); LOG_WITH_PREFIX(INFO) << "Vote granted by peer " << voter_uuid; @@ -364,10 +364,11 @@ void LeaderElection::HandleVoteDeniedUnlocked(const string& voter_uuid, const Vo } std::string LeaderElection::LogPrefix() const { - return Substitute("T $0 P $1 [CANDIDATE]: Term $2 election: ", + return Substitute("T $0 P $1 [CANDIDATE]: Term $2 $3election: ", request_.tablet_id(), request_.candidate_uuid(), - request_.candidate_term()); + request_.candidate_term(), + request_.is_pre_election() ? "pre-" : ""); } } // namespace consensus http://git-wip-us.apache.org/repos/asf/kudu/blob/45c1512f/src/kudu/consensus/leader_election.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/leader_election.h b/src/kudu/consensus/leader_election.h index 289215f..d0129f4 100644 --- a/src/kudu/consensus/leader_election.h +++ b/src/kudu/consensus/leader_election.h @@ -100,19 +100,17 @@ class VoteCounter { // The result of a leader election. 
struct ElectionResult { public: - ElectionResult(ConsensusTerm election_term, ElectionVote decision); - ElectionResult(ConsensusTerm election_term, ElectionVote decision, - ConsensusTerm higher_term, const std::string& message); + ElectionResult(const VoteRequestPB& vote_request, ElectionVote decision, + ConsensusTerm highest_term, const std::string& message); - // Term the election was run for. - const ConsensusTerm election_term; + // The vote request that was sent to the voters for this election. + const VoteRequestPB vote_request; // The overall election GRANTED/DENIED decision of the configuration. const ElectionVote decision; - // At least one voter had a higher term than the candidate. - const bool has_higher_term; - const ConsensusTerm higher_term; + // The highest term seen from any voter. + const ConsensusTerm highest_voter_term; // Human-readable explanation of the vote result, if any. const std::string message; @@ -232,6 +230,9 @@ class LeaderElection : public RefCountedThreadSafe<LeaderElection> { // Map of UUID -> VoterState. VoterStateMap voter_state_; + + // The highest term seen from a voter so far (or 0 if no votes). + int64_t highest_voter_term_; }; } // namespace consensus http://git-wip-us.apache.org/repos/asf/kudu/blob/45c1512f/src/kudu/consensus/raft_consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 1ff1de3..4c3de3d 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -102,8 +102,16 @@ DEFINE_bool(follower_fail_all_prepare, false, "Warning! 
This is only intended for testing."); TAG_FLAG(follower_fail_all_prepare, unsafe); +DEFINE_bool(raft_enable_pre_election, true, + "When enabled, candidates will call a pre-election before " + "running a real leader election."); +TAG_FLAG(raft_enable_pre_election, experimental); +TAG_FLAG(raft_enable_pre_election, runtime); + DECLARE_int32(memory_limit_warn_threshold_percentage); +// Metrics +// --------- METRIC_DEFINE_counter(tablet, follower_memory_pressure_rejections, "Follower Memory Pressure Rejections", kudu::MetricUnit::kRequests, @@ -233,6 +241,7 @@ RaftConsensus::RaftConsensus( FLAGS_leader_failure_max_missed_heartbeat_periods))), withhold_votes_until_(MonoTime::Min()), last_received_cur_leader_(MinimumOpId()), + failed_elections_since_stable_leader_(0), mark_dirty_clbk_(std::move(mark_dirty_clbk)), shutdown_(false), update_calls_for_tests_(0), @@ -318,7 +327,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) { RETURN_NOT_OK(IsSingleVoterConfig(&single_voter)); if (single_voter && FLAGS_enable_leader_failure_detection) { LOG_WITH_PREFIX(INFO) << "Only one voter in the Raft config. Triggering election immediately"; - RETURN_NOT_OK(StartElection(NORMAL_ELECTION)); + RETURN_NOT_OK(StartElection(NORMAL_ELECTION, INITIAL_SINGLE_NODE_ELECTION)); } // Report become visible to the Master. 
@@ -346,10 +355,40 @@ Status RaftConsensus::EmulateElection() { return BecomeLeaderUnlocked(); } -Status RaftConsensus::StartElection(ElectionMode mode) { +namespace { +const char* ModeString(Consensus::ElectionMode mode) { + switch (mode) { + case Consensus::NORMAL_ELECTION: + return "leader election"; + case Consensus::PRE_ELECTION: + return "pre-election"; + case Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE: + return "forced leader election"; + } + __builtin_unreachable(); // silence gcc warnings +} +string ReasonString(Consensus::ElectionReason reason, StringPiece leader_uuid) { + switch (reason) { + case Consensus::INITIAL_SINGLE_NODE_ELECTION: + return "initial election of a single-replica configuration"; + case Consensus::EXTERNAL_REQUEST: + return "received explicit request"; + case Consensus::ELECTION_TIMEOUT_EXPIRED: + if (leader_uuid.empty()) { + return "no leader contacted us within the election timeout"; + } + return Substitute("detected failure of leader $0", leader_uuid); + } + __builtin_unreachable(); // silence gcc warnings +} +} // anonymous namespace + +Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) { + const char* mode_str = ModeString(mode); + TRACE_EVENT2("consensus", "RaftConsensus::StartElection", - "peer", peer_uuid(), - "tablet", tablet_id()); + "peer", state_->LogPrefixThreadSafe(), + "mode", mode_str); scoped_refptr<LeaderElection> election; { ReplicaState::UniqueLock lock; @@ -357,7 +396,7 @@ Status RaftConsensus::StartElection(ElectionMode mode) { RaftPeerPB::Role active_role = state_->GetActiveRoleUnlocked(); if (active_role == RaftPeerPB::LEADER) { - LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not starting election -- already leader"; + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not starting " << mode << " -- already leader"; return Status::OK(); } if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) { @@ -366,40 +405,38 @@ Status RaftConsensus::StartElection(ElectionMode mode) { "a non-participant in the raft 
config", state_->GetActiveConfigUnlocked().ShortDebugString()); } - - if (state_->HasLeaderUnlocked()) { - LOG_WITH_PREFIX_UNLOCKED(INFO) - << "Failure of leader " << state_->GetLeaderUuidUnlocked() - << " detected. Triggering leader election"; - } else { - LOG_WITH_PREFIX_UNLOCKED(INFO) - << "No leader contacted us within the election timeout. " - << "Triggering leader election"; - } - - // Increment the term. - // TODO: this causes an extra flush of the consensus metadata which - // will be flushed again for our vote below. Consolidate these. - RETURN_NOT_OK(IncrementTermUnlocked()); + LOG_WITH_PREFIX_UNLOCKED(INFO) + << "Starting " << mode_str + << " (" << ReasonString(reason, state_->GetLeaderUuidUnlocked()) << ")"; // Snooze to avoid the election timer firing again as much as possible. - // We do not disable the election timer while running an election. + // We do not disable the election timer while running an election, so that + // if the election times out, we will try again. RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked()); MonoDelta timeout = LeaderElectionExpBackoffDeltaUnlocked(); RETURN_NOT_OK(SnoozeFailureDetectorUnlocked(timeout, ALLOW_LOGGING)); + // Increment the term and vote for ourselves, unless it's a pre-election. + if (mode != PRE_ELECTION) { + // TODO(todd): the IncrementTermUnlocked call flushes to disk once, and then + // the SetVotedForCurrentTerm flushes again. We should avoid flushing to disk + // on the term bump. + // TODO(mpercy): Consider using a separate Mutex for voting, which must sync to disk. + RETURN_NOT_OK(IncrementTermUnlocked()); + RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(state_->GetPeerUuid())); + } + const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); - LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting election with config: " + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting " << mode_str << " with config: " << active_config.ShortDebugString(); // Initialize the VoteCounter. 
int num_voters = CountVoters(active_config); int majority_size = MajoritySize(num_voters); gscoped_ptr<VoteCounter> counter(new VoteCounter(num_voters, majority_size)); + // Vote for ourselves. - // TODO: Consider using a separate Mutex for voting, which must sync to disk. - RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(state_->GetPeerUuid())); bool duplicate; RETURN_NOT_OK(counter->RegisterVote(state_->GetPeerUuid(), VOTE_GRANTED, &duplicate)); CHECK(!duplicate) << state_->LogPrefixUnlocked() @@ -409,7 +446,14 @@ Status RaftConsensus::StartElection(ElectionMode mode) { VoteRequestPB request; request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE); request.set_candidate_uuid(state_->GetPeerUuid()); - request.set_candidate_term(state_->GetCurrentTermUnlocked()); + if (mode == PRE_ELECTION) { + // In a pre-election, we haven't bumped our own term yet, so we need to be + // asking for votes for the next term. + request.set_is_pre_election(true); + request.set_candidate_term(state_->GetCurrentTermUnlocked() + 1); + } else { + request.set_candidate_term(state_->GetCurrentTermUnlocked()); + } request.set_tablet_id(state_->GetOptions().tablet_id); *request.mutable_candidate_status()->mutable_last_received() = queue_->GetLastOpIdInLog(); @@ -417,7 +461,7 @@ Status RaftConsensus::StartElection(ElectionMode mode) { election.reset(new LeaderElection(active_config, peer_proxy_factory_.get(), request, std::move(counter), timeout, - Bind(&RaftConsensus::ElectionCallback, this))); + Bind(&RaftConsensus::ElectionCallback, this, reason))); } // Start the election outside the lock. @@ -453,10 +497,11 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) { return Status::OK(); } -void RaftConsensus::ReportFailureDetected(const std::string& name, const Status& msg) { +void RaftConsensus::ReportFailureDetected(const std::string& name, const Status& /*msg*/) { DCHECK_EQ(name, kTimerId); // Start an election. 
- Status s = StartElection(NORMAL_ELECTION); + Status s = StartElection(FLAGS_raft_enable_pre_election ? PRE_ELECTION : NORMAL_ELECTION, + ELECTION_TIMEOUT_EXPIRED); if (PREDICT_FALSE(!s.ok())) { LOG_WITH_PREFIX(WARNING) << "Failed to trigger leader election: " << s.ToString(); } @@ -1346,8 +1391,12 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* bool vote_yes = !OpIdLessThan(request->candidate_status().last_received(), local_last_logged_opid); - // Record the term advancement if necessary. - if (request->candidate_term() > state_->GetCurrentTermUnlocked()) { + // Record the term advancement if necessary. We don't do so in the case of + // pre-elections because it's possible that the node who called the pre-election + // has actually now successfully become leader of the prior term, in which case + // bumping our term here would disrupt it. + if (!request->is_pre_election() && + request->candidate_term() > state_->GetCurrentTermUnlocked()) { // If we are going to vote for this peer, then we will flush the consensus metadata // to disk below when we record the vote, and we can skip flushing the term advancement // to disk here. @@ -1528,8 +1577,10 @@ Status RaftConsensus::AdvanceTermForTests(int64_t new_term) { return HandleTermAdvanceUnlocked(new_term); } -std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked() const { - return state_->LogPrefixUnlocked() + "Leader election vote request"; +std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const { + return Substitute("$0Leader $1election vote request", + state_->LogPrefixUnlocked(), + request.is_pre_election() ? 
"pre-" : ""); } void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) { @@ -1549,7 +1600,7 @@ Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request FillVoteResponseVoteDenied(ConsensusErrorPB::INVALID_TERM, response); string msg = Substitute("$0: Denying vote to candidate $1 for earlier term $2. " "Current term is $3.", - GetRequestVoteLogPrefixUnlocked(), + GetRequestVoteLogPrefixUnlocked(*request), request->candidate_uuid(), request->candidate_term(), state_->GetCurrentTermUnlocked()); @@ -1563,7 +1614,7 @@ Status RaftConsensus::RequestVoteRespondVoteAlreadyGranted(const VoteRequestPB* FillVoteResponseVoteGranted(response); LOG(INFO) << Substitute("$0: Already granted yes vote for candidate $1 in term $2. " "Re-sending same reply.", - GetRequestVoteLogPrefixUnlocked(), + GetRequestVoteLogPrefixUnlocked(*request), request->candidate_uuid(), request->candidate_term()); return Status::OK(); @@ -1574,7 +1625,7 @@ Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB FillVoteResponseVoteDenied(ConsensusErrorPB::ALREADY_VOTED, response); string msg = Substitute("$0: Denying vote to candidate $1 in current term $2: " "Already voted for candidate $3 in this term.", - GetRequestVoteLogPrefixUnlocked(), + GetRequestVoteLogPrefixUnlocked(*request), request->candidate_uuid(), state_->GetCurrentTermUnlocked(), state_->GetVotedForCurrentTermUnlocked()); @@ -1590,7 +1641,7 @@ Status RaftConsensus::RequestVoteRespondLastOpIdTooOld(const OpId& local_last_lo string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because " "replica has last-logged OpId of $3, which is greater than that of the " "candidate, which has last-logged OpId of $4.", - GetRequestVoteLogPrefixUnlocked(), + GetRequestVoteLogPrefixUnlocked(*request), request->candidate_uuid(), request->candidate_term(), local_last_logged_opid.ShortDebugString(), @@ -1606,7 +1657,7 @@ Status 
RaftConsensus::RequestVoteRespondLeaderIsAlive(const VoteRequestPB* reque string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because " "replica is either leader or believes a valid leader to " "be alive.", - GetRequestVoteLogPrefixUnlocked(), + GetRequestVoteLogPrefixUnlocked(*request), request->candidate_uuid(), request->candidate_term()); LOG(INFO) << msg; @@ -1620,7 +1671,7 @@ Status RaftConsensus::RequestVoteRespondIsBusy(const VoteRequestPB* request, string msg = Substitute("$0: Denying vote to candidate $1 for term $2 because " "replica is already servicing an update from a current leader " "or another vote.", - GetRequestVoteLogPrefixUnlocked(), + GetRequestVoteLogPrefixUnlocked(*request), request->candidate_uuid(), request->candidate_term()); LOG(INFO) << msg; @@ -1637,8 +1688,10 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request MonoDelta additional_backoff = LeaderElectionExpBackoffDeltaUnlocked(); RETURN_NOT_OK(SnoozeFailureDetectorUnlocked(additional_backoff, ALLOW_LOGGING)); - // Persist our vote to disk. - RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(request->candidate_uuid())); + if (!request->is_pre_election()) { + // Persist our vote to disk. 
+ RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(request->candidate_uuid())); + } FillVoteResponseVoteGranted(response); @@ -1647,7 +1700,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request RETURN_NOT_OK(SnoozeFailureDetectorUnlocked(additional_backoff, DO_NOT_LOG)); LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.", - GetRequestVoteLogPrefixUnlocked(), + GetRequestVoteLogPrefixUnlocked(*request), request->candidate_uuid(), state_->GetCurrentTermUnlocked()); return Status::OK(); @@ -1668,6 +1721,7 @@ std::string RaftConsensus::LogPrefix() { } void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) { + failed_elections_since_stable_leader_ = 0; state_->SetLeaderUuidUnlocked(uuid); MarkDirty("New leader " + uuid); } @@ -1766,78 +1820,129 @@ ReplicaState* RaftConsensus::GetReplicaStateForTests() { return state_.get(); } -void RaftConsensus::ElectionCallback(const ElectionResult& result) { +void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult& result) { // The election callback runs on a reactor thread, so we need to defer to our // threadpool. If the threadpool is already shut down for some reason, it's OK -- // we're OK with the callback never running. - WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(&RaftConsensus::DoElectionCallback, this, result)), + WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(&RaftConsensus::DoElectionCallback, + this, reason, result)), state_->LogPrefixThreadSafe() + "Unable to run election callback"); } -void RaftConsensus::DoElectionCallback(const ElectionResult& result) { +void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResult& result) { + const int64_t election_term = result.vote_request.candidate_term(); + const bool was_pre_election = result.vote_request.is_pre_election(); + const char* election_type = was_pre_election ? 
"pre-election" : "election"; + // Snooze to avoid the election timer firing again as much as possible. { ReplicaState::UniqueLock lock; CHECK_OK(state_->LockForRead(&lock)); // We need to snooze when we win and when we lose: // - When we win because we're about to disable the timer and become leader. - // - When we loose or otherwise we can fall into a cycle, where everyone keeps + // - When we lose or otherwise we can fall into a cycle, where everyone keeps // triggering elections but no election ever completes because by the time they // finish another one is triggered already. // We ignore the status as we don't want to fail if we the timer is // disabled. ignore_result(SnoozeFailureDetectorUnlocked(LeaderElectionExpBackoffDeltaUnlocked(), ALLOW_LOGGING)); - } - if (result.decision == VOTE_DENIED) { - LOG_WITH_PREFIX(INFO) << "Leader election lost for term " << result.election_term - << ". Reason: " - << (!result.message.empty() ? result.message : "None given"); - return; + if (result.decision == VOTE_DENIED) { + failed_elections_since_stable_leader_++; + + // If we called an election and one of the voters had a higher term than we did, + // we should bump our term before we potentially try again. This is particularly + // important with pre-elections to avoid getting "stuck" in a case like: + // Peer A: has ops through 1.10, term = 2, voted in term 2 for peer C + // Peer B: has ops through 1.15, term = 1 + // In this case, Peer B will reject peer A's pre-elections for term 3 because + // the local log is longer. Peer A will reject B's pre-elections for term 2 + // because it already voted in term 2. The check below ensures that peer B + // will bump to term 2 when it gets the vote rejection, such that its + // next pre-election (for term 3) would succeed. 
+ if (result.highest_voter_term > state_->GetCurrentTermUnlocked()) { + HandleTermAdvanceUnlocked(result.highest_voter_term); + } + + LOG_WITH_PREFIX_UNLOCKED(INFO) + << "Leader " << election_type << " lost for term " << election_term + << ". Reason: " + << (!result.message.empty() ? result.message : "None given"); + return; + } } ReplicaState::UniqueLock lock; Status s = state_->LockForConfigChange(&lock); if (PREDICT_FALSE(!s.ok())) { - LOG_WITH_PREFIX(INFO) << "Received election callback for term " - << result.election_term << " while not running: " + LOG_WITH_PREFIX(INFO) << "Received " << election_type << " callback for term " + << election_term << " while not running: " << s.ToString(); return; } - if (result.election_term != state_->GetCurrentTermUnlocked()) { - LOG_WITH_PREFIX_UNLOCKED(INFO) << "Leader election decision for defunct term " - << result.election_term << ": " - << (result.decision == VOTE_GRANTED ? "won" : "lost"); + // In a pre-election, we collected votes for the _next_ term. + // So, we need to adjust our expectations of what the current term should be. + int64_t election_started_in_term = election_term; + if (was_pre_election) { + election_started_in_term--; + } + + if (election_started_in_term != state_->GetCurrentTermUnlocked()) { + LOG_WITH_PREFIX_UNLOCKED(INFO) + << "Leader " << election_type << " decision vote started in " + << "defunct term " << election_started_in_term << ": " + << (result.decision == VOTE_GRANTED ? "won" : "lost"); return; } const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); if (!IsRaftConfigVoter(state_->GetPeerUuid(), active_config)) { - LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Leader election decision while not in active config. " - << "Result: Term " << result.election_term << ": " + LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Leader " << election_type + << " decision while not in active config. " + << "Result: Term " << election_term << ": " << (result.decision == VOTE_GRANTED ? "won" : "lost") << ". 
RaftConfig: " << active_config.ShortDebugString(); return; } if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) { - LOG_WITH_PREFIX_UNLOCKED(DFATAL) << "Leader election callback while already leader! " - "Result: Term " << result.election_term << ": " - << (result.decision == VOTE_GRANTED ? "won" : "lost"); + // If this was a pre-election, it's possible to see the following interleaving: + // + // 1. Term N (follower): send a real election for term N + // 2. Election callback expires again + // 3. Term N (follower): send a pre-election for term N+1 + // 4. Election callback for real election from term N completes. + // Peer is now leader for term N. + // 5. Pre-election callback from term N+1 completes, even though + // we are currently a leader of term N. + // In this case, we should just ignore the pre-election, since we're + // happily the leader of the prior term. + if (was_pre_election) return; + LOG_WITH_PREFIX_UNLOCKED(DFATAL) + << "Leader " << election_type << " callback while already leader! " + << "Result: Term " << election_term << ": " + << (result.decision == VOTE_GRANTED ? "won" : "lost"); return; } - LOG_WITH_PREFIX_UNLOCKED(INFO) << "Leader election won for term " << result.election_term; + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Leader " << election_type << " won for term " << election_term; - // Convert role to LEADER. - SetLeaderUuidUnlocked(state_->GetPeerUuid()); + if (was_pre_election) { + // We just won the pre-election. So, we need to call a real election. + lock.unlock(); + WARN_NOT_OK(StartElection(NORMAL_ELECTION, reason), + "Couldn't start leader election after successful pre-election"); + } else { + // We won a real election. Convert role to LEADER. + SetLeaderUuidUnlocked(state_->GetPeerUuid()); - // TODO: BecomeLeaderUnlocked() can fail due to state checks during shutdown. - // It races with the above state check. - // This could be a problem during tablet deletion. 
- CHECK_OK(BecomeLeaderUnlocked()); + // TODO(todd): BecomeLeaderUnlocked() can fail due to state checks during shutdown. + // It races with the above state check. + // This could be a problem during tablet deletion. + CHECK_OK(BecomeLeaderUnlocked()); + } } Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) { @@ -1964,11 +2069,8 @@ MonoDelta RaftConsensus::MinimumElectionTimeout() const { MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() { // Compute a backoff factor based on how many leader elections have - // taken place since a leader was successfully elected. - int term_difference = state_->GetCurrentTermUnlocked() - - state_->GetTermWithLastCommittedOpUnlocked(); - - double backoff_factor = pow(1.5, term_difference); + // failed since a stable leader was last seen. + double backoff_factor = pow(1.5, failed_elections_since_stable_leader_ + 1); double min_timeout = MinimumElectionTimeout().ToMilliseconds(); double max_timeout = std::min<double>( min_timeout * backoff_factor, http://git-wip-us.apache.org/repos/asf/kudu/blob/45c1512f/src/kudu/consensus/raft_consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h index eb71b94..33bf7da 100644 --- a/src/kudu/consensus/raft_consensus.h +++ b/src/kudu/consensus/raft_consensus.h @@ -99,7 +99,7 @@ class RaftConsensus : public Consensus, // enabled, as it could result in a split-brain scenario. Status EmulateElection() override; - Status StartElection(ElectionMode mode) override; + Status StartElection(ElectionMode mode, ElectionReason reason) override; Status WaitUntilLeaderForTests(const MonoDelta& timeout) override; @@ -278,7 +278,7 @@ class RaftConsensus : public Consensus, Status IsSingleVoterConfig(bool* single_voter) const; // Return header string for RequestVote log messages. The ReplicaState lock must be held. 
- std::string GetRequestVoteLogPrefixUnlocked() const; + std::string GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const; // Fills the response with the current status, if an update was successful. void FillConsensusResponseOKUnlocked(ConsensusResponsePB* response); @@ -331,8 +331,8 @@ class RaftConsensus : public Consensus, // Callback for leader election driver. ElectionCallback is run on the // reactor thread, so it simply defers its work to DoElectionCallback. - void ElectionCallback(const ElectionResult& result); - void DoElectionCallback(const ElectionResult& result); + void ElectionCallback(ElectionReason reason, const ElectionResult& result); + void DoElectionCallback(ElectionReason reason, const ElectionResult& result); // Start tracking the leader for failures. This typically occurs at startup // and when the local peer steps down as leader. @@ -454,6 +454,11 @@ class RaftConsensus : public Consensus, // point to continue sending operations. OpId last_received_cur_leader_; + // The number of times this node has called and lost a leader election since + // the last time it saw a stable leader (either itself or another node). + // This is used to calculate back-off of the election timeout. 
+ int failed_elections_since_stable_leader_; + const Callback<void(const std::string& reason)> mark_dirty_clbk_; // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages http://git-wip-us.apache.org/repos/asf/kudu/blob/45c1512f/src/kudu/consensus/raft_consensus_quorum-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc index 465cfdb..2ca8e1b 100644 --- a/src/kudu/consensus/raft_consensus_quorum-test.cc +++ b/src/kudu/consensus/raft_consensus_quorum-test.cc @@ -897,7 +897,8 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) { // This will force an election in which we expect to make the last // non-shutdown peer in the list become leader. LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1); - ASSERT_OK(new_leader->StartElection(Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE)); + ASSERT_OK(new_leader->StartElection(Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, + Consensus::EXTERNAL_REQUEST)); WaitUntilLeaderForTests(new_leader.get()); LOG(INFO) << "Election won"; @@ -1097,6 +1098,21 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) { ASSERT_EQ(0, flush_count() - flush_count_before) << "Rejected votes for old terms should not flush"; + // Ensure that replicas don't change term or flush any metadata for a pre-election + // request, even when they vote "yes". 
+ flush_count_before = flush_count(); + request.set_candidate_term(last_op_id.term() + 3); + request.set_is_pre_election(true); + response.Clear(); + ASSERT_OK(peer->RequestVote(&request, &response)); + ASSERT_TRUE(response.vote_granted()); + ASSERT_FALSE(response.has_consensus_error()); + ASSERT_EQ(last_op_id.term() + 2, response.responder_term()); + ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2, "peer-0")); + ASSERT_EQ(0, flush_count() - flush_count_before) + << "Pre-elections should not flush"; + request.set_is_pre_election(false); + // // Ensure replicas vote no for an old op index. // http://git-wip-us.apache.org/repos/asf/kudu/blob/45c1512f/src/kudu/integration-tests/raft_consensus-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc index 99b110b..a950ad4 100644 --- a/src/kudu/integration-tests/raft_consensus-itest.cc +++ b/src/kudu/integration-tests/raft_consensus-itest.cc @@ -371,11 +371,19 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase { int64_t* orig_term, string* fell_behind_uuid); + // Retrieve the current term of the first tablet on this tablet server. 
+ Status GetTermMetricValue(ExternalTabletServer* ts, int64_t* term); + shared_ptr<KuduTable> table_; std::vector<scoped_refptr<kudu::Thread> > threads_; CountDownLatch inserters_; }; +Status RaftConsensusITest::GetTermMetricValue(ExternalTabletServer* ts, + int64_t *term) { + return ts->GetInt64Metric(&METRIC_ENTITY_tablet, nullptr, &METRIC_raft_term, "value", term); +} + void RaftConsensusITest::AddFlagsForLogRolls(vector<string>* extra_tserver_flags) { // We configure a small log segment size so that we roll frequently, // configure a small cache size so that we evict data from the cache, and @@ -777,6 +785,7 @@ TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) { int64_t orig_term; string follower_uuid; NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid)); + SCOPED_TRACE(Substitute("leader: $0 follower: $1", leader_uuid, follower_uuid)); // Wait for remaining majority to agree. TabletServerMap active_tablet_servers = tablet_servers_; @@ -787,12 +796,21 @@ TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) { if (AllowSlowTests()) { // Sleep long enough that the "abandoned" server's leader election interval - // will trigger several times. Then, verify that the term has not increased. - // This ensures that the other servers properly ignore the election requests - // from the abandoned node. - // TODO: would be nicer to use an RPC to check the current term of the - // abandoned replica, and wait until it has incremented a couple of times. + // will trigger several times. Then, verify that the term has not increased + // on any of the servers. + // This ensures that the other servers properly reject the pre-election requests + // from the abandoned node, and that the abandoned node doesn't bump its term + // either, since that would cause spurious leader elections upon the node coming back + // to life. 
SleepFor(MonoDelta::FromSeconds(5)); + + for (int i = 0; i < cluster_->num_tablet_servers(); i++) { + ExternalTabletServer* ts = cluster_->tablet_server(i); + SCOPED_TRACE(ts->uuid()); + int64_t term_from_metric = -1; + ASSERT_OK(GetTermMetricValue(ts, &term_from_metric)); + ASSERT_EQ(term_from_metric, orig_term); + } OpId op_id; TServerDetails* leader = tablet_servers_[leader_uuid]; ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID, @@ -1193,12 +1211,8 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) { // Check that the 'term' metric is correctly exposed. { int64_t term_from_metric = -1; - ASSERT_OK(cluster_->tablet_server_by_uuid(replica_ts->uuid())->GetInt64Metric( - &METRIC_ENTITY_tablet, - nullptr, - &METRIC_raft_term, - "value", - &term_from_metric)); + ASSERT_OK(GetTermMetricValue(cluster_->tablet_server_by_uuid(replica_ts->uuid()), + &term_from_metric)); ASSERT_EQ(term_from_metric, 1); } http://git-wip-us.apache.org/repos/asf/kudu/blob/45c1512f/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 2a5c7cd..740908b 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -887,7 +887,8 @@ void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* r scoped_refptr<Consensus> consensus; if (!GetConsensusOrRespond(tablet_peer, resp, context, &consensus)) return; Status s = consensus->StartElection( - consensus::Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE); + consensus::Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, + consensus::Consensus::EXTERNAL_REQUEST); if (PREDICT_FALSE(!s.ok())) { SetupErrorAndRespond(resp->mutable_error(), s, TabletServerErrorPB::UNKNOWN_ERROR,
