This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.18.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.18.x by this push:
     new 2645e7bf1 [consensus] avoid copying TrackedPeer on every response
2645e7bf1 is described below

commit 2645e7bf1987febdbcecbc17d47448b18724096c
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Feb 11 13:12:05 2025 -0800

    [consensus] avoid copying TrackedPeer on every response
    
    PeerMessageQueue::TrackedPeer is quite a heavy structure, so it's better
    to avoid copying it when processing responses from Raft peers.  I also
    updated the related code to follow the current style guidelines.
    
    This patch is inspired by [1].
    
    This changelist doesn't contain any functional modifications.
    
    [1] https://github.com/facebook/kuduraft/commit/e04b6d03e
    
    Change-Id: I7a2e38412a9c3434c580889d73a69af533f13d28
    Reviewed-on: http://gerrit.cloudera.org:8080/22476
    Reviewed-by: Yifan Zhang <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
    (cherry picked from commit f47805ea21b69076d90d377d60a5e429904c57e3)
    Reviewed-on: http://gerrit.cloudera.org:8080/22479
---
 src/kudu/consensus/consensus_queue.cc | 37 ++++++++++++++++++++---------------
 src/kudu/consensus/consensus_queue.h  | 11 +++++++----
 2 files changed, 28 insertions(+), 20 deletions(-)

diff --git a/src/kudu/consensus/consensus_queue.cc 
b/src/kudu/consensus/consensus_queue.cc
index 4f279a8fe..3d32d874e 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -1012,7 +1012,7 @@ void PeerMessageQueue::UpdatePeerStatus(const string& 
peer_uuid,
 }
 
 void PeerMessageQueue::UpdateExchangeStatus(TrackedPeer* peer,
-                                            const TrackedPeer& prev_peer_state,
+                                            PeerStatus 
prev_last_exchange_status,
                                             const ConsensusResponsePB& 
response,
                                             bool* lmp_mismatch) {
   DCHECK(queue_lock_.is_locked());
@@ -1031,7 +1031,7 @@ void PeerMessageQueue::UpdateExchangeStatus(TrackedPeer* 
peer,
     case ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH:
       peer->last_exchange_status = PeerStatus::LMP_MISMATCH;
       DCHECK(status.has_last_received());
-      if (prev_peer_state.last_exchange_status == PeerStatus::NEW) {
+      if (prev_last_exchange_status == PeerStatus::NEW) {
         LOG_WITH_PREFIX_UNLOCKED(INFO) << "Connected to new peer: " << 
peer->ToString();
       } else {
         LOG_WITH_PREFIX_UNLOCKED(INFO) << "Got LMP mismatch error from peer: "
@@ -1057,11 +1057,12 @@ void 
PeerMessageQueue::UpdateExchangeStatus(TrackedPeer* peer,
   }
 }
 
-void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& 
prev_peer_state,
-                                       const ConsensusStatusPB& status) {
+void PeerMessageQueue::PromoteIfNeeded(const TrackedPeer& peer,
+                                       const ConsensusStatusPB& status,
+                                       int64_t prev_last_received_index) {
   DCHECK(queue_lock_.is_locked());
   if (queue_state_.mode != PeerMessageQueue::LEADER ||
-      peer->last_exchange_status != PeerStatus::OK) {
+      peer.last_exchange_status != PeerStatus::OK) {
     return;
   }
 
@@ -1069,7 +1070,7 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, 
const TrackedPeer& pre
   // TrackedPeer data structure.
   RaftPeerPB* peer_pb;
   Status s = 
GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
-                                 peer->uuid(), &peer_pb);
+                                 peer.uuid(), &peer_pb);
   if (s.ok() &&
       peer_pb->member_type() == RaftPeerPB::NON_VOTER &&
       peer_pb->attrs().promote()) {
@@ -1080,10 +1081,12 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* 
peer, const TrackedPeer& pre
 
     // If we had never previously contacted this peer, wait until the second
     // time we contact them to try to promote them.
-    if (prev_peer_state.last_received.index() == 0) return;
+    if (prev_last_received_index == 0) {
+      return;
+    }
 
-    int64_t last_batch_size = std::max<int64_t>(0, peer->last_received.index() 
-
-        prev_peer_state.last_received.index());
+    const int64_t last_batch_size = std::max<int64_t>(
+        0, peer.last_received.index() - prev_last_received_index);
     if (status.last_received_current_leader() == MinimumOpId() ||
         status.last_received_current_leader().index() + last_batch_size
             < queue_state_.committed_index) {
@@ -1094,7 +1097,7 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, 
const TrackedPeer& pre
     // TODO(mpercy): Implement a SafeToPromote() check to ensure that we only
     // try to promote a NON_VOTER to VOTER if we will be able to commit the
     // resulting config change operation.
-    NotifyObserversOfPeerToPromote(peer->uuid());
+    NotifyObserversOfPeerToPromote(peer.uuid());
   }
 }
 
@@ -1178,14 +1181,16 @@ bool PeerMessageQueue::ResponseFromPeer(const 
std::string& peer_uuid,
     DCHECK(status.has_last_received_current_leader());
     DCHECK(status.has_last_committed_idx());
 
-    // Take a snapshot of the previously-recorded peer state.
-    const TrackedPeer prev_peer_state = *peer;
+    // Take a snapshot of a few fields from the previously-recorded peer state.
+    const OpId peer_last_received_op_id = peer->last_received;
+    const PeerStatus peer_last_exchange_status = peer->last_exchange_status;
 
     // Update the peer's last exchange status based on the response.
     // In this case, if there is a log matching property (LMP) mismatch, we
     // want to immediately send another request as we attempt to sync the log
     // offset between the local leader and the remote peer.
-    UpdateExchangeStatus(peer, prev_peer_state, response, 
&send_more_immediately);
+    UpdateExchangeStatus(
+        peer, peer_last_exchange_status, response, &send_more_immediately);
 
     // If the peer is hosted on a server that is quiescing, note that now.
     peer->remote_server_quiescing = response.has_server_quiescing() &&
@@ -1204,7 +1209,7 @@ bool PeerMessageQueue::ResponseFromPeer(const 
std::string& peer_uuid,
       peer->next_index = peer->last_received.index() + 1;
 
       // Check if the peer is a NON_VOTER candidate ready for promotion.
-      PromoteIfNeeded(peer, prev_peer_state, status);
+      PromoteIfNeeded(*peer, status, peer_last_received_op_id.index());
 
       TransferLeadershipIfNeeded(*peer, status);
     } else if (status.last_received_current_leader() != MinimumOpId()) {
@@ -1261,7 +1266,7 @@ bool PeerMessageQueue::ResponseFromPeer(const 
std::string& peer_uuid,
       // Advance the majority replicated index.
       AdvanceQueueWatermark("majority_replicated",
                             &queue_state_.majority_replicated_index,
-                            /*replicated_before=*/ 
prev_peer_state.last_received,
+                            /*replicated_before=*/ peer_last_received_op_id,
                             /*replicated_after=*/ peer->last_received,
                             /*num_peers_required=*/ 
queue_state_.majority_size_,
                             VOTER_REPLICAS,
@@ -1270,7 +1275,7 @@ bool PeerMessageQueue::ResponseFromPeer(const 
std::string& peer_uuid,
       // Advance the all replicated index.
       AdvanceQueueWatermark("all_replicated",
                             &queue_state_.all_replicated_index,
-                            /*replicated_before=*/ 
prev_peer_state.last_received,
+                            /*replicated_before=*/ peer_last_received_op_id,
                             /*replicated_after=*/ peer->last_received,
                             /*num_peers_required=*/ peers_map_.size(),
                             ALL_REPLICAS,
diff --git a/src/kudu/consensus/consensus_queue.h 
b/src/kudu/consensus/consensus_queue.h
index c20429dcc..f67cf7548 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -465,13 +465,16 @@ class PeerMessageQueue {
   // response. Sets 'lmp_mismatch' to true if the given response indicates
   // there was a log-matching property mismatch on the remote, otherwise sets
   // it to false.
-  void UpdateExchangeStatus(TrackedPeer* peer, const TrackedPeer& 
prev_peer_state,
-                            const ConsensusResponsePB& response, bool* 
lmp_mismatch);
+  void UpdateExchangeStatus(TrackedPeer* peer,
+                            PeerStatus prev_last_exchange_status,
+                            const ConsensusResponsePB& response,
+                            bool* lmp_mismatch);
 
   // Check if the peer is a NON_VOTER candidate ready for promotion. If so,
   // trigger promotion.
-  void PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& prev_peer_state,
-                       const ConsensusStatusPB& status);
+  void PromoteIfNeeded(const TrackedPeer& peer,
+                       const ConsensusStatusPB& status,
+                       int64_t prev_last_received_index);
 
   // If there is a graceful leadership change underway, notify queue observers
   // to initiate leadership transfer to the specified peer under the following

Reply via email to