consensus: Get rid of LockFor*() methods

Simplify the locking logic by removing layers of abstraction.
Also add State_Name() helper for state-related error messages. Change-Id: I6858752f4fbeb70b09eb4375c52e4aeaa1bb8e71 Reviewed-on: http://gerrit.cloudera.org:8080/7012 Reviewed-by: Alexey Serbin <[email protected]> Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/3846861a Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/3846861a Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/3846861a Branch: refs/heads/master Commit: 3846861ab258a0ac0497893865875b2138964fe3 Parents: 30682fd Author: Mike Percy <[email protected]> Authored: Tue May 30 14:00:53 2017 -0700 Committer: Mike Percy <[email protected]> Committed: Thu Jun 1 20:44:45 2017 +0000 ---------------------------------------------------------------------- src/kudu/consensus/raft_consensus.cc | 298 +++++++++---------- src/kudu/consensus/raft_consensus.h | 68 ++--- .../consensus/raft_consensus_quorum-test.cc | 9 +- 3 files changed, 169 insertions(+), 206 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index c5370c7..6a2c39f 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -278,8 +278,11 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) { failure_detector_)); { - UniqueLock lock; - RETURN_NOT_OK(LockForStart(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): " + << State_Name(state_); + ClearLeaderUnlocked(); // Our last persisted term can be higher than the last persisted operation @@ -310,8 +313,9 @@ Status 
RaftConsensus::Start(const ConsensusBootstrapInfo& info) { } { - UniqueLock lock; - RETURN_NOT_OK(LockForConfigChange(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(CheckRunningUnlocked()); RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked()); @@ -350,15 +354,19 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) { } bool RaftConsensus::IsRunning() const { - UniqueLock lock; - Status s = LockForRead(&lock); - if (PREDICT_FALSE(!s.ok())) return false; + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); return state_ == kRunning; } Status RaftConsensus::EmulateElection() { - UniqueLock lock; - RETURN_NOT_OK(LockForConfigChange(&lock)); + TRACE_EVENT2("consensus", "RaftConsensus::EmulateElection", + "peer", peer_uuid_, + "tablet", options_.tablet_id); + + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(CheckRunningUnlocked()); LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election..."; @@ -404,8 +412,9 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) { "mode", mode_str); scoped_refptr<LeaderElection> election; { - UniqueLock lock; - RETURN_NOT_OK(LockForConfigChange(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(CheckRunningUnlocked()); RaftPeerPB::Role active_role = GetActiveRoleUnlocked(); if (active_role == RaftPeerPB::LEADER) { @@ -413,7 +422,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) { return Status::OK(); } if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) { - SnoozeFailureDetectorUnlocked(); // Avoid excessive election noise while in this state. + RETURN_NOT_OK(SnoozeFailureDetectorUnlocked()); // Reduce election noise while in this state. 
return Status::IllegalState("Not starting election: Node is currently " "a non-participant in the raft config", SecureShortDebugString(GetActiveConfigUnlocked())); @@ -499,8 +508,9 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) { Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) { TRACE_EVENT0("consensus", "RaftConsensus::StepDown"); - UniqueLock lock; - RETURN_NOT_OK(LockForConfigChange(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(CheckRunningUnlocked()); if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) { resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER); StatusToPB(Status::IllegalState("Not currently leader"), @@ -580,8 +590,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) { std::lock_guard<simple_spinlock> lock(update_lock_); { - UniqueLock lock; - RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg())); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg())); RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked())); RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round)); } @@ -591,8 +602,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) { } Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) { - UniqueLock lock; - RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg())); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg())); round->BindToTerm(GetCurrentTermUnlocked()); return Status::OK(); } @@ -655,12 +667,19 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR } void RaftConsensus::NotifyCommitIndex(int64_t commit_index) { - UniqueLock lock; - Status s = LockForCommit(&lock); - if (PREDICT_FALSE(!s.ok())) { - LOG(WARNING) << 
LogPrefixThreadSafe() - << "Unable to take state lock to update committed index: " - << s.ToString(); + TRACE_EVENT2("consensus", "RaftConsensus::NotifyCommitIndex", + "tablet", options_.tablet_id, + "commit_index", commit_index); + + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + // We will process commit notifications while shutting down because a replica + // which has initiated a Prepare() / Replicate() may eventually commit even if + // its state has changed after the initial Append() / Update(). + if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) { + LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: " + << "Replica not in running state: " + << State_Name(state_); return; } @@ -672,11 +691,16 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) { } void RaftConsensus::NotifyTermChange(int64_t term) { - UniqueLock lock; - Status s = LockForConfigChange(&lock); + TRACE_EVENT2("consensus", "RaftConsensus::NotifyTermChange", + "tablet", options_.tablet_id, + "term", term); + + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + Status s = CheckRunningUnlocked(); if (PREDICT_FALSE(!s.ok())) { - LOG(WARNING) << LogPrefixThreadSafe() << "Unable to lock consensus for term change" - << " when notified of new term " << term << ": " << s.ToString(); + LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to handle notification of new term " + << "(" << term << "): " << s.ToString(); return; } WARN_NOT_OK(HandleTermAdvanceUnlocked(term), "Couldn't advance consensus term."); @@ -697,14 +721,8 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid, RaftConfigPB committed_config; { - UniqueLock lock; - Status s = LockForRead(&lock); - if (PREDICT_FALSE(!s.ok())) { - LOG(WARNING) << LogPrefixThreadSafe() << fail_msg - << "Unable to lock consensus for read: " << s.ToString(); - return; - } - + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); int64_t current_term = 
GetCurrentTermUnlocked(); if (current_term != term) { LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in " @@ -794,8 +812,8 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg } Status RaftConsensus::IsSingleVoterConfig(bool* single_voter) const { - UniqueLock lock; - RETURN_NOT_OK(LockForRead(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); const RaftConfigPB& config = GetCommittedConfigUnlocked(); const string& uuid = peer_uuid_; if (CountVoters(config) == 1 && IsRaftConfigVoter(uuid, config)) { @@ -1142,8 +1160,12 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, LeaderRequest deduped_req; { - UniqueLock lock; - RETURN_NOT_OK(LockForUpdate(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(CheckRunningUnlocked()); + if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) { + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config"; + } deduped_req.leader_uuid = request->caller_uuid(); @@ -1333,9 +1355,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, // If just waiting for our log append to finish lets snooze the timer. // We don't want to fire leader election because we're waiting on our own log. if (s.IsTimedOut()) { - UniqueLock lock; - RETURN_NOT_OK(LockForRead(&lock)); - SnoozeFailureDetectorUnlocked(); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(SnoozeFailureDetectorUnlocked()); } } while (s.IsTimedOut()); RETURN_NOT_OK(s); @@ -1393,14 +1415,16 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* // timeouts, just vote a quick NO. // // We still need to take the state lock in order to respond with term info, etc. 
- UniqueLock state_guard; - RETURN_NOT_OK(LockForConfigChange(&state_guard)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(CheckRunningUnlocked()); return RequestVoteRespondIsBusy(request, response); } // Acquire the replica state lock so we can read / modify the consensus state. - UniqueLock state_guard; - RETURN_NOT_OK(LockForConfigChange(&state_guard)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(CheckRunningUnlocked()); // If the node is not in the configuration, allow the vote (this is required by Raft) // but log an informational message anyway. @@ -1480,6 +1504,10 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req, const StatusCallback& client_cb, boost::optional<TabletServerErrorPB::Code>* error_code) { + TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig", + "peer", peer_uuid_, + "tablet", options_.tablet_id); + if (PREDICT_FALSE(!req.has_type())) { return Status::InvalidArgument("Must specify 'type' argument to ChangeConfig()", SecureShortDebugString(req)); @@ -1492,8 +1520,9 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req, ChangeConfigType type = req.type(); const RaftPeerPB& server = req.server(); { - UniqueLock lock; - RETURN_NOT_OK(LockForConfigChange(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + RETURN_NOT_OK(CheckRunningUnlocked()); RETURN_NOT_OK(CheckActiveLeaderUnlocked()); RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked()); @@ -1599,8 +1628,8 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req, { // Take the snapshot of the replica state and queue state so that // we can stick them in the consensus update request later. 
- UniqueLock lock; - RETURN_NOT_OK(LockForRead(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); current_term = GetCurrentTermUnlocked(); committed_config = GetCommittedConfigUnlocked(); if (IsConfigChangePendingUnlocked()) { @@ -1719,6 +1748,10 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req, } void RaftConsensus::Shutdown() { + TRACE_EVENT2("consensus", "RaftConsensus::Shutdown", + "peer", peer_uuid_, + "tablet", options_.tablet_id); + // Avoid taking locks if already shut down so we don't violate // ThreadRestrictions assertions in the case where the RaftConsensus // destructor runs on the reactor thread due to an election callback being @@ -1726,9 +1759,11 @@ void RaftConsensus::Shutdown() { if (shutdown_.Load(kMemOrderAcquire)) return; { - UniqueLock lock; + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); // Transition to kShuttingDown state. - CHECK_OK(LockForShutdown(&lock)); + CHECK_NE(kShutDown, state_) << State_Name(state_); // We are protected here by 'shutdown_'. 
+ state_ = kShuttingDown; LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down."; } @@ -1739,10 +1774,10 @@ void RaftConsensus::Shutdown() { queue_->Close(); { - UniqueLock lock; - CHECK_OK(LockForShutdown(&lock)); - CHECK_EQ(kShuttingDown, state_); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); CHECK_OK(pending_.CancelPendingTransactions()); + CHECK_EQ(kShuttingDown, state_) << State_Name(state_); state_ = kShutDown; LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!"; } @@ -1779,8 +1814,9 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg } Status RaftConsensus::AdvanceTermForTests(int64_t new_term) { - UniqueLock lock; - CHECK_OK(LockForConfigChange(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); + CHECK_OK(CheckRunningUnlocked()); return HandleTermAdvanceUnlocked(new_term); } @@ -1914,11 +1950,27 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request } RaftPeerPB::Role RaftConsensus::role() const { - UniqueLock lock; - CHECK_OK(LockForRead(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); return GetActiveRoleUnlocked(); } +const char* RaftConsensus::State_Name(State state) { + switch (state) { + case kInitialized: + return "Initialized"; + case kRunning: + return "Running"; + case kShuttingDown: + return "Shutting down"; + case kShutDown: + return "Shut down"; + default: + LOG(DFATAL) << "Unknown State value: " << state; + return "Unknown"; + } +} + void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) { DCHECK(lock_.is_locked()); failed_elections_since_stable_leader_ = 0; @@ -1975,14 +2027,14 @@ const string& RaftConsensus::tablet_id() const { } ConsensusStatePB RaftConsensus::ConsensusState() const { - UniqueLock lock; - CHECK_OK(LockForRead(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); return ConsensusStateUnlocked(); } RaftConfigPB RaftConsensus::CommittedConfig() const 
{ - UniqueLock lock; - CHECK_OK(LockForRead(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); return GetCommittedConfigUnlocked(); } @@ -1997,8 +2049,8 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const { // Dump the queues on a leader. RaftPeerPB::Role role; { - UniqueLock lock; - CHECK_OK(LockForRead(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); role = GetActiveRoleUnlocked(); } if (role == RaftPeerPB::LEADER) { @@ -2026,8 +2078,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu // Snooze to avoid the election timer firing again as much as possible. { - UniqueLock lock; - CHECK_OK(LockForRead(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); // We need to snooze when we win and when we lose: // - When we win because we're about to disable the timer and become leader. // - When we lose or otherwise we can fall into a cycle, where everyone keeps @@ -2064,12 +2116,13 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu } // The vote was granted, become leader. 
- UniqueLock lock; - Status s = LockForConfigChange(&lock); + ThreadRestrictions::AssertWaitAllowed(); + UniqueLock lock(lock_); + Status s = CheckRunningUnlocked(); if (PREDICT_FALSE(!s.ok())) { - LOG_WITH_PREFIX(INFO) << "Received " << election_type << " callback for term " - << election_term << " while not running: " - << s.ToString(); + LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received " << election_type << " callback for term " + << election_term << " while not running: " + << s.ToString(); return; } @@ -2137,8 +2190,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu } Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) { - UniqueLock lock; - RETURN_NOT_OK(LockForRead(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); if (type == RECEIVED_OPID) { *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog(); } else if (type == COMMITTED_OPID) { @@ -2192,13 +2245,13 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round, // the client callback. if (!status.ok()) { - LOG(INFO) << LogPrefixThreadSafe() << op_type_str << " replication failed: " - << status.ToString(); + LOG_WITH_PREFIX_UNLOCKED(INFO) << op_type_str << " replication failed: " + << status.ToString(); client_cb.Run(status); return; } - VLOG(1) << LogPrefixThreadSafe() << "Committing " << op_type_str << " with op id " - << round->id(); + VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id " + << round->id(); gscoped_ptr<CommitMsg> commit_msg(new CommitMsg); commit_msg->set_op_type(round->replicate_msg()->op_type()); *commit_msg->mutable_commited_op_id() = round->id(); @@ -2364,82 +2417,19 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term, return Status::OK(); } -Status RaftConsensus::LockForStart(UniqueLock* lock) const { - ThreadRestrictions::AssertWaitAllowed(); - UniqueLock l(lock_); - CHECK_EQ(state_, kInitialized) << "Illegal state for Start()." 
- << " Replica is not in kInitialized state"; - lock->swap(l); - return Status::OK(); -} - -Status RaftConsensus::LockForRead(UniqueLock* lock) const { - ThreadRestrictions::AssertWaitAllowed(); - UniqueLock l(lock_); - lock->swap(l); - return Status::OK(); -} - -Status RaftConsensus::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const { - ThreadRestrictions::AssertWaitAllowed(); +Status RaftConsensus::CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const { + DCHECK(lock_.is_locked()); DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg); - UniqueLock l(lock_); - if (PREDICT_FALSE(state_ != kRunning)) { - return Status::IllegalState("Replica not in running state"); - } - - RETURN_NOT_OK(CheckActiveLeaderUnlocked()); - lock->swap(l); - return Status::OK(); -} - -Status RaftConsensus::LockForCommit(UniqueLock* lock) const { - TRACE_EVENT0("consensus", "RaftConsensus::LockForCommit"); - ThreadRestrictions::AssertWaitAllowed(); - UniqueLock l(lock_); - if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) { - return Status::IllegalState("Replica not in running state"); - } - lock->swap(l); - return Status::OK(); -} - -Status RaftConsensus::LockForUpdate(UniqueLock* lock) const { - TRACE_EVENT0("consensus", "RaftConsensus::LockForUpdate"); - ThreadRestrictions::AssertWaitAllowed(); - UniqueLock l(lock_); - if (PREDICT_FALSE(state_ != kRunning)) { - return Status::IllegalState("Replica not in running state"); - } - if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) { - LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config"; - } - lock->swap(l); - return Status::OK(); + RETURN_NOT_OK(CheckRunningUnlocked()); + return CheckActiveLeaderUnlocked(); } -Status RaftConsensus::LockForConfigChange(UniqueLock* lock) const { - TRACE_EVENT0("consensus", "RaftConsensus::LockForConfigChange"); - - ThreadRestrictions::AssertWaitAllowed(); - UniqueLock l(lock_); - // Can only 
change the config on running replicas. +Status RaftConsensus::CheckRunningUnlocked() const { + DCHECK(lock_.is_locked()); if (PREDICT_FALSE(state_ != kRunning)) { - return Status::IllegalState("Unable to lock ReplicaState for config change", - Substitute("State = $0", state_)); + return Status::IllegalState("RaftConsensus is not running", + Substitute("State = $0", State_Name(state_))); } - lock->swap(l); - return Status::OK(); -} - -Status RaftConsensus::LockForShutdown(UniqueLock* lock) { - TRACE_EVENT0("consensus", "RaftConsensus::LockForShutdown"); - ThreadRestrictions::AssertWaitAllowed(); - UniqueLock l(lock_); - if (state_ != kShuttingDown && state_ != kShutDown) { - state_ = kShuttingDown; - } - lock->swap(l); return Status::OK(); } @@ -2608,8 +2598,8 @@ const ConsensusOptions& RaftConsensus::GetOptions() const { } string RaftConsensus::LogPrefix() const { - UniqueLock lock; - CHECK_OK(LockForRead(&lock)); + ThreadRestrictions::AssertWaitAllowed(); + LockGuard l(lock_); return LogPrefixUnlocked(); } @@ -2630,14 +2620,14 @@ string RaftConsensus::LogPrefixThreadSafe() const { string RaftConsensus::ToString() const { ThreadRestrictions::AssertWaitAllowed(); - UniqueLock lock(lock_); + LockGuard l(lock_); return ToStringUnlocked(); } string RaftConsensus::ToStringUnlocked() const { DCHECK(lock_.is_locked()); return Substitute("Replica: $0, State: $1, Role: $2", - peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked())); + peer_uuid_, State_Name(state_), RaftPeerPB::Role_Name(GetActiveRoleUnlocked())); } ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const { http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h index b1badde..b9348c2 100644 --- a/src/kudu/consensus/raft_consensus.h +++ b/src/kudu/consensus/raft_consensus.h @@ -35,9 +35,6 
@@ namespace kudu { -typedef std::lock_guard<simple_spinlock> Lock; -typedef gscoped_ptr<Lock> ScopedLock; - class Counter; class FailureDetector; class HostPort; @@ -62,8 +59,6 @@ struct ElectionResult; class RaftConsensus : public Consensus, public PeerMessageQueueObserver { public: - typedef std::unique_lock<simple_spinlock> UniqueLock; - static scoped_refptr<RaftConsensus> Create( ConsensusOptions options, std::unique_ptr<ConsensusMetadata> cmeta, @@ -181,6 +176,8 @@ class RaftConsensus : public Consensus, FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty); FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote); + // NOTE: When adding / changing values in this enum, add the corresponding + // values to State_Name(). enum State { // State after the replica is built. kInitialized, @@ -224,6 +221,12 @@ class RaftConsensus : public Consensus, std::string OpsRangeString() const; }; + using LockGuard = std::lock_guard<simple_spinlock>; + using UniqueLock = std::unique_lock<simple_spinlock>; + + // Returns string description for State enum value. + static const char* State_Name(State state); + // Set the leader UUID of the configuration and mark the tablet config dirty for // reporting to the master. void SetLeaderUuidUnlocked(const std::string& uuid); @@ -276,7 +279,8 @@ class RaftConsensus : public Consensus, // pending operations, we proactively abort those pending operations after and including // the preceding op in 'req' to avoid a pointless cache miss in the leader's log cache. Status EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequest& req, - ConsensusResponsePB* response); + ConsensusResponsePB* response) + WARN_UNUSED_RESULT; // Check a request received from a leader, making sure: // - The request is in the right term @@ -288,7 +292,7 @@ class RaftConsensus : public Consensus, // the messages to add to our state machine. 
Status CheckLeaderRequestUnlocked(const ConsensusRequestPB* request, ConsensusResponsePB* response, - LeaderRequest* deduped_req); + LeaderRequest* deduped_req) WARN_UNUSED_RESULT; // Abort any pending operations after the given op index, // and also truncate the LogCache accordingly. @@ -382,13 +386,13 @@ class RaftConsensus : public Consensus, // When this is called a failure is guaranteed not to be detected // before 'FLAGS_leader_failure_max_missed_heartbeat_periods' * // 'FLAGS_raft_heartbeat_interval_ms' has elapsed. - Status SnoozeFailureDetectorUnlocked(); + Status SnoozeFailureDetectorUnlocked() WARN_UNUSED_RESULT; // Like the above but adds 'additional_delta' to the default timeout // period. If allow_logging is set to ALLOW_LOGGING, then this method // will print a log message when called. Status SnoozeFailureDetectorUnlocked(const MonoDelta& additional_delta, - AllowLogging allow_logging); + AllowLogging allow_logging) WARN_UNUSED_RESULT; // Return the minimum election timeout. Due to backoff and random // jitter, election timeouts may be longer than this. @@ -462,45 +466,17 @@ class RaftConsensus : public Consensus, // (see Diego Ongaro's thesis section 4.1). Status AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round); - // Locks a replica in preparation for StartUnlocked(). Makes - // sure the replica is in kInitialized state. - Status LockForStart(UniqueLock* lock) const WARN_UNUSED_RESULT; - - // Obtains the lock for a state read, does not check state. - Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT; - - // Locks a replica down until the critical section of an append completes, - // i.e. until the replicate message has been assigned an id and placed in - // the log queue. 
- // This also checks that the replica is in the appropriate - // state (role) to replicate the provided operation, that the operation - // contains a replicate message and is of the appropriate type, and returns - // Status::IllegalState if that is not the case. - Status LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const WARN_UNUSED_RESULT; - - // Locks a replica down until the critical section of a commit completes. - // This succeeds for all states since a replica which has initiated - // a Prepare()/Replicate() must eventually commit even if it's state - // has changed after the initial Append()/Update(). - Status LockForCommit(UniqueLock* lock) const WARN_UNUSED_RESULT; - - // Locks a replica down until an the critical section of an update completes. - // Further updates from the same or some other leader will be blocked until - // this completes. This also checks that the replica is in the appropriate - // state (role) to be updated and returns Status::IllegalState if that - // is not the case. - Status LockForUpdate(UniqueLock* lock) const WARN_UNUSED_RESULT; - - Status LockForConfigChange(UniqueLock* lock) const WARN_UNUSED_RESULT; - - // Changes the role to non-participant and returns a lock that can be - // used to make sure no state updates come in until Shutdown() is - // completed. - Status LockForShutdown(UniqueLock* lock) WARN_UNUSED_RESULT; + // Checks that the replica is in the appropriate state and role to replicate + // the provided operation and that the replicate message does not yet have an + // OpId assigned. + Status CheckSafeToReplicateUnlocked(const ReplicateMsg& msg) const WARN_UNUSED_RESULT; + + // Return Status::IllegalState if 'state_' != kRunning, OK otherwise. + Status CheckRunningUnlocked() const WARN_UNUSED_RESULT; // Ensure the local peer is the active leader. // Returns OK if leader, IllegalState otherwise. 
- Status CheckActiveLeaderUnlocked() const; + Status CheckActiveLeaderUnlocked() const WARN_UNUSED_RESULT; // Return current consensus state summary. ConsensusStatePB ConsensusStateUnlocked() const; @@ -515,7 +491,7 @@ class RaftConsensus : public Consensus, // Inverse of IsConfigChangePendingUnlocked(): returns OK if there is // currently *no* configuration change pending, and IllegalState is there *is* a // configuration change pending. - Status CheckNoConfigChangePendingUnlocked() const; + Status CheckNoConfigChangePendingUnlocked() const WARN_UNUSED_RESULT; // Sets the given configuration as pending commit. Does not persist into the peers // metadata. In order to be persisted, SetCommittedConfigUnlocked() must be called. http://git-wip-us.apache.org/repos/asf/kudu/blob/3846861a/src/kudu/consensus/raft_consensus_quorum-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc index bb1575e..a0933c8 100644 --- a/src/kudu/consensus/raft_consensus_quorum-test.cc +++ b/src/kudu/consensus/raft_consensus_quorum-test.cc @@ -660,8 +660,7 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) { scoped_refptr<RaftConsensus> follower0; CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); - RaftConsensus::UniqueLock lock; - ASSERT_OK(follower0->LockForRead(&lock)); + RaftConsensus::LockGuard l(follower0->lock_); // If the locked replica would stop consensus we would hang here // as we wait for operations to be replicated to a majority. @@ -703,13 +702,11 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) { // and never letting them go. 
scoped_refptr<RaftConsensus> follower0; CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0)); - RaftConsensus::UniqueLock lock0; - ASSERT_OK(follower0->LockForRead(&lock0)); + RaftConsensus::LockGuard l_0(follower0->lock_); scoped_refptr<RaftConsensus> follower1; CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1)); - RaftConsensus::UniqueLock lock1; - ASSERT_OK(follower1->LockForRead(&lock1)); + RaftConsensus::LockGuard l_1(follower1->lock_); // Append a single message to the queue ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
