consensus: Get rid of ReplicaState class Merges the logic in ReplicaState into the RaftConsensus class. ReplicaState adds complexity but doesn't really serve a purpose anymore.
There are no functional changes in this patch. Change-Id: Ie1e62eff37d3f8655100b364939375608063aa80 Reviewed-on: http://gerrit.cloudera.org:8080/7007 Reviewed-by: Todd Lipcon <[email protected]> Tested-by: Mike Percy <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9e40867c Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9e40867c Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9e40867c Branch: refs/heads/master Commit: 9e40867cc9a0d9a7cdc912140a6554eaec5a8d1e Parents: ca2704c Author: Mike Percy <[email protected]> Authored: Fri May 26 19:09:33 2017 -0700 Committer: Mike Percy <[email protected]> Committed: Wed May 31 21:27:42 2017 +0000 ---------------------------------------------------------------------- src/kudu/consensus/CMakeLists.txt | 1 - src/kudu/consensus/consensus.h | 4 +- src/kudu/consensus/raft_consensus.cc | 733 +++++++++++++------ src/kudu/consensus/raft_consensus.h | 221 +++++- .../consensus/raft_consensus_quorum-test.cc | 25 +- src/kudu/consensus/raft_consensus_state.cc | 360 --------- src/kudu/consensus/raft_consensus_state.h | 232 ------ 7 files changed, 700 insertions(+), 876 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt index 929fb16..e93ea94 100644 --- a/src/kudu/consensus/CMakeLists.txt +++ b/src/kudu/consensus/CMakeLists.txt @@ -106,7 +106,6 @@ set(CONSENSUS_SRCS pending_rounds.cc quorum_util.cc raft_consensus.cc - raft_consensus_state.cc time_manager.cc ) http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus.h 
b/src/kudu/consensus/consensus.h index 7084aff..59910be 100644 --- a/src/kudu/consensus/consensus.h +++ b/src/kudu/consensus/consensus.h @@ -258,10 +258,10 @@ class Consensus : public RefCountedThreadSafe<Consensus> { virtual RaftPeerPB::Role role() const = 0; // Returns the uuid of this peer. - virtual std::string peer_uuid() const = 0; + virtual const std::string& peer_uuid() const = 0; // Returns the id of the tablet whose updates this consensus instance helps coordinate. - virtual std::string tablet_id() const = 0; + virtual const std::string& tablet_id() const = 0; virtual scoped_refptr<TimeManager> time_manager() const = 0; http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/raft_consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 363478b..c5370c7 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -32,7 +32,6 @@ #include "kudu/consensus/metadata.pb.h" #include "kudu/consensus/peer_manager.h" #include "kudu/consensus/quorum_util.h" -#include "kudu/consensus/raft_consensus_state.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/stl_util.h" #include "kudu/gutil/stringprintf.h" @@ -161,7 +160,7 @@ using tserver::TabletServerErrorPB; static const char* const kTimerId = "election-timer"; scoped_refptr<RaftConsensus> RaftConsensus::Create( - const ConsensusOptions& options, + ConsensusOptions options, unique_ptr<ConsensusMetadata> cmeta, const RaftPeerPB& local_peer_pb, const scoped_refptr<MetricEntity>& metric_entity, @@ -201,7 +200,7 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create( log)); return make_scoped_refptr(new RaftConsensus( - options, + std::move(options), std::move(cmeta), std::move(rpc_factory), std::move(queue), @@ -217,27 +216,31 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create( } RaftConsensus::RaftConsensus( - const ConsensusOptions& 
options, + ConsensusOptions options, unique_ptr<ConsensusMetadata> cmeta, gscoped_ptr<PeerProxyFactory> peer_proxy_factory, gscoped_ptr<PeerMessageQueue> queue, gscoped_ptr<PeerManager> peer_manager, gscoped_ptr<ThreadPool> thread_pool, const scoped_refptr<MetricEntity>& metric_entity, - const std::string& peer_uuid, + std::string peer_uuid, scoped_refptr<TimeManager> time_manager, ReplicaTransactionFactory* txn_factory, const scoped_refptr<log::Log>& log, shared_ptr<MemTracker> parent_mem_tracker, Callback<void(const std::string& reason)> mark_dirty_clbk) - : thread_pool_(std::move(thread_pool)), - log_(log), - time_manager_(std::move(time_manager)), - peer_proxy_factory_(std::move(peer_proxy_factory)), - txn_factory_(txn_factory), - peer_manager_(std::move(peer_manager)), - queue_(std::move(queue)), - pending_(Substitute("T $0 P $1: ", options.tablet_id, peer_uuid), time_manager_), + : options_(std::move(options)), + peer_uuid_(std::move(peer_uuid)), + state_(kInitialized), + cmeta_(DCHECK_NOTNULL(std::move(cmeta))), + thread_pool_(DCHECK_NOTNULL(std::move(thread_pool))), + log_(DCHECK_NOTNULL(log)), + time_manager_(DCHECK_NOTNULL(std::move(time_manager))), + peer_proxy_factory_(DCHECK_NOTNULL(std::move(peer_proxy_factory))), + txn_factory_(DCHECK_NOTNULL(txn_factory)), + peer_manager_(DCHECK_NOTNULL(std::move(peer_manager))), + queue_(DCHECK_NOTNULL(std::move(queue))), + pending_(Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid_), time_manager_), rng_(GetRandomSeed32()), failure_monitor_(GetRandomSeed32(), GetFailureMonitorCheckMeanMs(), GetFailureMonitorCheckStddevMs()), @@ -253,12 +256,8 @@ RaftConsensus::RaftConsensus( follower_memory_pressure_rejections_(metric_entity->FindOrCreateCounter( &METRIC_follower_memory_pressure_rejections)), term_metric_(metric_entity->FindOrCreateGauge(&METRIC_raft_term, - cmeta->current_term())), + cmeta_->current_term())), parent_mem_tracker_(std::move(parent_mem_tracker)) { - DCHECK(log_); - state_.reset(new 
ReplicaState(options, - peer_uuid, - std::move(cmeta))); } RaftConsensus::~RaftConsensus() { @@ -275,21 +274,31 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) { // We still have not enabled failure detection for the leader election timer. // That happens separately via the helper functions // EnsureFailureDetector(Enabled/Disabled)Unlocked(); - RETURN_NOT_OK(failure_monitor_.MonitorFailureDetector(state_->GetOptions().tablet_id, + RETURN_NOT_OK(failure_monitor_.MonitorFailureDetector(options_.tablet_id, failure_detector_)); { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForStart(&lock)); - state_->ClearLeaderUnlocked(); + UniqueLock lock; + RETURN_NOT_OK(LockForStart(&lock)); + ClearLeaderUnlocked(); + + // Our last persisted term can be higher than the last persisted operation + // (i.e. if we called an election) but reverse should never happen. + if (info.last_id.term() > GetCurrentTermUnlocked()) { + return Status::Corruption(Substitute("Unable to start RaftConsensus: " + "The last op in the WAL with id $0 has a term ($1) that is greater " + "than the latest recorded term, which is $2", + OpIdToString(info.last_id), + info.last_id.term(), + GetCurrentTermUnlocked())); + } - RETURN_NOT_OK_PREPEND(state_->StartUnlocked(info.last_id), - "Unable to start Raft ReplicaState"); + state_ = kRunning; LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering " << info.orphaned_replicates.size() << " pending transactions. 
Active config: " - << SecureShortDebugString(state_->GetActiveConfigUnlocked()); + << SecureShortDebugString(GetActiveConfigUnlocked()); for (ReplicateMsg* replicate : info.orphaned_replicates) { ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate)); RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr)); @@ -301,14 +310,14 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) { } { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForConfigChange(&lock)); + UniqueLock lock; + RETURN_NOT_OK(LockForConfigChange(&lock)); RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked()); // If this is the first term expire the FD immediately so that we have a fast first // election, otherwise we just let the timer expire normally. - if (state_->GetCurrentTermUnlocked() == 0) { + if (GetCurrentTermUnlocked() == 0) { // Initialize the failure detector timeout to some time in the past so that // the next time the failure detector monitor runs it triggers an election // (unless someone else requested a vote from us first, which resets the @@ -341,21 +350,21 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) { } bool RaftConsensus::IsRunning() const { - ReplicaState::UniqueLock lock; - Status s = state_->LockForRead(&lock); + UniqueLock lock; + Status s = LockForRead(&lock); if (PREDICT_FALSE(!s.ok())) return false; - return state_->state() == ReplicaState::kRunning; + return state_ == kRunning; } Status RaftConsensus::EmulateElection() { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForConfigChange(&lock)); + UniqueLock lock; + RETURN_NOT_OK(LockForConfigChange(&lock)); LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election..."; // Assume leadership of new term. 
- RETURN_NOT_OK(HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1)); - SetLeaderUuidUnlocked(state_->GetPeerUuid()); + RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1)); + SetLeaderUuidUnlocked(peer_uuid_); return BecomeLeaderUnlocked(); } @@ -391,14 +400,14 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) { const char* mode_str = ModeString(mode); TRACE_EVENT2("consensus", "RaftConsensus::StartElection", - "peer", state_->LogPrefixThreadSafe(), + "peer", LogPrefixThreadSafe(), "mode", mode_str); scoped_refptr<LeaderElection> election; { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForConfigChange(&lock)); + UniqueLock lock; + RETURN_NOT_OK(LockForConfigChange(&lock)); - RaftPeerPB::Role active_role = state_->GetActiveRoleUnlocked(); + RaftPeerPB::Role active_role = GetActiveRoleUnlocked(); if (active_role == RaftPeerPB::LEADER) { LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not starting " << mode << " -- already leader"; return Status::OK(); @@ -407,11 +416,11 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) { SnoozeFailureDetectorUnlocked(); // Avoid excessive election noise while in this state. return Status::IllegalState("Not starting election: Node is currently " "a non-participant in the raft config", - SecureShortDebugString(state_->GetActiveConfigUnlocked())); + SecureShortDebugString(GetActiveConfigUnlocked())); } LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting " << mode_str - << " (" << ReasonString(reason, state_->GetLeaderUuidUnlocked()) << ")"; + << " (" << ReasonString(reason, GetLeaderUuidUnlocked()) << ")"; // Snooze to avoid the election timer firing again as much as possible. 
// We do not disable the election timer while running an election, so that @@ -427,12 +436,12 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) { // We skip flushing the term to disk because setting the vote just below also // flushes to disk, and the double fsync doesn't buy us anything. - RETURN_NOT_OK(HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1, - ReplicaState::SKIP_FLUSH_TO_DISK)); - RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(state_->GetPeerUuid())); + RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1, + SKIP_FLUSH_TO_DISK)); + RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid_)); } - const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); + const RaftConfigPB& active_config = GetActiveConfigUnlocked(); LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting " << mode_str << " with config: " << SecureShortDebugString(active_config); @@ -443,23 +452,23 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) { // Vote for ourselves. bool duplicate; - RETURN_NOT_OK(counter->RegisterVote(state_->GetPeerUuid(), VOTE_GRANTED, &duplicate)); - CHECK(!duplicate) << state_->LogPrefixUnlocked() + RETURN_NOT_OK(counter->RegisterVote(peer_uuid_, VOTE_GRANTED, &duplicate)); + CHECK(!duplicate) << LogPrefixUnlocked() << "Inexplicable duplicate self-vote for term " - << state_->GetCurrentTermUnlocked(); + << GetCurrentTermUnlocked(); VoteRequestPB request; request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE); - request.set_candidate_uuid(state_->GetPeerUuid()); + request.set_candidate_uuid(peer_uuid_); if (mode == PRE_ELECTION) { // In a pre-election, we haven't bumped our own term yet, so we need to be // asking for votes for the next term. 
request.set_is_pre_election(true); - request.set_candidate_term(state_->GetCurrentTermUnlocked() + 1); + request.set_candidate_term(GetCurrentTermUnlocked() + 1); } else { - request.set_candidate_term(state_->GetCurrentTermUnlocked()); + request.set_candidate_term(GetCurrentTermUnlocked()); } - request.set_tablet_id(state_->GetOptions().tablet_id); + request.set_tablet_id(options_.tablet_id); *request.mutable_candidate_status()->mutable_last_received() = queue_->GetLastOpIdInLog(); @@ -480,7 +489,8 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) { while (role() != consensus::RaftPeerPB::LEADER) { if (MonoTime::Now() >= deadline) { return Status::TimedOut(Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3", - peer_uuid(), tablet_id(), timeout.ToString(), role())); + peer_uuid_, options_.tablet_id, timeout.ToString(), + role())); } SleepFor(MonoDelta::FromMilliseconds(10)); } @@ -489,9 +499,9 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) { Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) { TRACE_EVENT0("consensus", "RaftConsensus::StepDown"); - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForConfigChange(&lock)); - if (state_->GetActiveRoleUnlocked() != RaftPeerPB::LEADER) { + UniqueLock lock; + RETURN_NOT_OK(LockForConfigChange(&lock)); + if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) { resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER); StatusToPB(Status::IllegalState("Not currently leader"), resp->mutable_error()->mutable_status()); @@ -514,9 +524,9 @@ void RaftConsensus::ReportFailureDetected(const std::string& name, const Status& Status RaftConsensus::BecomeLeaderUnlocked() { TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked", - "peer", peer_uuid(), - "tablet", tablet_id()); - LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. 
State: " << state_->ToStringUnlocked(); + "peer", peer_uuid_, + "tablet", options_.tablet_id); + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. State: " << ToStringUnlocked(); // Disable FD while we are leader. RETURN_NOT_OK(EnsureFailureDetectorDisabledUnlocked()); @@ -547,9 +557,9 @@ Status RaftConsensus::BecomeLeaderUnlocked() { Status RaftConsensus::BecomeReplicaUnlocked() { LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Follower/Learner. State: " - << state_->ToStringUnlocked(); + << ToStringUnlocked(); - state_->ClearLeaderUnlocked(); + ClearLeaderUnlocked(); // FD should be running while we are a follower. RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked()); @@ -570,9 +580,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) { std::lock_guard<simple_spinlock> lock(update_lock_); { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForReplicate(&lock, *round->replicate_msg())); - RETURN_NOT_OK(round->CheckBoundTerm(state_->GetCurrentTermUnlocked())); + UniqueLock lock; + RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg())); + RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked())); RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round)); } @@ -581,9 +591,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) { } Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForReplicate(&lock, *round->replicate_msg())); - round->BindToTerm(state_->GetCurrentTermUnlocked()); + UniqueLock lock; + RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg())); + round->BindToTerm(GetCurrentTermUnlocked()); return Status::OK(); } @@ -615,7 +625,7 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR const RaftConfigPB& new_config = change_record->new_config(); if (!new_config.unsafe_config_change()) { - Status s = state_->CheckNoConfigChangePendingUnlocked(); + 
Status s = CheckNoConfigChangePendingUnlocked(); if (PREDICT_FALSE(!s.ok())) { s = s.CloneAndAppend(Substitute("\n New config: $0", SecureShortDebugString(new_config))); LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString(); @@ -625,10 +635,10 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR // Check if the pending Raft config has an OpId less than the committed // config. If so, this is a replay at startup in which the COMMIT // messages were delayed. - const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked(); + const RaftConfigPB& committed_config = GetCommittedConfigUnlocked(); if (round->replicate_msg()->id().index() > committed_config.opid_index()) { - RETURN_NOT_OK(state_->SetPendingConfigUnlocked(new_config)); - if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) { + RETURN_NOT_OK(SetPendingConfigUnlocked(new_config)); + if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) { RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked()); } } else { @@ -645,10 +655,10 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR } void RaftConsensus::NotifyCommitIndex(int64_t commit_index) { - ReplicaState::UniqueLock lock; - Status s = state_->LockForCommit(&lock); + UniqueLock lock; + Status s = LockForCommit(&lock); if (PREDICT_FALSE(!s.ok())) { - LOG(WARNING) << state_->LogPrefixThreadSafe() + LOG(WARNING) << LogPrefixThreadSafe() << "Unable to take state lock to update committed index: " << s.ToString(); return; @@ -656,16 +666,16 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) { pending_.AdvanceCommittedIndex(commit_index); - if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) { + if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) { peer_manager_->SignalRequest(false); } } void RaftConsensus::NotifyTermChange(int64_t term) { - ReplicaState::UniqueLock lock; - Status s = state_->LockForConfigChange(&lock); + UniqueLock lock; + Status s = LockForConfigChange(&lock); if 
(PREDICT_FALSE(!s.ok())) { - LOG(WARNING) << state_->LogPrefixThreadSafe() << "Unable to lock ReplicaState for term change" + LOG(WARNING) << LogPrefixThreadSafe() << "Unable to lock consensus for term change" << " when notified of new term " << term << ": " << s.ToString(); return; } @@ -680,22 +690,22 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid, uuid, term, reason); if (!FLAGS_evict_failed_followers) { - LOG(INFO) << state_->LogPrefixThreadSafe() << fail_msg + LOG(INFO) << LogPrefixThreadSafe() << fail_msg << "Eviction of failed followers is disabled. Doing nothing."; return; } RaftConfigPB committed_config; { - ReplicaState::UniqueLock lock; - Status s = state_->LockForRead(&lock); + UniqueLock lock; + Status s = LockForRead(&lock); if (PREDICT_FALSE(!s.ok())) { - LOG(WARNING) << state_->LogPrefixThreadSafe() << fail_msg - << "Unable to lock ReplicaState for read: " << s.ToString(); + LOG(WARNING) << LogPrefixThreadSafe() << fail_msg + << "Unable to lock consensus for read: " << s.ToString(); return; } - int64_t current_term = state_->GetCurrentTermUnlocked(); + int64_t current_term = GetCurrentTermUnlocked(); if (current_term != term) { LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in " << "previous term " << term << ", but a leader election " @@ -704,34 +714,34 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid, return; } - if (state_->IsConfigChangePendingUnlocked()) { + if (IsConfigChangePendingUnlocked()) { LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "There is already a config change operation " << "in progress. Unable to evict follower until it completes. " << "Doing nothing."; return; } - committed_config = state_->GetCommittedConfigUnlocked(); + committed_config = GetCommittedConfigUnlocked(); } - // Run config change on thread pool after dropping ReplicaState lock. + // Run config change on thread pool after dropping lock. 
WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(&RaftConsensus::TryRemoveFollowerTask, this, uuid, committed_config, reason)), - state_->LogPrefixThreadSafe() + "Unable to start RemoteFollowerTask"); + LogPrefixThreadSafe() + "Unable to start RemoteFollowerTask"); } void RaftConsensus::TryRemoveFollowerTask(const string& uuid, const RaftConfigPB& committed_config, const std::string& reason) { ChangeConfigRequestPB req; - req.set_tablet_id(tablet_id()); + req.set_tablet_id(options_.tablet_id); req.mutable_server()->set_permanent_uuid(uuid); req.set_type(REMOVE_SERVER); req.set_cas_config_opid_index(committed_config.opid_index()); - LOG(INFO) << state_->LogPrefixThreadSafe() << "Attempting to remove follower " + LOG(INFO) << LogPrefixThreadSafe() << "Attempting to remove follower " << uuid << " from the Raft config. Reason: " << reason; boost::optional<TabletServerErrorPB::Code> error_code; WARN_NOT_OK(ChangeConfig(req, Bind(&DoNothingStatusCB), &error_code), - state_->LogPrefixThreadSafe() + "Unable to remove follower " + uuid); + LogPrefixThreadSafe() + "Unable to remove follower " + uuid); } Status RaftConsensus::Update(const ConsensusRequestPB* request, @@ -743,7 +753,7 @@ Status RaftConsensus::Update(const ConsensusRequestPB* request, "is set to true."); } - response->set_responder_uuid(state_->GetPeerUuid()); + response->set_responder_uuid(peer_uuid_); VLOG_WITH_PREFIX(2) << "Replica received request: " << SecureShortDebugString(*request); @@ -753,7 +763,7 @@ Status RaftConsensus::Update(const ConsensusRequestPB* request, if (PREDICT_FALSE(VLOG_IS_ON(1))) { if (request->ops_size() == 0) { VLOG_WITH_PREFIX(1) << "Replica replied to status only request. Replica: " - << state_->ToString() << ". Response: " + << ToString() << ". 
Response: " << SecureShortDebugString(*response); } } @@ -784,10 +794,10 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg } Status RaftConsensus::IsSingleVoterConfig(bool* single_voter) const { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForRead(&lock)); - const RaftConfigPB& config = state_->GetCommittedConfigUnlocked(); - const string& uuid = state_->GetPeerUuid(); + UniqueLock lock; + RETURN_NOT_OK(LockForRead(&lock)); + const RaftConfigPB& config = GetCommittedConfigUnlocked(); + const string& uuid = peer_uuid_; if (CountVoters(config) == 1 && IsRaftConfigVoter(uuid, config)) { *single_voter = true; } else { @@ -876,16 +886,16 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request, ConsensusResponsePB* response) { // Do term checks first: - if (PREDICT_FALSE(request->caller_term() != state_->GetCurrentTermUnlocked())) { + if (PREDICT_FALSE(request->caller_term() != GetCurrentTermUnlocked())) { // If less, reject. - if (request->caller_term() < state_->GetCurrentTermUnlocked()) { + if (request->caller_term() < GetCurrentTermUnlocked()) { string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. " "Current term is $2. Ops: $3", request->caller_uuid(), request->caller_term(), - state_->GetCurrentTermUnlocked(), + GetCurrentTermUnlocked(), OpsRangeString(*request)); LOG_WITH_PREFIX_UNLOCKED(INFO) << msg; FillConsensusResponseError(response, @@ -1019,13 +1029,13 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque // the effective leader of the configuration. If they are not currently marked as // the leader locally, mark them as leader now. 
const string& caller_uuid = request->caller_uuid(); - if (PREDICT_FALSE(state_->HasLeaderUnlocked() && - state_->GetLeaderUuidUnlocked() != caller_uuid)) { + if (PREDICT_FALSE(HasLeaderUnlocked() && + GetLeaderUuidUnlocked() != caller_uuid)) { LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected new leader in same term! " - << "Existing leader UUID: " << state_->GetLeaderUuidUnlocked() << ", " + << "Existing leader UUID: " << GetLeaderUuidUnlocked() << ", " << "new leader UUID: " << caller_uuid; } - if (PREDICT_FALSE(!state_->HasLeaderUnlocked())) { + if (PREDICT_FALSE(!HasLeaderUnlocked())) { SetLeaderUuidUnlocked(request->caller_uuid()); } @@ -1035,8 +1045,8 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, ConsensusResponsePB* response) { TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica", - "peer", peer_uuid(), - "tablet", tablet_id()); + "peer", peer_uuid_, + "tablet", options_.tablet_id); Synchronizer log_synchronizer; StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback(); @@ -1132,8 +1142,8 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, LeaderRequest deduped_req; { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForUpdate(&lock)); + UniqueLock lock; + RETURN_NOT_OK(LockForUpdate(&lock)); deduped_req.leader_uuid = request->caller_uuid(); @@ -1323,8 +1333,8 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, // If just waiting for our log append to finish lets snooze the timer. // We don't want to fire leader election because we're waiting on our own log. 
if (s.IsTimedOut()) { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForRead(&lock)); + UniqueLock lock; + RETURN_NOT_OK(LockForRead(&lock)); SnoozeFailureDetectorUnlocked(); } } while (s.IsTimedOut()); @@ -1333,7 +1343,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, TRACE("finished"); } - VLOG_WITH_PREFIX(2) << "Replica updated. " << state_->ToString() + VLOG_WITH_PREFIX(2) << "Replica updated. " << ToString() << ". Request: " << SecureShortDebugString(*request); TRACE("UpdateReplicas() finished"); @@ -1342,7 +1352,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) { TRACE("Filling consensus response to leader."); - response->set_responder_term(state_->GetCurrentTermUnlocked()); + response->set_responder_term(GetCurrentTermUnlocked()); response->mutable_status()->mutable_last_received()->CopyFrom( queue_->GetLastOpIdInLog()); response->mutable_status()->mutable_last_received_current_leader()->CopyFrom( @@ -1361,13 +1371,13 @@ void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response, Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) { TRACE_EVENT2("consensus", "RaftConsensus::RequestVote", - "peer", peer_uuid(), - "tablet", tablet_id()); - response->set_responder_uuid(state_->GetPeerUuid()); + "peer", peer_uuid_, + "tablet", options_.tablet_id); + response->set_responder_uuid(peer_uuid_); // We must acquire the update lock in order to ensure that this vote action // takes place between requests. - // Lock ordering: The update lock must be acquired before the ReplicaState lock. + // Lock ordering: update_lock_ must be acquired before lock_. 
std::unique_lock<simple_spinlock> update_guard(update_lock_, std::defer_lock); if (FLAGS_enable_leader_failure_detection) { update_guard.try_lock(); @@ -1383,18 +1393,18 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* // timeouts, just vote a quick NO. // // We still need to take the state lock in order to respond with term info, etc. - ReplicaState::UniqueLock state_guard; - RETURN_NOT_OK(state_->LockForConfigChange(&state_guard)); + UniqueLock state_guard; + RETURN_NOT_OK(LockForConfigChange(&state_guard)); return RequestVoteRespondIsBusy(request, response); } // Acquire the replica state lock so we can read / modify the consensus state. - ReplicaState::UniqueLock state_guard; - RETURN_NOT_OK(state_->LockForConfigChange(&state_guard)); + UniqueLock state_guard; + RETURN_NOT_OK(LockForConfigChange(&state_guard)); // If the node is not in the configuration, allow the vote (this is required by Raft) // but log an informational message anyway. - if (!IsRaftConfigMember(request->candidate_uuid(), state_->GetActiveConfigUnlocked())) { + if (!IsRaftConfigMember(request->candidate_uuid(), GetActiveConfigUnlocked())) { LOG_WITH_PREFIX_UNLOCKED(INFO) << "Handling vote request from an unknown peer " << request->candidate_uuid(); } @@ -1421,16 +1431,16 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* } // Candidate is running behind. - if (request->candidate_term() < state_->GetCurrentTermUnlocked()) { + if (request->candidate_term() < GetCurrentTermUnlocked()) { return RequestVoteRespondInvalidTerm(request, response); } // We already voted this term. - if (request->candidate_term() == state_->GetCurrentTermUnlocked() && - state_->HasVotedCurrentTermUnlocked()) { + if (request->candidate_term() == GetCurrentTermUnlocked() && + HasVotedCurrentTermUnlocked()) { // Already voted for the same candidate in the current term. 
- if (state_->GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) { + if (GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) { return RequestVoteRespondVoteAlreadyGranted(request, response); } @@ -1449,14 +1459,14 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* // has actually now successfully become leader of the prior term, in which case // bumping our term here would disrupt it. if (!request->is_pre_election() && - request->candidate_term() > state_->GetCurrentTermUnlocked()) { + request->candidate_term() > GetCurrentTermUnlocked()) { // If we are going to vote for this peer, then we will flush the consensus metadata // to disk below when we record the vote, and we can skip flushing the term advancement // to disk here. - auto flush = vote_yes ? ReplicaState::SKIP_FLUSH_TO_DISK : ReplicaState::FLUSH_TO_DISK; + auto flush = vote_yes ? SKIP_FLUSH_TO_DISK : FLUSH_TO_DISK; RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term(), flush), Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1", - state_->GetCurrentTermUnlocked(), request->candidate_term())); + GetCurrentTermUnlocked(), request->candidate_term())); } if (!vote_yes) { @@ -1482,10 +1492,10 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req, ChangeConfigType type = req.type(); const RaftPeerPB& server = req.server(); { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForConfigChange(&lock)); - RETURN_NOT_OK(state_->CheckActiveLeaderUnlocked()); - RETURN_NOT_OK(state_->CheckNoConfigChangePendingUnlocked()); + UniqueLock lock; + RETURN_NOT_OK(LockForConfigChange(&lock)); + RETURN_NOT_OK(CheckActiveLeaderUnlocked()); + RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked()); // We are required by Raft to reject config change operations until we have // committed at least one operation in our current term as leader. 
@@ -1498,7 +1508,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req, return Status::InvalidArgument("server must have permanent_uuid specified", SecureShortDebugString(req)); } - const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked(); + const RaftConfigPB& committed_config = GetCommittedConfigUnlocked(); // Support atomic ChangeConfig requests. if (req.has_cas_config_opid_index()) { @@ -1535,13 +1545,13 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req, break; case REMOVE_SERVER: - if (server_uuid == peer_uuid()) { + if (server_uuid == peer_uuid_) { return Status::InvalidArgument( Substitute("Cannot remove peer $0 from the config because it is the leader. " "Force another leader to be elected to remove this server. " "Consensus state: $1", server_uuid, - SecureShortDebugString(state_->ConsensusStateUnlocked()))); + SecureShortDebugString(ConsensusStateUnlocked()))); } if (!RemoveFromRaftConfig(&new_config, server_uuid)) { return Status::NotFound( @@ -1586,21 +1596,19 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req, int64 last_committed_index; OpId preceding_opid; uint64 msg_timestamp; - string local_peer_uuid; { // Take the snapshot of the replica state and queue state so that // we can stick them in the consensus update request later. - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForRead(&lock)); - local_peer_uuid = state_->GetPeerUuid(); - current_term = state_->GetCurrentTermUnlocked(); - committed_config = state_->GetCommittedConfigUnlocked(); - if (state_->IsConfigChangePendingUnlocked()) { + UniqueLock lock; + RETURN_NOT_OK(LockForRead(&lock)); + current_term = GetCurrentTermUnlocked(); + committed_config = GetCommittedConfigUnlocked(); + if (IsConfigChangePendingUnlocked()) { LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Replica has a pending config, but the new config " << "will be unsafely changed anyway. 
" << "Currently pending config on the node: " - << SecureShortDebugString(state_->GetPendingConfigUnlocked()); + << SecureShortDebugString(GetPendingConfigUnlocked()); } all_replicated_index = queue_->GetAllReplicatedIndex(); last_committed_index = queue_->GetCommittedIndex(); @@ -1641,13 +1649,13 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req, // Although it is valid for a local replica to not have itself // in the committed config, it is rare and a replica without itself // in the latest config is definitely not caught up with the latest leader's log. - if (!IsRaftConfigVoter(local_peer_uuid, new_config)) { + if (!IsRaftConfigVoter(peer_uuid_, new_config)) { return Status::InvalidArgument(Substitute("Local replica uuid $0 is not " "a VOTER in the new config, " "rejecting the unsafe config " "change request for tablet $1. " "Rejected config: $2" , - local_peer_uuid, req.tablet_id(), + peer_uuid_, req.tablet_id(), SecureShortDebugString(new_config))); } new_config.set_unsafe_config_change(true); @@ -1718,9 +1726,9 @@ void RaftConsensus::Shutdown() { if (shutdown_.Load(kMemOrderAcquire)) return; { - ReplicaState::UniqueLock lock; + UniqueLock lock; // Transition to kShuttingDown state. - CHECK_OK(state_->LockForShutdown(&lock)); + CHECK_OK(LockForShutdown(&lock)); LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down."; } @@ -1730,13 +1738,12 @@ void RaftConsensus::Shutdown() { // We must close the queue after we close the peers. 
queue_->Close(); - { - ReplicaState::UniqueLock lock; - CHECK_OK(state_->LockForShutdown(&lock)); - CHECK_EQ(ReplicaState::kShuttingDown, state_->state()); + UniqueLock lock; + CHECK_OK(LockForShutdown(&lock)); + CHECK_EQ(kShuttingDown, state_); CHECK_OK(pending_.CancelPendingTransactions()); - CHECK_OK(state_->ShutdownUnlocked()); + state_ = kShutDown; LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!"; } @@ -1772,25 +1779,25 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg } Status RaftConsensus::AdvanceTermForTests(int64_t new_term) { - ReplicaState::UniqueLock lock; - CHECK_OK(state_->LockForConfigChange(&lock)); + UniqueLock lock; + CHECK_OK(LockForConfigChange(&lock)); return HandleTermAdvanceUnlocked(new_term); } std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const { return Substitute("$0Leader $1election vote request", - state_->LogPrefixUnlocked(), + LogPrefixUnlocked(), request.is_pre_election() ? 
"pre-" : ""); } void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) { - response->set_responder_term(state_->GetCurrentTermUnlocked()); + response->set_responder_term(GetCurrentTermUnlocked()); response->set_vote_granted(true); } void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code, VoteResponsePB* response) { - response->set_responder_term(state_->GetCurrentTermUnlocked()); + response->set_responder_term(GetCurrentTermUnlocked()); response->set_vote_granted(false); response->mutable_consensus_error()->set_code(error_code); } @@ -1803,7 +1810,7 @@ Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request GetRequestVoteLogPrefixUnlocked(*request), request->candidate_uuid(), request->candidate_term(), - state_->GetCurrentTermUnlocked()); + GetCurrentTermUnlocked()); LOG(INFO) << msg; StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status()); return Status::OK(); @@ -1827,8 +1834,8 @@ Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB "Already voted for candidate $3 in this term.", GetRequestVoteLogPrefixUnlocked(*request), request->candidate_uuid(), - state_->GetCurrentTermUnlocked(), - state_->GetVotedForCurrentTermUnlocked()); + GetCurrentTermUnlocked(), + GetVotedForCurrentTermUnlocked()); LOG(INFO) << msg; StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status()); return Status::OK(); @@ -1890,7 +1897,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request if (!request->is_pre_election()) { // Persist our vote to disk. 
- RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(request->candidate_uuid())); + RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(request->candidate_uuid())); } FillVoteResponseVoteGranted(response); @@ -1902,38 +1909,30 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.", GetRequestVoteLogPrefixUnlocked(*request), request->candidate_uuid(), - state_->GetCurrentTermUnlocked()); + GetCurrentTermUnlocked()); return Status::OK(); } RaftPeerPB::Role RaftConsensus::role() const { - ReplicaState::UniqueLock lock; - CHECK_OK(state_->LockForRead(&lock)); - return state_->GetActiveRoleUnlocked(); -} - -std::string RaftConsensus::LogPrefixUnlocked() { - return state_->LogPrefixUnlocked(); -} - -std::string RaftConsensus::LogPrefix() { - return state_->LogPrefix(); + UniqueLock lock; + CHECK_OK(LockForRead(&lock)); + return GetActiveRoleUnlocked(); } void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) { + DCHECK(lock_.is_locked()); failed_elections_since_stable_leader_ = 0; - state_->SetLeaderUuidUnlocked(uuid); - MarkDirty("New leader " + uuid); + cmeta_->set_leader_uuid(uuid); + MarkDirty(Substitute("New leader $0", uuid)); } - Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_config, const RaftConfigPB& new_config, const StatusCallback& client_cb) { auto cc_replicate = new ReplicateMsg(); cc_replicate->set_op_type(CHANGE_CONFIG_OP); ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record(); - cc_req->set_tablet_id(tablet_id()); + cc_req->set_tablet_id(options_.tablet_id); *cc_req->mutable_old_config() = old_config; *cc_req->mutable_new_config() = new_config; CHECK_OK(time_manager_->AssignTimestamp(cc_replicate)); @@ -1950,8 +1949,8 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf } Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() { - DCHECK_EQ(RaftPeerPB::LEADER, 
state_->GetActiveRoleUnlocked()); - const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); + DCHECK_EQ(RaftPeerPB::LEADER, GetActiveRoleUnlocked()); + const RaftConfigPB& active_config = GetActiveConfigUnlocked(); // Change the peers so that we're able to replicate messages remotely and // locally. The peer manager must be closed before updating the active config @@ -1961,46 +1960,46 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() { // TODO(todd): should use queue committed index here? in that case do // we need to pass it in at all? queue_->SetLeaderMode(pending_.GetCommittedIndex(), - state_->GetCurrentTermUnlocked(), + GetCurrentTermUnlocked(), active_config); RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config)); return Status::OK(); } -string RaftConsensus::peer_uuid() const { - return state_->GetPeerUuid(); +const string& RaftConsensus::peer_uuid() const { + return peer_uuid_; } -string RaftConsensus::tablet_id() const { - return state_->GetOptions().tablet_id; +const string& RaftConsensus::tablet_id() const { + return options_.tablet_id; } ConsensusStatePB RaftConsensus::ConsensusState() const { - ReplicaState::UniqueLock lock; - CHECK_OK(state_->LockForRead(&lock)); - return state_->ConsensusStateUnlocked(); + UniqueLock lock; + CHECK_OK(LockForRead(&lock)); + return ConsensusStateUnlocked(); } RaftConfigPB RaftConsensus::CommittedConfig() const { - ReplicaState::UniqueLock lock; - CHECK_OK(state_->LockForRead(&lock)); - return state_->GetCommittedConfigUnlocked(); + UniqueLock lock; + CHECK_OK(LockForRead(&lock)); + return GetCommittedConfigUnlocked(); } void RaftConsensus::DumpStatusHtml(std::ostream& out) const { out << "<h1>Raft Consensus State</h1>" << std::endl; out << "<h2>State</h2>" << std::endl; - out << "<pre>" << EscapeForHtmlToString(state_->ToString()) << "</pre>" << std::endl; + out << "<pre>" << EscapeForHtmlToString(ToString()) << "</pre>" << std::endl; out << "<h2>Queue</h2>" << std::endl; out << 
"<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl; // Dump the queues on a leader. RaftPeerPB::Role role; { - ReplicaState::UniqueLock lock; - CHECK_OK(state_->LockForRead(&lock)); - role = state_->GetActiveRoleUnlocked(); + UniqueLock lock; + CHECK_OK(LockForRead(&lock)); + role = GetActiveRoleUnlocked(); } if (role == RaftPeerPB::LEADER) { out << "<h2>Queue overview</h2>" << std::endl; @@ -2011,17 +2010,13 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const { } } -ReplicaState* RaftConsensus::GetReplicaStateForTests() { - return state_.get(); -} - void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult& result) { // The election callback runs on a reactor thread, so we need to defer to our // threadpool. If the threadpool is already shut down for some reason, it's OK -- // we're OK with the callback never running. WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(&RaftConsensus::DoElectionCallback, this, reason, result)), - state_->LogPrefixThreadSafe() + "Unable to run election callback"); + LogPrefixThreadSafe() + "Unable to run election callback"); } void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResult& result) { @@ -2031,8 +2026,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu // Snooze to avoid the election timer firing again as much as possible. { - ReplicaState::UniqueLock lock; - CHECK_OK(state_->LockForRead(&lock)); + UniqueLock lock; + CHECK_OK(LockForRead(&lock)); // We need to snooze when we win and when we lose: // - When we win because we're about to disable the timer and become leader. // - When we lose or otherwise we can fall into a cycle, where everyone keeps @@ -2056,7 +2051,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu // because it already voted in term 2. 
The check below ensures that peer B // will bump to term 2 when it gets the vote rejection, such that its // next pre-election (for term 3) would succeed. - if (result.highest_voter_term > state_->GetCurrentTermUnlocked()) { + if (result.highest_voter_term > GetCurrentTermUnlocked()) { HandleTermAdvanceUnlocked(result.highest_voter_term); } @@ -2069,8 +2064,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu } // The vote was granted, become leader. - ReplicaState::UniqueLock lock; - Status s = state_->LockForConfigChange(&lock); + UniqueLock lock; + Status s = LockForConfigChange(&lock); if (PREDICT_FALSE(!s.ok())) { LOG_WITH_PREFIX(INFO) << "Received " << election_type << " callback for term " << election_term << " while not running: " @@ -2085,7 +2080,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu election_started_in_term--; } - if (election_started_in_term != state_->GetCurrentTermUnlocked()) { + if (election_started_in_term != GetCurrentTermUnlocked()) { LOG_WITH_PREFIX_UNLOCKED(INFO) << "Leader " << election_type << " decision vote started in " << "defunct term " << election_started_in_term << ": " @@ -2093,8 +2088,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu return; } - const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked(); - if (!IsRaftConfigVoter(state_->GetPeerUuid(), active_config)) { + const RaftConfigPB& active_config = GetActiveConfigUnlocked(); + if (!IsRaftConfigVoter(peer_uuid_, active_config)) { LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Leader " << election_type << " decision while not in active config. 
" << "Result: Term " << election_term << ": " @@ -2103,7 +2098,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu return; } - if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) { + if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) { // If this was a pre-election, it's possible to see the following interleaving: // // 1. Term N (follower): send a real election for term N @@ -2132,7 +2127,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu "Couldn't start leader election after successful pre-election"); } else { // We won a real election. Convert role to LEADER. - SetLeaderUuidUnlocked(state_->GetPeerUuid()); + SetLeaderUuidUnlocked(peer_uuid_); // TODO(todd): BecomeLeaderUnlocked() can fail due to state checks during shutdown. // It races with the above state check. @@ -2142,8 +2137,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu } Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) { - ReplicaState::UniqueLock lock; - RETURN_NOT_OK(state_->LockForRead(&lock)); + UniqueLock lock; + RETURN_NOT_OK(LockForRead(&lock)); if (type == RECEIVED_OPID) { *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog(); } else if (type == COMMITTED_OPID) { @@ -2166,7 +2161,7 @@ log::RetentionIndexes RaftConsensus::GetRetentionIndexes() { void RaftConsensus::MarkDirty(const std::string& reason) { WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(mark_dirty_clbk_, reason)), - state_->LogPrefixThreadSafe() + "Unable to run MarkDirty callback"); + LogPrefixThreadSafe() + "Unable to run MarkDirty callback"); } void RaftConsensus::MarkDirtyOnSuccess(const string& reason, @@ -2181,8 +2176,9 @@ void RaftConsensus::MarkDirtyOnSuccess(const string& reason, void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round, const StatusCallback& client_cb, const Status& status) { - // NOTE: the ReplicaState lock is held here because this is triggered by - // ReplicaState's abort or 
commit paths. + // NOTE: lock_ is held here because this is triggered by + // PendingRounds::AbortOpsAfter() and AdvanceCommittedIndex(). + DCHECK(lock_.is_locked()); OperationType op_type = round->replicate_msg()->op_type(); const string& op_type_str = OperationType_Name(op_type); CHECK(IsConsensusOnlyOperation(op_type)) << "Unexpected op type: " << op_type_str; @@ -2192,13 +2188,16 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round, // Fall through to the generic handling. } + // TODO(mpercy): May need some refactoring to unlock 'lock_' before invoking + // the client callback. + if (!status.ok()) { - LOG(INFO) << state_->LogPrefixThreadSafe() << op_type_str << " replication failed: " + LOG(INFO) << LogPrefixThreadSafe() << op_type_str << " replication failed: " << status.ToString(); client_cb.Run(status); return; } - VLOG(1) << state_->LogPrefixThreadSafe() << "Committing " << op_type_str << " with op id " + VLOG(1) << LogPrefixThreadSafe() << "Committing " << op_type_str << " with op id " << round->id(); gscoped_ptr<CommitMsg> commit_msg(new CommitMsg); commit_msg->set_op_type(round->replicate_msg()->op_type()); @@ -2216,11 +2215,11 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con if (!status.ok()) { // If the config change being aborted is the current pending one, abort it. 
- if (state_->IsConfigChangePendingUnlocked() && - state_->GetPendingConfigUnlocked().opid_index() == op_id.index()) { + if (IsConfigChangePendingUnlocked() && + GetPendingConfigUnlocked().opid_index() == op_id.index()) { LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting config change with OpId " << op_id << ": " << status.ToString(); - state_->ClearPendingConfigUnlocked(); + ClearPendingConfigUnlocked(); } else { LOG_WITH_PREFIX_UNLOCKED(INFO) << "Skipping abort of non-pending config change with OpId " @@ -2250,14 +2249,14 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con // Check if the pending Raft config has an OpId less than the committed // config. If so, this is a replay at startup in which the COMMIT // messages were delayed. - const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked(); + const RaftConfigPB& committed_config = GetCommittedConfigUnlocked(); if (new_config.opid_index() > committed_config.opid_index()) { LOG_WITH_PREFIX_UNLOCKED(INFO) << "Committing config change with OpId " << op_id << ": " << DiffRaftConfigs(old_config, new_config) << ". 
New config: { " << SecureShortDebugString(new_config) << " }"; - CHECK_OK(state_->SetCommittedConfigUnlocked(new_config)); + CHECK_OK(SetCommittedConfigUnlocked(new_config)); } else { LOG_WITH_PREFIX_UNLOCKED(INFO) << "Ignoring commit of config change with OpId " @@ -2347,23 +2346,303 @@ MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() { } Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term, - ReplicaState::FlushToDisk flush) { - if (new_term <= state_->GetCurrentTermUnlocked()) { + FlushToDisk flush) { + if (new_term <= GetCurrentTermUnlocked()) { return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.", - new_term, state_->GetCurrentTermUnlocked())); + new_term, GetCurrentTermUnlocked())); } - if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) { + if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) { LOG_WITH_PREFIX_UNLOCKED(INFO) << "Stepping down as leader of term " - << state_->GetCurrentTermUnlocked(); + << GetCurrentTermUnlocked(); RETURN_NOT_OK(BecomeReplicaUnlocked()); } LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term; - RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term, flush)); + RETURN_NOT_OK(SetCurrentTermUnlocked(new_term, flush)); term_metric_->set_value(new_term); last_received_cur_leader_ = MinimumOpId(); return Status::OK(); } +Status RaftConsensus::LockForStart(UniqueLock* lock) const { + ThreadRestrictions::AssertWaitAllowed(); + UniqueLock l(lock_); + CHECK_EQ(state_, kInitialized) << "Illegal state for Start()." 
+ << " Replica is not in kInitialized state"; + lock->swap(l); + return Status::OK(); +} + +Status RaftConsensus::LockForRead(UniqueLock* lock) const { + ThreadRestrictions::AssertWaitAllowed(); + UniqueLock l(lock_); + lock->swap(l); + return Status::OK(); +} + +Status RaftConsensus::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const { + ThreadRestrictions::AssertWaitAllowed(); + DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg); + UniqueLock l(lock_); + if (PREDICT_FALSE(state_ != kRunning)) { + return Status::IllegalState("Replica not in running state"); + } + + RETURN_NOT_OK(CheckActiveLeaderUnlocked()); + lock->swap(l); + return Status::OK(); +} + +Status RaftConsensus::LockForCommit(UniqueLock* lock) const { + TRACE_EVENT0("consensus", "RaftConsensus::LockForCommit"); + ThreadRestrictions::AssertWaitAllowed(); + UniqueLock l(lock_); + if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) { + return Status::IllegalState("Replica not in running state"); + } + lock->swap(l); + return Status::OK(); +} + +Status RaftConsensus::LockForUpdate(UniqueLock* lock) const { + TRACE_EVENT0("consensus", "RaftConsensus::LockForUpdate"); + ThreadRestrictions::AssertWaitAllowed(); + UniqueLock l(lock_); + if (PREDICT_FALSE(state_ != kRunning)) { + return Status::IllegalState("Replica not in running state"); + } + if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) { + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config"; + } + lock->swap(l); + return Status::OK(); +} + +Status RaftConsensus::LockForConfigChange(UniqueLock* lock) const { + TRACE_EVENT0("consensus", "RaftConsensus::LockForConfigChange"); + + ThreadRestrictions::AssertWaitAllowed(); + UniqueLock l(lock_); + // Can only change the config on running replicas. 
+ if (PREDICT_FALSE(state_ != kRunning)) { + return Status::IllegalState("Unable to lock ReplicaState for config change", + Substitute("State = $0", state_)); + } + lock->swap(l); + return Status::OK(); +} + +Status RaftConsensus::LockForShutdown(UniqueLock* lock) { + TRACE_EVENT0("consensus", "RaftConsensus::LockForShutdown"); + ThreadRestrictions::AssertWaitAllowed(); + UniqueLock l(lock_); + if (state_ != kShuttingDown && state_ != kShutDown) { + state_ = kShuttingDown; + } + lock->swap(l); + return Status::OK(); +} + +Status RaftConsensus::CheckActiveLeaderUnlocked() const { + RaftPeerPB::Role role = GetActiveRoleUnlocked(); + switch (role) { + case RaftPeerPB::LEADER: + return Status::OK(); + default: + ConsensusStatePB cstate = ConsensusStateUnlocked(); + return Status::IllegalState(Substitute("Replica $0 is not leader of this config. Role: $1. " + "Consensus state: $2", + peer_uuid_, + RaftPeerPB::Role_Name(role), + SecureShortDebugString(cstate))); + } +} + +ConsensusStatePB RaftConsensus::ConsensusStateUnlocked() const { + return cmeta_->ToConsensusStatePB(); +} + +RaftPeerPB::Role RaftConsensus::GetActiveRoleUnlocked() const { + DCHECK(lock_.is_locked()); + return cmeta_->active_role(); +} + +bool RaftConsensus::IsConfigChangePendingUnlocked() const { + DCHECK(lock_.is_locked()); + return cmeta_->has_pending_config(); +} + +Status RaftConsensus::CheckNoConfigChangePendingUnlocked() const { + DCHECK(lock_.is_locked()); + if (IsConfigChangePendingUnlocked()) { + return Status::IllegalState( + Substitute("RaftConfig change currently pending. 
Only one is allowed at a time.\n" + " Committed config: $0.\n Pending config: $1", + SecureShortDebugString(GetCommittedConfigUnlocked()), + SecureShortDebugString(GetPendingConfigUnlocked()))); + } + return Status::OK(); +} + +Status RaftConsensus::SetPendingConfigUnlocked(const RaftConfigPB& new_config) { + DCHECK(lock_.is_locked()); + RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config, PENDING_CONFIG), + "Invalid config to set as pending"); + if (!new_config.unsafe_config_change()) { + CHECK(!cmeta_->has_pending_config()) + << "Attempt to set pending config while another is already pending! " + << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; " + << "Attempted new pending config: " << SecureShortDebugString(new_config); + } else if (cmeta_->has_pending_config()) { + LOG_WITH_PREFIX_UNLOCKED(INFO) + << "Allowing unsafe config change even though there is a pending config! " + << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; " + << "New pending config: " << SecureShortDebugString(new_config); + } + cmeta_->set_pending_config(new_config); + return Status::OK(); +} + +void RaftConsensus::ClearPendingConfigUnlocked() { + cmeta_->clear_pending_config(); +} + +const RaftConfigPB& RaftConsensus::GetPendingConfigUnlocked() const { + DCHECK(lock_.is_locked()); + CHECK(IsConfigChangePendingUnlocked()) << "No pending config"; + return cmeta_->pending_config(); +} + +Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) { + TRACE_EVENT0("consensus", "RaftConsensus::SetCommittedConfigUnlocked"); + DCHECK(lock_.is_locked()); + DCHECK(config_to_commit.IsInitialized()); + RETURN_NOT_OK_PREPEND(VerifyRaftConfig(config_to_commit, COMMITTED_CONFIG), + "Invalid config to set as committed"); + + // Compare committed with pending configuration, ensure that they are the same. 
+ // In the event of an unsafe config change triggered by an administrator, + // it is possible that the config being committed may not match the pending config + // because unsafe config change allows multiple pending configs to exist. + // Therefore we only need to validate that 'config_to_commit' matches the pending config + // if the pending config does not have its 'unsafe_config_change' flag set. + if (IsConfigChangePendingUnlocked()) { + const RaftConfigPB& pending_config = GetPendingConfigUnlocked(); + if (!pending_config.unsafe_config_change()) { + // Quorums must be exactly equal, even w.r.t. peer ordering. + CHECK_EQ(GetPendingConfigUnlocked().SerializeAsString(), + config_to_commit.SerializeAsString()) + << Substitute("New committed config must equal pending config, but does not. " + "Pending config: $0, committed config: $1", + SecureShortDebugString(pending_config), + SecureShortDebugString(config_to_commit)); + } + } + cmeta_->set_committed_config(config_to_commit); + cmeta_->clear_pending_config(); + CHECK_OK(cmeta_->Flush()); + return Status::OK(); +} + +const RaftConfigPB& RaftConsensus::GetCommittedConfigUnlocked() const { + DCHECK(lock_.is_locked()); + return cmeta_->committed_config(); +} + +const RaftConfigPB& RaftConsensus::GetActiveConfigUnlocked() const { + DCHECK(lock_.is_locked()); + return cmeta_->active_config(); +} + +Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term, + FlushToDisk flush) { + TRACE_EVENT1("consensus", "RaftConsensus::SetCurrentTermUnlocked", + "term", new_term); + DCHECK(lock_.is_locked()); + if (PREDICT_FALSE(new_term <= GetCurrentTermUnlocked())) { + return Status::IllegalState( + Substitute("Cannot change term to a term that is lower than or equal to the current one. 
" + "Current: $0, Proposed: $1", GetCurrentTermUnlocked(), new_term)); + } + cmeta_->set_current_term(new_term); + cmeta_->clear_voted_for(); + if (flush == FLUSH_TO_DISK) { + CHECK_OK(cmeta_->Flush()); + } + ClearLeaderUnlocked(); + return Status::OK(); +} + +const int64_t RaftConsensus::GetCurrentTermUnlocked() const { + DCHECK(lock_.is_locked()); + return cmeta_->current_term(); +} + +const string& RaftConsensus::GetLeaderUuidUnlocked() const { + DCHECK(lock_.is_locked()); + return cmeta_->leader_uuid(); +} + +const bool RaftConsensus::HasVotedCurrentTermUnlocked() const { + DCHECK(lock_.is_locked()); + return cmeta_->has_voted_for(); +} + +Status RaftConsensus::SetVotedForCurrentTermUnlocked(const std::string& uuid) { + TRACE_EVENT1("consensus", "RaftConsensus::SetVotedForCurrentTermUnlocked", + "uuid", uuid); + DCHECK(lock_.is_locked()); + cmeta_->set_voted_for(uuid); + CHECK_OK(cmeta_->Flush()); + return Status::OK(); +} + +const std::string& RaftConsensus::GetVotedForCurrentTermUnlocked() const { + DCHECK(lock_.is_locked()); + DCHECK(cmeta_->has_voted_for()); + return cmeta_->voted_for(); +} + +const ConsensusOptions& RaftConsensus::GetOptions() const { + return options_; +} + +string RaftConsensus::LogPrefix() const { + UniqueLock lock; + CHECK_OK(LockForRead(&lock)); + return LogPrefixUnlocked(); +} + +string RaftConsensus::LogPrefixUnlocked() const { + DCHECK(lock_.is_locked()); + return Substitute("T $0 P $1 [term $2 $3]: ", + options_.tablet_id, + peer_uuid_, + GetCurrentTermUnlocked(), + RaftPeerPB::Role_Name(GetActiveRoleUnlocked())); +} + +string RaftConsensus::LogPrefixThreadSafe() const { + return Substitute("T $0 P $1: ", + options_.tablet_id, + peer_uuid_); +} + +string RaftConsensus::ToString() const { + ThreadRestrictions::AssertWaitAllowed(); + UniqueLock lock(lock_); + return ToStringUnlocked(); +} + +string RaftConsensus::ToStringUnlocked() const { + DCHECK(lock_.is_locked()); + return Substitute("Replica: $0, State: $1, Role: $2", + 
peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked())); +} + +ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const { + return cmeta_.get(); +} + } // namespace consensus } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/raft_consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h index b2186db..b1badde 100644 --- a/src/kudu/consensus/raft_consensus.h +++ b/src/kudu/consensus/raft_consensus.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef KUDU_CONSENSUS_RAFT_CONSENSUS_H_ -#define KUDU_CONSENSUS_RAFT_CONSENSUS_H_ +#pragma once #include <boost/optional/optional_fwd.hpp> #include <memory> @@ -30,7 +29,6 @@ #include "kudu/consensus/consensus_meta.h" #include "kudu/consensus/consensus_queue.h" #include "kudu/consensus/pending_rounds.h" -#include "kudu/consensus/raft_consensus_state.h" #include "kudu/consensus/time_manager.h" #include "kudu/util/atomic.h" #include "kudu/util/failure_detector.h" @@ -64,8 +62,10 @@ struct ElectionResult; class RaftConsensus : public Consensus, public PeerMessageQueueObserver { public: + typedef std::unique_lock<simple_spinlock> UniqueLock; + static scoped_refptr<RaftConsensus> Create( - const ConsensusOptions& options, + ConsensusOptions options, std::unique_ptr<ConsensusMetadata> cmeta, const RaftPeerPB& local_peer_pb, const scoped_refptr<MetricEntity>& metric_entity, @@ -76,14 +76,14 @@ class RaftConsensus : public Consensus, const std::shared_ptr<MemTracker>& parent_mem_tracker, const Callback<void(const std::string& reason)>& mark_dirty_clbk); - RaftConsensus(const ConsensusOptions& options, + RaftConsensus(ConsensusOptions options, std::unique_ptr<ConsensusMetadata> cmeta, gscoped_ptr<PeerProxyFactory> peer_proxy_factory, gscoped_ptr<PeerMessageQueue> queue, gscoped_ptr<PeerManager> 
peer_manager, gscoped_ptr<ThreadPool> thread_pool, const scoped_refptr<MetricEntity>& metric_entity, - const std::string& peer_uuid, + std::string peer_uuid, scoped_refptr<TimeManager> time_manager, ReplicaTransactionFactory* txn_factory, const scoped_refptr<log::Log>& log, @@ -133,9 +133,11 @@ class RaftConsensus : public Consensus, RaftPeerPB::Role role() const override; - std::string peer_uuid() const override; + // Thread-safe. + const std::string& peer_uuid() const override; - std::string tablet_id() const override; + // Thread-safe. + const std::string& tablet_id() const override; scoped_refptr<TimeManager> time_manager() const override { return time_manager_; } @@ -150,11 +152,6 @@ class RaftConsensus : public Consensus, // Makes this peer advance it's term (and step down if leader), for tests. Status AdvanceTermForTests(int64_t new_term); - // Returns the replica state for tests. This should never be used outside of - // tests, in particular calling the LockFor* methods on the returned object - // can cause consensus to deadlock. - ReplicaState* GetReplicaStateForTests(); - int update_calls_for_tests() const { return update_calls_for_tests_.Load(); } @@ -177,9 +174,29 @@ class RaftConsensus : public Consensus, log::RetentionIndexes GetRetentionIndexes() override; private: - friend class ReplicaState; friend class RaftConsensusQuorumTest; + FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind); + FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind); + FRIEND_TEST(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum); FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty); + FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote); + + enum State { + // State after the replica is built. 
+ kInitialized, + + // State signaling the replica accepts requests (from clients + // if leader, from leader if follower) + kRunning, + + // State signaling that the replica is shutting down and no longer accepting + // new transactions or commits. + kShuttingDown, + + // State signaling the replica is shut down and does not accept + // any more requests. + kShutDown, + }; // Control whether printing of log messages should be done for a particular // function call. @@ -188,6 +205,12 @@ class RaftConsensus : public Consensus, ALLOW_LOGGING = 1, }; + // Enum for the 'flush' argument to SetCurrentTermUnlocked() below. + enum FlushToDisk { + SKIP_FLUSH_TO_DISK, + FLUSH_TO_DISK, + }; + // Helper struct that contains the messages from the leader that we need to // append to our log, after they've been deduplicated. struct LeaderRequest { @@ -201,10 +224,6 @@ class RaftConsensus : public Consensus, std::string OpsRangeString() const; }; - std::string LogPrefixUnlocked(); - - std::string LogPrefix(); - // Set the leader UUID of the configuration and mark the tablet config dirty for // reporting to the master. void SetLeaderUuidUnlocked(const std::string& uuid); @@ -224,12 +243,12 @@ class RaftConsensus : public Consensus, // Returns OK once the change config transaction that has this peer as leader // has been enqueued, the transaction will complete asynchronously. // - // The ReplicaState must be locked for configuration change before calling. + // 'lock_' must be held for configuration change before calling. Status BecomeLeaderUnlocked(); // Makes the peer become a replica, i.e. a FOLLOWER or a LEARNER. // - // The ReplicaState must be locked for configuration change before calling. + // 'lock_' must be held for configuration change before calling. Status BecomeReplicaUnlocked(); // Updates the state in a replica by storing the received operations in the log @@ -286,7 +305,7 @@ class RaftConsensus : public Consensus, // Raft configuration. 
Status IsSingleVoterConfig(bool* single_voter) const; - // Return header string for RequestVote log messages. The ReplicaState lock must be held. + // Return header string for RequestVote log messages. 'lock_' must be held. std::string GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const; // Fills the response with the current status, if an update was successful. @@ -386,7 +405,7 @@ class RaftConsensus : public Consensus, // // 'flush' may be used to control whether the term change is flushed to disk. Status HandleTermAdvanceUnlocked(ConsensusTerm new_term, - ReplicaState::FlushToDisk flush = ReplicaState::FLUSH_TO_DISK); + FlushToDisk flush = FLUSH_TO_DISK); // Asynchronously (on thread_pool_) notify the TabletReplica that the consensus configuration // has changed, thus reporting it back to the master. @@ -426,25 +445,162 @@ class RaftConsensus : public Consensus, // type of message it is. // The 'client_cb' will be invoked at the end of this execution. // - // NOTE: this must be called with the ReplicaState lock held. + // NOTE: Must be called while holding 'lock_'. void NonTxRoundReplicationFinished(ConsensusRound* round, const StatusCallback& client_cb, const Status& status); // As a leader, append a new ConsensusRound to the queue. - // Only virtual and protected for mocking purposes. Status AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round); // As a follower, start a consensus round not associated with a Transaction. - // Only virtual and protected for mocking purposes. Status StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg); - // Add a new pending operation to the ReplicaState, including the special handling + // Add a new pending operation to PendingRounds, including the special handling // necessary if this round contains a configuration change. These rounds must // take effect as soon as they are received, rather than waiting for commitment // (see Diego Ongaro's thesis section 4.1). 
Status AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round); + // Locks a replica in preparation for StartUnlocked(). Makes + // sure the replica is in kInitialized state. + Status LockForStart(UniqueLock* lock) const WARN_UNUSED_RESULT; + + // Obtains the lock for a state read, does not check state. + Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT; + + // Locks a replica down until the critical section of an append completes, + // i.e. until the replicate message has been assigned an id and placed in + // the log queue. + // This also checks that the replica is in the appropriate + // state (role) to replicate the provided operation, that the operation + // contains a replicate message and is of the appropriate type, and returns + // Status::IllegalState if that is not the case. + Status LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const WARN_UNUSED_RESULT; + + // Locks a replica down until the critical section of a commit completes. + // This succeeds for all states since a replica which has initiated + // a Prepare()/Replicate() must eventually commit even if its state + // has changed after the initial Append()/Update(). + Status LockForCommit(UniqueLock* lock) const WARN_UNUSED_RESULT; + + // Locks a replica down until the critical section of an update completes. + // Further updates from the same or some other leader will be blocked until + // this completes. This also checks that the replica is in the appropriate + // state (role) to be updated and returns Status::IllegalState if that + // is not the case. + Status LockForUpdate(UniqueLock* lock) const WARN_UNUSED_RESULT; + + Status LockForConfigChange(UniqueLock* lock) const WARN_UNUSED_RESULT; + + // Changes the role to non-participant and returns a lock that can be + // used to make sure no state updates come in until Shutdown() is + // completed. 
+ Status LockForShutdown(UniqueLock* lock) WARN_UNUSED_RESULT; + + // Ensure the local peer is the active leader. + // Returns OK if leader, IllegalState otherwise. + Status CheckActiveLeaderUnlocked() const; + + // Return current consensus state summary. + ConsensusStatePB ConsensusStateUnlocked() const; + + // Returns the currently active Raft role. + RaftPeerPB::Role GetActiveRoleUnlocked() const; + + // Returns true if there is a configuration change currently in-flight but not yet + // committed. + bool IsConfigChangePendingUnlocked() const; + + // Inverse of IsConfigChangePendingUnlocked(): returns OK if there is + // currently *no* configuration change pending, and IllegalState if there *is* a + // configuration change pending. + Status CheckNoConfigChangePendingUnlocked() const; + + // Sets the given configuration as pending commit. Does not persist into the peer's + // metadata. In order to be persisted, SetCommittedConfigUnlocked() must be called. + Status SetPendingConfigUnlocked(const RaftConfigPB& new_config) WARN_UNUSED_RESULT; + + // Clear (cancel) the pending configuration. + void ClearPendingConfigUnlocked(); + + // Return the pending configuration, or crash if one is not set. + const RaftConfigPB& GetPendingConfigUnlocked() const; + + // Changes the committed config for this replica. Checks that there is a + // pending configuration and that it is equal to this one. Persists changes to disk. + // Resets the pending configuration to null. + Status SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit); + + // Return the persisted configuration. + const RaftConfigPB& GetCommittedConfigUnlocked() const; + + // Return the "active" configuration - if there is a pending configuration return it; + // otherwise return the committed configuration. + const RaftConfigPB& GetActiveConfigUnlocked() const; + + // Checks if the term change is legal. If so, sets 'current_term' + // to 'new_term' and sets 'has voted' to no for the current term. 
+ // + // If the caller knows that it will call another method soon after + // to flush the change to disk, it may set 'flush' to 'SKIP_FLUSH_TO_DISK'. + Status SetCurrentTermUnlocked(int64_t new_term, + FlushToDisk flush) WARN_UNUSED_RESULT; + + // Returns the term set in the last config change round. + const int64_t GetCurrentTermUnlocked() const; + + // Accessors for the leader of the current term. + const std::string& GetLeaderUuidUnlocked() const; + bool HasLeaderUnlocked() const { return !GetLeaderUuidUnlocked().empty(); } + void ClearLeaderUnlocked() { SetLeaderUuidUnlocked(""); } + + // Return whether this peer has voted in the current term. + const bool HasVotedCurrentTermUnlocked() const; + + // Record replica's vote for the current term, then flush the consensus + // metadata to disk. + Status SetVotedForCurrentTermUnlocked(const std::string& uuid) WARN_UNUSED_RESULT; + + // Return replica's vote for the current term. + // The vote must be set; use HasVotedCurrentTermUnlocked() to check. + const std::string& GetVotedForCurrentTermUnlocked() const; + + const ConsensusOptions& GetOptions() const; + + std::string LogPrefix() const; + std::string LogPrefixUnlocked() const; + + // A variant of LogPrefix which does not take the lock. This is a slightly + // less thorough prefix which only includes immutable (and thus thread-safe) + // information, but does not require the lock. + std::string LogPrefixThreadSafe() const; + + std::string ToString() const; + std::string ToStringUnlocked() const; + + ConsensusMetadata* consensus_metadata_for_tests() const; + + const ConsensusOptions options_; + + // The UUID of the local peer. + const std::string peer_uuid_; + + // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages + // should probably be refactored out. + // + // Lock ordering note: If both 'update_lock_' and 'lock_' are to be taken, + // 'update_lock_' lock must be taken first. 
+ mutable simple_spinlock update_lock_; + + // Coarse-grained lock that protects all mutable data members. + mutable simple_spinlock lock_; + + State state_; + + // Consensus metadata persistence object. + std::unique_ptr<ConsensusMetadata> cmeta_; + // Threadpool for constructing requests to peers, handling RPC callbacks, // etc. gscoped_ptr<ThreadPool> thread_pool_; @@ -462,10 +618,8 @@ class RaftConsensus : public Consensus, // The queue of messages that must be sent to peers. gscoped_ptr<PeerMessageQueue> queue_; - gscoped_ptr<ReplicaState> state_; - // The currently pending rounds that have not yet been committed by - // consensus. Protected by the locks inside state_. + // consensus. Protected by 'lock_'. // TODO(todd) these locks will become more fine-grained. PendingRounds pending_; @@ -493,13 +647,6 @@ class RaftConsensus : public Consensus, const Callback<void(const std::string& reason)> mark_dirty_clbk_; - // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages - // should probably be refactored out. - // - // Lock ordering note: If both this lock and the ReplicaState lock are to be - // taken, this lock must be taken first. - mutable simple_spinlock update_lock_; - AtomicBool shutdown_; // The number of times Update() has been called, used for some test assertions. 
@@ -515,5 +662,3 @@ class RaftConsensus : public Consensus, } // namespace consensus } // namespace kudu - -#endif /* KUDU_CONSENSUS_RAFT_CONSENSUS_H_ */ http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/raft_consensus_quorum-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc index a6db53b..bb1575e 100644 --- a/src/kudu/consensus/raft_consensus_quorum-test.cc +++ b/src/kudu/consensus/raft_consensus_quorum-test.cc @@ -31,7 +31,6 @@ #include "kudu/consensus/peer_manager.h" #include "kudu/consensus/quorum_util.h" #include "kudu/consensus/raft_consensus.h" -#include "kudu/consensus/raft_consensus_state.h" #include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/strcat.h" #include "kudu/gutil/strings/substitute.h" @@ -661,9 +660,8 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) { scoped_refptr<RaftConsensus> follower0; CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); - ReplicaState* follower0_rs = follower0->GetReplicaStateForTests(); - ReplicaState::UniqueLock lock; - ASSERT_OK(follower0_rs->LockForRead(&lock)); + RaftConsensus::UniqueLock lock; + ASSERT_OK(follower0->LockForRead(&lock)); // If the locked replica would stop consensus we would hang here // as we wait for operations to be replicated to a majority. @@ -705,15 +703,13 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) { // and never letting them go. 
scoped_refptr<RaftConsensus> follower0; CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); - ReplicaState* follower0_rs = follower0->GetReplicaStateForTests(); - ReplicaState::UniqueLock lock0; - ASSERT_OK(follower0_rs->LockForRead(&lock0)); + RaftConsensus::UniqueLock lock0; + ASSERT_OK(follower0->LockForRead(&lock0)); scoped_refptr<RaftConsensus> follower1; CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1)); - ReplicaState* follower1_rs = follower1->GetReplicaStateForTests(); - ReplicaState::UniqueLock lock1; - ASSERT_OK(follower1_rs->LockForRead(&lock1)); + RaftConsensus::UniqueLock lock1; + ASSERT_OK(follower1->LockForRead(&lock1)); // Append a single message to the queue ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round)); @@ -895,15 +891,13 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) { // This will force an election in which we expect to make the last // non-shutdown peer in the list become leader. - int flush_count_before = new_leader->GetReplicaStateForTests() - ->consensus_metadata_for_tests()->flush_count_for_tests(); + int flush_count_before = new_leader->consensus_metadata_for_tests()->flush_count_for_tests(); LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1); ASSERT_OK(new_leader->StartElection(Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, Consensus::EXTERNAL_REQUEST)); WaitUntilLeaderForTests(new_leader.get()); LOG(INFO) << "Election won"; - int flush_count_after = new_leader->GetReplicaStateForTests() - ->consensus_metadata_for_tests()->flush_count_for_tests(); + int flush_count_after = new_leader->consensus_metadata_for_tests()->flush_count_for_tests(); ASSERT_EQ(flush_count_after, flush_count_before + 1) << "Expected only one consensus metadata flush for a leader election"; @@ -1021,8 +1015,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) { scoped_refptr<RaftConsensus> peer; CHECK_OK(peers_->GetPeerByIdx(kPeerIndex, &peer)); auto flush_count = [&]() { - return 
peer->GetReplicaStateForTests() - ->consensus_metadata_for_tests()->flush_count_for_tests(); + return peer->consensus_metadata_for_tests()->flush_count_for_tests(); }; VoteRequestPB request;
