This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch branch-1.18.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 098c94b0a6d9cdcfbe729502bcc0c43f57eaa064 Author: Alexey Serbin <[email protected]> AuthorDate: Wed Oct 2 15:33:44 2024 -0700 [consensus] small clean-up on PendingRounds I was looking at the code around PendingRounds while troubleshooting a race condition, and found that there is room for micro-optimisations and syntactic sugar in the PendingRounds implementation. This patch doesn't contain any functional modifications. Change-Id: Iaa5b66d8a2cf179f43141de8d640251e1a2dc80f Reviewed-on: http://gerrit.cloudera.org:8080/21878 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Abhishek Chennaka <[email protected]> (cherry picked from commit b6320038376b6fb35bfd1f1ac74323168df0884a) Reviewed-on: http://gerrit.cloudera.org:8080/21998 --- src/kudu/consensus/pending_rounds.cc | 34 ++++++++++++++++++---------------- src/kudu/consensus/pending_rounds.h | 4 ++-- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/kudu/consensus/pending_rounds.cc b/src/kudu/consensus/pending_rounds.cc index 80a5d8660..3e1c25302 100644 --- a/src/kudu/consensus/pending_rounds.cc +++ b/src/kudu/consensus/pending_rounds.cc @@ -62,12 +62,13 @@ Status PendingRounds::CancelPendingOps() { LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_ops_.size() << " pending ops."; // Abort ops in reverse index order. See KUDU-1678. + static const auto kAbortedStatus = Status::Aborted("Op aborted"); for (auto op = pending_ops_.crbegin(); op != pending_ops_.crend(); ++op) { const scoped_refptr<ConsensusRound>& round = op->second; // We cancel only ops whose applies have not yet been triggered. LOG_WITH_PREFIX(INFO) << "Aborting op as it isn't in flight: " << SecureShortDebugString(*round->replicate_msg()); - round->NotifyReplicationFinished(Status::Aborted("Op aborted")); + round->NotifyReplicationFinished(kAbortedStatus); } return Status::OK(); } @@ -83,24 +84,25 @@ void PendingRounds::AbortOpsAfter(int64_t index) { // Either the new preceding id is in the pendings set or it must be equal to the // committed index since we can't truncate already committed operations. - if (iter != pending_ops_.end() && (*iter).first == index) { - new_preceding = (*iter).second->replicate_msg()->id(); + if (iter != pending_ops_.end() && iter->first == index) { + new_preceding = iter->second->replicate_msg()->id(); ++iter; } else { CHECK_EQ(index, last_committed_op_id_.index()); new_preceding = last_committed_op_id_; } + static const auto kAbortedStatus = Status::Aborted("Op aborted by new leader"); for (; iter != pending_ops_.end();) { - const scoped_refptr<ConsensusRound>& round = (*iter).second; - auto op_type = round->replicate_msg()->op_type(); + const scoped_refptr<ConsensusRound>& round = iter->second; + const auto op_type = round->replicate_msg()->op_type(); LOG_WITH_PREFIX(INFO) << "Aborting uncommitted " << OperationType_Name(op_type) << " operation due to leader change: " << round->replicate_msg()->id(); - round->NotifyReplicationFinished(Status::Aborted("Op aborted by new leader")); + round->NotifyReplicationFinished(kAbortedStatus); // Erase the entry from pendings. - pending_ops_.erase(iter++); + iter = pending_ops_.erase(iter); } } @@ -114,7 +116,6 @@ scoped_refptr<ConsensusRound> PendingRounds::GetPendingOpByIndexOrNull(int64_t i } bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch) { - *term_mismatch = false; if (op_id.index() <= GetCommittedIndex()) { @@ -135,7 +136,7 @@ bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatc const OpId& PendingRounds::GetLastPendingOpOpId() const { return pending_ops_.empty() - ? MinimumOpId() : (--pending_ops_.end())->second->id(); + ? MinimumOpId() : pending_ops_.crbegin()->second->id(); } Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) { @@ -160,18 +161,19 @@ Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) { return Status::OK(); } + // Stop at the operation after the last one we must commit. + const auto end_iter = pending_ops_.upper_bound(committed_index); + // Start at the operation after the last committed one. auto iter = pending_ops_.upper_bound(last_committed_op_id_.index()); - // Stop at the operation after the last one we must commit. - auto end_iter = pending_ops_.upper_bound(committed_index); - CHECK(iter != pending_ops_.end()); + DCHECK(iter != pending_ops_.end()); VLOG_WITH_PREFIX(1) << "Last triggered apply was: " - << last_committed_op_id_ - << " Starting to apply from log index: " << (*iter).first; + << last_committed_op_id_ + << " Starting to apply from log index: " << iter->first; while (iter != end_iter) { - scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy. + scoped_refptr<ConsensusRound> round = iter->second; // Make a copy. DCHECK(round); const OpId& current_id = round->id(); @@ -179,7 +181,7 @@ Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) { CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id)); } - pending_ops_.erase(iter++); + iter = pending_ops_.erase(iter); last_committed_op_id_ = round->id(); time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg()); round->NotifyReplicationFinished(Status::OK()); diff --git a/src/kudu/consensus/pending_rounds.h b/src/kudu/consensus/pending_rounds.h index e3f0e4376..38a9fb98a 100644 --- a/src/kudu/consensus/pending_rounds.h +++ b/src/kudu/consensus/pending_rounds.h @@ -40,7 +40,7 @@ class TimeManager; // NOTE: each round is associated with a single round, though "round" refers to // the logical Raft replication and "op" refers to the replicated batch of // operations. -class PendingRounds { +class PendingRounds final { public: PendingRounds(const std::string& log_prefix, TimeManager* time_manager); ~PendingRounds() = default; @@ -106,7 +106,7 @@ class PendingRounds { // Index=>Round map that manages pending ops, i.e. operations for which we've // received a replicate message from the leader but have yet to be committed. // The key is the index of the replicate operation. - typedef std::map<int64_t, scoped_refptr<ConsensusRound> > IndexToRoundMap; + typedef std::map<int64_t, scoped_refptr<ConsensusRound>> IndexToRoundMap; IndexToRoundMap pending_ops_; // The OpId of the round that was last committed. Initialized to MinimumOpId().
