This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new b63200383 [consensus] small clean-up on PendingRounds
b63200383 is described below
commit b6320038376b6fb35bfd1f1ac74323168df0884a
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Oct 2 15:33:44 2024 -0700
[consensus] small clean-up on PendingRounds
I was looking at the code around PendingRounds while troubleshooting
a race condition, and found that there is room for micro-optimisations
and syntactic sugar in the PendingRounds implementation.
This patch doesn't contain any functional modifications.
Change-Id: Iaa5b66d8a2cf179f43141de8d640251e1a2dc80f
Reviewed-on: http://gerrit.cloudera.org:8080/21878
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/consensus/pending_rounds.cc | 34 ++++++++++++++++++----------------
src/kudu/consensus/pending_rounds.h | 4 ++--
2 files changed, 20 insertions(+), 18 deletions(-)
diff --git a/src/kudu/consensus/pending_rounds.cc
b/src/kudu/consensus/pending_rounds.cc
index 80a5d8660..3e1c25302 100644
--- a/src/kudu/consensus/pending_rounds.cc
+++ b/src/kudu/consensus/pending_rounds.cc
@@ -62,12 +62,13 @@ Status PendingRounds::CancelPendingOps() {
LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_ops_.size()
<< " pending ops.";
// Abort ops in reverse index order. See KUDU-1678.
+ static const auto kAbortedStatus = Status::Aborted("Op aborted");
for (auto op = pending_ops_.crbegin(); op != pending_ops_.crend(); ++op) {
const scoped_refptr<ConsensusRound>& round = op->second;
// We cancel only ops whose applies have not yet been triggered.
LOG_WITH_PREFIX(INFO) << "Aborting op as it isn't in flight: "
<< SecureShortDebugString(*round->replicate_msg());
- round->NotifyReplicationFinished(Status::Aborted("Op aborted"));
+ round->NotifyReplicationFinished(kAbortedStatus);
}
return Status::OK();
}
@@ -83,24 +84,25 @@ void PendingRounds::AbortOpsAfter(int64_t index) {
// Either the new preceding id is in the pendings set or it must be equal to
the
// committed index since we can't truncate already committed operations.
- if (iter != pending_ops_.end() && (*iter).first == index) {
- new_preceding = (*iter).second->replicate_msg()->id();
+ if (iter != pending_ops_.end() && iter->first == index) {
+ new_preceding = iter->second->replicate_msg()->id();
++iter;
} else {
CHECK_EQ(index, last_committed_op_id_.index());
new_preceding = last_committed_op_id_;
}
+ static const auto kAbortedStatus = Status::Aborted("Op aborted by new
leader");
for (; iter != pending_ops_.end();) {
- const scoped_refptr<ConsensusRound>& round = (*iter).second;
- auto op_type = round->replicate_msg()->op_type();
+ const scoped_refptr<ConsensusRound>& round = iter->second;
+ const auto op_type = round->replicate_msg()->op_type();
LOG_WITH_PREFIX(INFO)
<< "Aborting uncommitted " << OperationType_Name(op_type)
<< " operation due to leader change: " << round->replicate_msg()->id();
- round->NotifyReplicationFinished(Status::Aborted("Op aborted by new
leader"));
+ round->NotifyReplicationFinished(kAbortedStatus);
// Erase the entry from pendings.
- pending_ops_.erase(iter++);
+ iter = pending_ops_.erase(iter);
}
}
@@ -114,7 +116,6 @@ scoped_refptr<ConsensusRound>
PendingRounds::GetPendingOpByIndexOrNull(int64_t i
}
bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool*
term_mismatch) {
-
*term_mismatch = false;
if (op_id.index() <= GetCommittedIndex()) {
@@ -135,7 +136,7 @@ bool PendingRounds::IsOpCommittedOrPending(const OpId&
op_id, bool* term_mismatc
const OpId& PendingRounds::GetLastPendingOpOpId() const {
return pending_ops_.empty()
- ? MinimumOpId() : (--pending_ops_.end())->second->id();
+ ? MinimumOpId() : pending_ops_.crbegin()->second->id();
}
Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
@@ -160,18 +161,19 @@ Status PendingRounds::AdvanceCommittedIndex(int64_t
committed_index) {
return Status::OK();
}
+ // Stop at the operation after the last one we must commit.
+ const auto end_iter = pending_ops_.upper_bound(committed_index);
+
// Start at the operation after the last committed one.
auto iter = pending_ops_.upper_bound(last_committed_op_id_.index());
- // Stop at the operation after the last one we must commit.
- auto end_iter = pending_ops_.upper_bound(committed_index);
- CHECK(iter != pending_ops_.end());
+ DCHECK(iter != pending_ops_.end());
VLOG_WITH_PREFIX(1) << "Last triggered apply was: "
- << last_committed_op_id_
- << " Starting to apply from log index: " << (*iter).first;
+ << last_committed_op_id_
+ << " Starting to apply from log index: " << iter->first;
while (iter != end_iter) {
- scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy.
+ scoped_refptr<ConsensusRound> round = iter->second; // Make a copy.
DCHECK(round);
const OpId& current_id = round->id();
@@ -179,7 +181,7 @@ Status PendingRounds::AdvanceCommittedIndex(int64_t
committed_index) {
CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id));
}
- pending_ops_.erase(iter++);
+ iter = pending_ops_.erase(iter);
last_committed_op_id_ = round->id();
time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg());
round->NotifyReplicationFinished(Status::OK());
diff --git a/src/kudu/consensus/pending_rounds.h
b/src/kudu/consensus/pending_rounds.h
index e3f0e4376..38a9fb98a 100644
--- a/src/kudu/consensus/pending_rounds.h
+++ b/src/kudu/consensus/pending_rounds.h
@@ -40,7 +40,7 @@ class TimeManager;
// NOTE: each round is associated with a single round, though "round" refers to
// the logical Raft replication and "op" refers to the replicated batch of
// operations.
-class PendingRounds {
+class PendingRounds final {
public:
PendingRounds(const std::string& log_prefix, TimeManager* time_manager);
~PendingRounds() = default;
@@ -106,7 +106,7 @@ class PendingRounds {
// Index=>Round map that manages pending ops, i.e. operations for which we've
// received a replicate message from the leader but have yet to be committed.
// The key is the index of the replicate operation.
- typedef std::map<int64_t, scoped_refptr<ConsensusRound> > IndexToRoundMap;
+ typedef std::map<int64_t, scoped_refptr<ConsensusRound>> IndexToRoundMap;
IndexToRoundMap pending_ops_;
// The OpId of the round that was last committed. Initialized to
MinimumOpId().