Repository: kudu Updated Branches: refs/heads/master e37bd1cf5 -> f932547b2
consensus: Reduce copy/paste for observer callbacks This is a net-negative line patch that unifies how the PeerMessageQueue notifies PeerMessageQueueObserver instances that are registered with it. The patch removes most of the copy/paste in this part of the code. This patch has no functional changes. Change-Id: Icbc4cb9d7b6e51a9f64d6f08c2f48d89705f5437 Reviewed-on: http://gerrit.cloudera.org:8080/8962 Reviewed-by: Alexey Serbin <[email protected]> Tested-by: Kudu Jenkins Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ff63b434 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ff63b434 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ff63b434 Branch: refs/heads/master Commit: ff63b434c25574595105fb480bcd01c9334008c7 Parents: e37bd1c Author: Mike Percy <[email protected]> Authored: Tue Dec 12 18:20:27 2017 -0800 Committer: Mike Percy <[email protected]> Committed: Tue Jan 9 03:18:06 2018 +0000 ---------------------------------------------------------------------- src/kudu/consensus/consensus_queue.cc | 99 +++++++++--------------------- src/kudu/consensus/consensus_queue.h | 19 ++---- 2 files changed, 35 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/ff63b434/src/kudu/consensus/consensus_queue.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc index 6933cfa..1889ed3 100644 --- a/src/kudu/consensus/consensus_queue.cc +++ b/src/kudu/consensus/consensus_queue.cc @@ -18,6 +18,7 @@ #include <algorithm> #include <cstdint> +#include <functional> #include <iostream> #include <mutex> #include <string> @@ -1275,97 +1276,55 @@ bool PeerMessageQueue::IsOpInLog(const OpId& desired_op) const { void PeerMessageQueue::NotifyObserversOfCommitIndexChange(int64_t new_commit_index) { WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure( - Bind(&PeerMessageQueue::NotifyObserversOfCommitIndexChangeTask, - Unretained(this), new_commit_index)), - LogPrefixUnlocked() + "Unable to notify RaftConsensus of " - "commit index change."); + Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this), + [=](PeerMessageQueueObserver* observer) { + observer->NotifyCommitIndex(new_commit_index); + })), + LogPrefixUnlocked() + "Unable to notify RaftConsensus of commit index change."); } void PeerMessageQueue::NotifyObserversOfTermChange(int64_t term) { WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure( - Bind(&PeerMessageQueue::NotifyObserversOfTermChangeTask, - Unretained(this), term)), - LogPrefixUnlocked() + "Unable to notify RaftConsensus of term change."); -} - -void PeerMessageQueue::NotifyObserversOfCommitIndexChangeTask(int64_t new_commit_index) { - std::vector<PeerMessageQueueObserver*> copy; - { - std::lock_guard<simple_spinlock> lock(queue_lock_); - copy = observers_; - } - for (PeerMessageQueueObserver* observer : copy) { - observer->NotifyCommitIndex(new_commit_index); - } -} - -void PeerMessageQueue::NotifyObserversOfTermChangeTask(int64_t term) { - MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications); - std::vector<PeerMessageQueueObserver*> copy; - { - std::lock_guard<simple_spinlock> lock(queue_lock_); - copy = observers_; - } - for (PeerMessageQueueObserver* observer : copy) { - observer->NotifyTermChange(term); - } + Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this), + [=](PeerMessageQueueObserver* observer) { + observer->NotifyTermChange(term); + })), + LogPrefixUnlocked() + "Unable to notify RaftConsensus of term change."); } void PeerMessageQueue::NotifyObserversOfFailedFollower(const string& uuid, int64_t term, const string& reason) { WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure( - Bind(&PeerMessageQueue::NotifyObserversOfFailedFollowerTask, - Unretained(this), uuid, term, reason)), - LogPrefixUnlocked() + "Unable to notify RaftConsensus of abandoned follower."); -} - -void PeerMessageQueue::NotifyObserversOfFailedFollowerTask(const string& uuid, - int64_t term, - const string& reason) { - MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications); - std::vector<PeerMessageQueueObserver*> observers_copy; - { - std::lock_guard<simple_spinlock> lock(queue_lock_); - observers_copy = observers_; - } - for (PeerMessageQueueObserver* observer : observers_copy) { - observer->NotifyFailedFollower(uuid, term, reason); - } + Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this), + [=](PeerMessageQueueObserver* observer) { + observer->NotifyFailedFollower(uuid, term, reason); + })), + LogPrefixUnlocked() + "Unable to notify RaftConsensus of abandoned follower."); } void PeerMessageQueue::NotifyObserversOfPeerToPromote(const string& peer_uuid, int64_t term, int64_t committed_config_opid_index) { WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure( - Bind(&PeerMessageQueue::NotifyObserversOfPeerToPromoteTask, - Unretained(this), peer_uuid, term, committed_config_opid_index)), - LogPrefixUnlocked() + "unable to notify RaftConsensus of peer to promote"); - -} - -void PeerMessageQueue::NotifyObserversOfPeerToPromoteTask(const string& peer_uuid, - int64_t term, - int64_t committed_config_opid_index) { - MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications); - std::vector<PeerMessageQueueObserver*> observers_copy; - { - std::lock_guard<simple_spinlock> lock(queue_lock_); - observers_copy = observers_; - } - for (PeerMessageQueueObserver* observer : observers_copy) { - observer->NotifyPeerToPromote(peer_uuid, term, committed_config_opid_index); - } - + Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this), + [=](PeerMessageQueueObserver* observer) { + observer->NotifyPeerToPromote(peer_uuid, term, committed_config_opid_index); + })), + LogPrefixUnlocked() + "Unable to notify RaftConsensus of peer to promote."); } void PeerMessageQueue::NotifyObserversOfPeerHealthChange() { WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure( - Bind(&PeerMessageQueue::NotifyObserversOfPeerHealthChangeTask, Unretained(this))), - LogPrefixUnlocked() + "Unable to notify RaftConsensus peer health change."); + Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this), + [](PeerMessageQueueObserver* observer) { + observer->NotifyPeerHealthChange(); + })), + LogPrefixUnlocked() + "Unable to notify RaftConsensus peer health change."); } -void PeerMessageQueue::NotifyObserversOfPeerHealthChangeTask() { +void PeerMessageQueue::NotifyObserversTask( + const std::function<void(PeerMessageQueueObserver*)>& func) { MAYBE_INJECT_RANDOM_LATENCY(FLAGS_consensus_inject_latency_ms_in_notifications); std::vector<PeerMessageQueueObserver*> observers_copy; { @@ -1373,7 +1332,7 @@ void PeerMessageQueue::NotifyObserversOfPeerHealthChangeTask() { observers_copy = observers_; } for (PeerMessageQueueObserver* observer : observers_copy) { - observer->NotifyPeerHealthChange(); + func(observer); } } http://git-wip-us.apache.org/repos/asf/kudu/blob/ff63b434/src/kudu/consensus/consensus_queue.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h index 0808c69..2a6df6c 100644 --- a/src/kudu/consensus/consensus_queue.h +++ b/src/kudu/consensus/consensus_queue.h @@ -19,6 +19,7 @@ #define KUDU_CONSENSUS_CONSENSUS_QUEUE_H_ #include <cstdint> +#include <functional> #include <iosfwd> #include <memory> #include <string> @@ -450,28 +451,20 @@ class PeerMessageQueue { // Calculate a peer's up-to-date health status based on internal fields. static HealthReportPB::HealthStatus PeerHealthStatus(const TrackedPeer& peer); + // Asynchronously trigger various types of observer notifications on a + // separate thread. void NotifyObserversOfCommitIndexChange(int64_t new_commit_index); - void NotifyObserversOfCommitIndexChangeTask(int64_t new_commit_index); - void NotifyObserversOfTermChange(int64_t term); - void NotifyObserversOfTermChangeTask(int64_t term); - void NotifyObserversOfFailedFollower(const std::string& uuid, int64_t term, const std::string& reason); - void NotifyObserversOfFailedFollowerTask(const std::string& uuid, - int64_t term, - const std::string& reason); - void NotifyObserversOfPeerToPromote(const std::string& peer_uuid, int64_t term, int64_t committed_config_opid_index); - void NotifyObserversOfPeerToPromoteTask(const std::string& peer_uuid, - int64_t term, - int64_t committed_config_opid_index); - void NotifyObserversOfPeerHealthChange(); - void NotifyObserversOfPeerHealthChangeTask(); + + // Notify all PeerMessageQueueObservers using the given callback function. + void NotifyObserversTask(const std::function<void(PeerMessageQueueObserver*)>& func); typedef std::unordered_map<std::string, TrackedPeer*> PeersMap;
