This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 710bd6efaa796db4b95ab2c9a13b53abcf5174a2 Author: Alexey Serbin <[email protected]> AuthorDate: Mon Jun 1 19:20:57 2020 -0700 [consensus] small cleanup on Peer::SignalRequest() It's possible not to hold 'peer_lock_' when scheduling SendNextRequest() on the thread pool: if 'request_pending_' or 'closed_' changes, it's handled by Peer::SendNextRequest() as necessary. It might mean an extra run of a thread in the Raft pool in the worst case, but it seems better than blocking the thread running PrepareTask(). Blocking the latter involves blocking other tasks on corresponding locks and slowing the replication of a Raft operation, which leads to lower write throughput. The motivation for this change was seeing the following in the logs during intensive ingest workloads: 0531 13:09:40.403224 (+515724us) spinlock_profiling.cc:244] Waited 514 ms on lock 0xa1581150 with the stack symbolized like below: kudu::(anonymous namespace)::SubmitSpinLockProfileData(void const*, long) src/kudu/util/spinlock_profiling.cc:230 kudu::consensus::Peer::SignalRequest(bool) src/kudu/gutil/spinlock.h:106 kudu::consensus::PeerManager::SignalRequest(bool) src/kudu/consensus/peer_manager.cc:98 kudu::consensus::RaftConsensus::Replicate(scoped_refptr<kudu::consensus::ConsensusRound> const&) src/kudu/consensus/raft_consensus.cc:740 (discriminator 2) kudu::tablet::TransactionDriver::Prepare() src/kudu/tablet/transactions/transaction_driver.cc:333 kudu::tablet::TransactionDriver::PrepareTask() src/kudu/tablet/transactions/transaction_driver.cc:243 kudu::ThreadPool::DispatchThread() src/kudu/util/threadpool.cc:686 The stack trace pointed to 'peer_lock_' in Peer::SignalRequest() at consensus_peers.cc:167 as of one of its previous revisions, as well as the rest of the stack. I also did other minor cleanup of the surrounding code. 
Change-Id: I899aec0964bc404e6df3742bd48d0d049e52d900 Reviewed-on: http://gerrit.cloudera.org:8080/16021 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/consensus/consensus_peers.cc | 55 ++++++++++++++++++----------------- src/kudu/consensus/consensus_peers.h | 20 +++++++++---- src/kudu/consensus/peer_manager.cc | 10 +++---- src/kudu/consensus/raft_consensus.cc | 1 - src/kudu/tablet/ops/op_driver.cc | 3 +- src/kudu/tablet/ops/op_tracker.cc | 12 ++++---- 6 files changed, 55 insertions(+), 46 deletions(-) diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc index 099468a..fd75076 100644 --- a/src/kudu/consensus/consensus_peers.cc +++ b/src/kudu/consensus/consensus_peers.cc @@ -139,7 +139,10 @@ Peer::Peer(RaftPeerPB peer_pb, queue_(queue), failed_attempts_(0), messenger_(std::move(messenger)), - raft_pool_token_(raft_pool_token) { + raft_pool_token_(raft_pool_token), + request_pending_(false), + closed_(false), + has_sent_first_request_(false) { } Status Peer::Init() { @@ -164,8 +167,11 @@ Status Peer::Init() { } Status Peer::SignalRequest(bool even_if_queue_empty) { - std::lock_guard<simple_spinlock> l(peer_lock_); - + // This is a best effort logic in checking for 'closed_' and + // 'request_pending_': it's not necessary to block if some other thread has + // taken 'peer_lock_' and about to update 'closed_'/'request_pending_' since + // the implementation of SendNextRequest() checks for 'closed_' and + // 'request_pending_' on its own. if (PREDICT_FALSE(closed_)) { return Status::IllegalState("Peer was closed."); } @@ -179,12 +185,11 @@ Status Peer::SignalRequest(bool even_if_queue_empty) { // Capture a weak_ptr reference into the submitted functor so that we can // safely handle the functor outliving its peer. 
weak_ptr<Peer> w_this = shared_from_this(); - RETURN_NOT_OK(raft_pool_token_->Submit([even_if_queue_empty, w_this]() { + return raft_pool_token_->Submit([even_if_queue_empty, w_this]() { if (auto p = w_this.lock()) { p->SendNextRequest(even_if_queue_empty); } - })); - return Status::OK(); + }); } void Peer::SendNextRequest(bool even_if_queue_empty) { @@ -272,13 +277,13 @@ void Peer::SendNextRequest(bool even_if_queue_empty) { MAYBE_FAULT(FLAGS_fault_crash_on_leader_request_fraction); - VLOG_WITH_PREFIX_UNLOCKED(2) << "Sending to peer " << peer_pb().permanent_uuid() << ": " << SecureShortDebugString(request_); - controller_.Reset(); + controller_.Reset(); request_pending_ = true; l.unlock(); + // Capture a shared_ptr reference into the RPC callback so that we're guaranteed // that this object outlives the RPC. shared_ptr<Peer> s_this = shared_from_this(); @@ -319,7 +324,7 @@ void Peer::StartElection() { void Peer::ProcessResponse() { // Note: This method runs on the reactor thread. std::unique_lock<simple_spinlock> lock(peer_lock_); - if (closed_) { + if (PREDICT_FALSE(closed_)) { return; } CHECK(request_pending_); @@ -332,7 +337,7 @@ void Peer::ProcessResponse() { auto ps = controller_status.IsRemoteError() ? 
PeerStatus::REMOTE_ERROR : PeerStatus::RPC_LAYER_ERROR; queue_->UpdatePeerStatus(peer_pb_.permanent_uuid(), ps, controller_status); - ProcessResponseError(controller_status); + ProcessResponseErrorUnlocked(controller_status); return; } @@ -344,7 +349,7 @@ void Peer::ProcessResponse() { Status response_status = StatusFromPB(response_.status().error().status()); queue_->UpdatePeerStatus(peer_pb_.permanent_uuid(), PeerStatus::CANNOT_PREPARE, response_status); - ProcessResponseError(response_status); + ProcessResponseErrorUnlocked(response_status); return; } @@ -367,7 +372,7 @@ void Peer::ProcessResponse() { ps = PeerStatus::REMOTE_ERROR; } queue_->UpdatePeerStatus(peer_pb_.permanent_uuid(), ps, response_status); - ProcessResponseError(response_status); + ProcessResponseErrorUnlocked(response_status); return; } @@ -382,7 +387,6 @@ void Peer::ProcessResponse() { Status s = raft_pool_token_->Submit([w_this]() { if (auto p = w_this.lock()) { p->DoProcessResponse(); - } }); if (PREDICT_FALSE(!s.ok())) { @@ -393,11 +397,11 @@ void Peer::ProcessResponse() { } void Peer::DoProcessResponse() { - VLOG_WITH_PREFIX_UNLOCKED(2) << "Response from peer " << peer_pb().permanent_uuid() << ": " << SecureShortDebugString(response_); - bool send_more_immediately = queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), response_); + const auto send_more_immediately = + queue_->ResponseFromPeer(peer_pb_.permanent_uuid(), response_); { std::unique_lock<simple_spinlock> lock(peer_lock_); @@ -405,29 +409,25 @@ void Peer::DoProcessResponse() { failed_attempts_ = 0; request_pending_ = false; } - // We're OK to read the state_ without a lock here -- if we get a race, - // the worst thing that could happen is that we'll make one more request before - // noticing a close. 
+ if (send_more_immediately) { SendNextRequest(true); } } Status Peer::PrepareTabletCopyRequest() { - if (!FLAGS_enable_tablet_copy) { + if (PREDICT_FALSE(!FLAGS_enable_tablet_copy)) { failed_attempts_++; return Status::NotSupported("Tablet Copy is disabled"); } - RETURN_NOT_OK(queue_->GetTabletCopyRequestForPeer(peer_pb_.permanent_uuid(), &tc_request_)); - - return Status::OK(); + return queue_->GetTabletCopyRequestForPeer(peer_pb_.permanent_uuid(), &tc_request_); } void Peer::ProcessTabletCopyResponse() { // If the peer is already closed return. std::unique_lock<simple_spinlock> lock(peer_lock_); - if (closed_) { + if (PREDICT_FALSE(closed_)) { return; } CHECK(request_pending_); @@ -454,7 +454,8 @@ void Peer::ProcessTabletCopyResponse() { } } -void Peer::ProcessResponseError(const Status& status) { +void Peer::ProcessResponseErrorUnlocked(const Status& status) { + DCHECK(peer_lock_.is_locked()); failed_attempts_++; string resp_err_info; if (response_.has_error()) { @@ -488,10 +489,12 @@ string Peer::LogPrefixUnlocked() const { } void Peer::Close() { - // If the peer is already closed return. + if (closed_) { + // Do nothing if the peer is already closed. + return; + } { std::lock_guard<simple_spinlock> lock(peer_lock_); - if (closed_) return; closed_ = true; } VLOG_WITH_PREFIX_UNLOCKED(1) << "Closing peer: " << peer_pb_.permanent_uuid(); diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h index 1df1a4c..912ad64 100644 --- a/src/kudu/consensus/consensus_peers.h +++ b/src/kudu/consensus/consensus_peers.h @@ -16,6 +16,7 @@ // under the License. #pragma once +#include <atomic> #include <cstdint> #include <memory> #include <ostream> @@ -151,7 +152,7 @@ class Peer : void ProcessTabletCopyResponse(); // Signals there was an error sending the request to the peer. 
- void ProcessResponseError(const Status& status); + void ProcessResponseErrorUnlocked(const Status& status); std::string LogPrefixUnlocked() const; @@ -193,11 +194,18 @@ class Peer : // Repeating timer responsible for scheduling heartbeats to this peer. std::shared_ptr<rpc::PeriodicTimer> heartbeater_; - // Lock that protects Peer state changes, initialization, etc. - mutable simple_spinlock peer_lock_; - bool request_pending_ = false; - bool closed_ = false; - bool has_sent_first_request_ = false; + // Lock that protects Peer state changes, initialization, etc. It's necessary + // to hold 'peer_lock_' if setting 'request_pending_', 'closed_', and + // 'has_sent_first_request_' fields below. To read 'has_sent_first_request_', + // it's necessary to hold 'peer_lock_'. To read the 'closed_' and + // 'request_pending_' fields, there is no need to hold 'peer_lock_' unless + // it's necessary to block the threads which might be setting these fields + // concurrently. + simple_spinlock peer_lock_; + + std::atomic<bool> request_pending_; + std::atomic<bool> closed_; + bool has_sent_first_request_; }; // A proxy to another peer. 
Usually a thin wrapper around an rpc proxy but can diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc index 4608e25..1a50a01 100644 --- a/src/kudu/consensus/peer_manager.cc +++ b/src/kudu/consensus/peer_manager.cc @@ -122,13 +122,11 @@ Status PeerManager::StartElection(const string& uuid) { } void PeerManager::Close() { - { - std::lock_guard<simple_spinlock> lock(lock_); - for (const auto& entry : peers_) { - entry.second->Close(); - } - peers_.clear(); + std::lock_guard<simple_spinlock> lock(lock_); + for (const auto& entry : peers_) { + entry.second->Close(); } + peers_.clear(); } string PeerManager::GetLogPrefix() const { diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 934d240..2bc566c 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -744,7 +744,6 @@ Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta) } Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) { - std::lock_guard<simple_spinlock> lock(update_lock_); { ThreadRestrictions::AssertWaitAllowed(); diff --git a/src/kudu/tablet/ops/op_driver.cc b/src/kudu/tablet/ops/op_driver.cc index be707e5..27261e6 100644 --- a/src/kudu/tablet/ops/op_driver.cc +++ b/src/kudu/tablet/ops/op_driver.cc @@ -173,8 +173,7 @@ Status OpDriver::Init(unique_ptr<Op> op, } } - RETURN_NOT_OK(op_tracker_->Add(this)); - return Status::OK(); + return op_tracker_->Add(this); } consensus::OpId OpDriver::GetOpId() { diff --git a/src/kudu/tablet/ops/op_tracker.cc b/src/kudu/tablet/ops/op_tracker.cc index eab0d28..97ea0fd 100644 --- a/src/kudu/tablet/ops/op_tracker.cc +++ b/src/kudu/tablet/ops/op_tracker.cc @@ -131,15 +131,17 @@ OpTracker::Metrics::Metrics(const scoped_refptr<MetricEntity>& entity) #undef MINIT OpTracker::State::State() - : memory_footprint(0) { + : memory_footprint(0) { } OpTracker::OpTracker() { } OpTracker::~OpTracker() { +#ifndef NDEBUG 
std::lock_guard<simple_spinlock> l(lock_); - CHECK_EQ(pending_ops_.size(), 0); + DCHECK(pending_ops_.empty()); +#endif } Status OpTracker::Add(OpDriver* driver) { @@ -219,13 +221,13 @@ void OpTracker::Release(OpDriver* driver) { // Remove the op from the map updating memory consumption if needed. std::lock_guard<simple_spinlock> l(lock_); - State st = FindOrDie(pending_ops_, driver); if (mem_tracker_) { + const State& st = FindOrDie(pending_ops_, driver); mem_tracker_->Release(st.memory_footprint); } if (PREDICT_FALSE(pending_ops_.erase(driver) != 1)) { - LOG(FATAL) << "Could not remove pending op from map: " - << driver->ToStringUnlocked(); + LOG(FATAL) << Substitute("Could not remove pending op from map: $0", + driver->ToStringUnlocked()); } }
