This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new b63200383 [consensus] small clean-up on PendingRounds
b63200383 is described below

commit b6320038376b6fb35bfd1f1ac74323168df0884a
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Oct 2 15:33:44 2024 -0700

    [consensus] small clean-up on PendingRounds
    
    I was looking at the code around PendingRounds while troubleshooting
    a race condition, and found that there is room for micro-optimisations
    and syntactic sugar in the PendingRounds implementation.
    
    This patch doesn't contain any functional modifications.
    
    Change-Id: Iaa5b66d8a2cf179f43141de8d640251e1a2dc80f
    Reviewed-on: http://gerrit.cloudera.org:8080/21878
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
---
 src/kudu/consensus/pending_rounds.cc | 34 ++++++++++++++++++----------------
 src/kudu/consensus/pending_rounds.h  |  4 ++--
 2 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/src/kudu/consensus/pending_rounds.cc 
b/src/kudu/consensus/pending_rounds.cc
index 80a5d8660..3e1c25302 100644
--- a/src/kudu/consensus/pending_rounds.cc
+++ b/src/kudu/consensus/pending_rounds.cc
@@ -62,12 +62,13 @@ Status PendingRounds::CancelPendingOps() {
   LOG_WITH_PREFIX(INFO) << "Trying to abort " << pending_ops_.size()
                         << " pending ops.";
   // Abort ops in reverse index order. See KUDU-1678.
+  static const auto kAbortedStatus = Status::Aborted("Op aborted");
   for (auto op = pending_ops_.crbegin(); op != pending_ops_.crend(); ++op) {
     const scoped_refptr<ConsensusRound>& round = op->second;
     // We cancel only ops whose applies have not yet been triggered.
     LOG_WITH_PREFIX(INFO) << "Aborting op as it isn't in flight: "
                           << SecureShortDebugString(*round->replicate_msg());
-    round->NotifyReplicationFinished(Status::Aborted("Op aborted"));
+    round->NotifyReplicationFinished(kAbortedStatus);
   }
   return Status::OK();
 }
@@ -83,24 +84,25 @@ void PendingRounds::AbortOpsAfter(int64_t index) {
 
   // Either the new preceding id is in the pendings set or it must be equal to 
the
   // committed index since we can't truncate already committed operations.
-  if (iter != pending_ops_.end() && (*iter).first == index) {
-    new_preceding = (*iter).second->replicate_msg()->id();
+  if (iter != pending_ops_.end() && iter->first == index) {
+    new_preceding = iter->second->replicate_msg()->id();
     ++iter;
   } else {
     CHECK_EQ(index, last_committed_op_id_.index());
     new_preceding = last_committed_op_id_;
   }
 
+  static const auto kAbortedStatus = Status::Aborted("Op aborted by new 
leader");
   for (; iter != pending_ops_.end();) {
-    const scoped_refptr<ConsensusRound>& round = (*iter).second;
-    auto op_type = round->replicate_msg()->op_type();
+    const scoped_refptr<ConsensusRound>& round = iter->second;
+    const auto op_type = round->replicate_msg()->op_type();
     LOG_WITH_PREFIX(INFO)
         << "Aborting uncommitted " << OperationType_Name(op_type)
         << " operation due to leader change: " << round->replicate_msg()->id();
 
-    round->NotifyReplicationFinished(Status::Aborted("Op aborted by new 
leader"));
+    round->NotifyReplicationFinished(kAbortedStatus);
     // Erase the entry from pendings.
-    pending_ops_.erase(iter++);
+    iter = pending_ops_.erase(iter);
   }
 }
 
@@ -114,7 +116,6 @@ scoped_refptr<ConsensusRound> 
PendingRounds::GetPendingOpByIndexOrNull(int64_t i
 }
 
 bool PendingRounds::IsOpCommittedOrPending(const OpId& op_id, bool* 
term_mismatch) {
-
   *term_mismatch = false;
 
   if (op_id.index() <= GetCommittedIndex()) {
@@ -135,7 +136,7 @@ bool PendingRounds::IsOpCommittedOrPending(const OpId& 
op_id, bool* term_mismatc
 
 const OpId& PendingRounds::GetLastPendingOpOpId() const {
   return pending_ops_.empty()
-      ? MinimumOpId() : (--pending_ops_.end())->second->id();
+      ? MinimumOpId() : pending_ops_.crbegin()->second->id();
 }
 
 Status PendingRounds::AdvanceCommittedIndex(int64_t committed_index) {
@@ -160,18 +161,19 @@ Status PendingRounds::AdvanceCommittedIndex(int64_t 
committed_index) {
     return Status::OK();
   }
 
+  // Stop at the operation after the last one we must commit.
+  const auto end_iter = pending_ops_.upper_bound(committed_index);
+
   // Start at the operation after the last committed one.
   auto iter = pending_ops_.upper_bound(last_committed_op_id_.index());
-  // Stop at the operation after the last one we must commit.
-  auto end_iter = pending_ops_.upper_bound(committed_index);
-  CHECK(iter != pending_ops_.end());
+  DCHECK(iter != pending_ops_.end());
 
   VLOG_WITH_PREFIX(1) << "Last triggered apply was: "
-      <<  last_committed_op_id_
-      << " Starting to apply from log index: " << (*iter).first;
+                      <<  last_committed_op_id_
+                      << " Starting to apply from log index: " << iter->first;
 
   while (iter != end_iter) {
-    scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy.
+    scoped_refptr<ConsensusRound> round = iter->second; // Make a copy.
     DCHECK(round);
     const OpId& current_id = round->id();
 
@@ -179,7 +181,7 @@ Status PendingRounds::AdvanceCommittedIndex(int64_t 
committed_index) {
       CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id));
     }
 
-    pending_ops_.erase(iter++);
+    iter = pending_ops_.erase(iter);
     last_committed_op_id_ = round->id();
     time_manager_->AdvanceSafeTimeWithMessage(*round->replicate_msg());
     round->NotifyReplicationFinished(Status::OK());
diff --git a/src/kudu/consensus/pending_rounds.h 
b/src/kudu/consensus/pending_rounds.h
index e3f0e4376..38a9fb98a 100644
--- a/src/kudu/consensus/pending_rounds.h
+++ b/src/kudu/consensus/pending_rounds.h
@@ -40,7 +40,7 @@ class TimeManager;
 // NOTE: each round is associated with a single round, though "round" refers to
 // the logical Raft replication and "op" refers to the replicated batch of
 // operations.
-class PendingRounds {
+class PendingRounds final {
  public:
   PendingRounds(const std::string& log_prefix, TimeManager* time_manager);
   ~PendingRounds() = default;
@@ -106,7 +106,7 @@ class PendingRounds {
   // Index=>Round map that manages pending ops, i.e. operations for which we've
   // received a replicate message from the leader but have yet to be committed.
   // The key is the index of the replicate operation.
-  typedef std::map<int64_t, scoped_refptr<ConsensusRound> > IndexToRoundMap;
+  typedef std::map<int64_t, scoped_refptr<ConsensusRound>> IndexToRoundMap;
   IndexToRoundMap pending_ops_;
 
   // The OpId of the round that was last committed. Initialized to 
MinimumOpId().

Reply via email to