This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch branch-1.18.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.18.x by this push:
new 2645e7bf1 [consensus] avoid copying TrackedPeer on every response
2645e7bf1 is described below
commit 2645e7bf1987febdbcecbc17d47448b18724096c
Author: Alexey Serbin <[email protected]>
AuthorDate: Tue Feb 11 13:12:05 2025 -0800
[consensus] avoid copying TrackedPeer on every response
PeerMessageQueue::TrackedPeer is quite a heavy structure, so it's better
avoid copying it when processing responses from Raft peers. I also
updated the related code to follow the current style guidelines.
This patch is inspired by [1].
This changelist doesn't contain any functional modifications.
[1] https://github.com/facebook/kuduraft/commit/e04b6d03e
Change-Id: I7a2e38412a9c3434c580889d73a69af533f13d28
Reviewed-on: http://gerrit.cloudera.org:8080/22476
Reviewed-by: Yifan Zhang <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
(cherry picked from commit f47805ea21b69076d90d377d60a5e429904c57e3)
Reviewed-on: http://gerrit.cloudera.org:8080/22479
---
src/kudu/consensus/consensus_queue.cc | 37 ++++++++++++++++++++---------------
src/kudu/consensus/consensus_queue.h | 11 +++++++----
2 files changed, 28 insertions(+), 20 deletions(-)
diff --git a/src/kudu/consensus/consensus_queue.cc
b/src/kudu/consensus/consensus_queue.cc
index 4f279a8fe..3d32d874e 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -1012,7 +1012,7 @@ void PeerMessageQueue::UpdatePeerStatus(const string&
peer_uuid,
}
void PeerMessageQueue::UpdateExchangeStatus(TrackedPeer* peer,
- const TrackedPeer& prev_peer_state,
+ PeerStatus
prev_last_exchange_status,
const ConsensusResponsePB&
response,
bool* lmp_mismatch) {
DCHECK(queue_lock_.is_locked());
@@ -1031,7 +1031,7 @@ void PeerMessageQueue::UpdateExchangeStatus(TrackedPeer*
peer,
case ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH:
peer->last_exchange_status = PeerStatus::LMP_MISMATCH;
DCHECK(status.has_last_received());
- if (prev_peer_state.last_exchange_status == PeerStatus::NEW) {
+ if (prev_last_exchange_status == PeerStatus::NEW) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Connected to new peer: " <<
peer->ToString();
} else {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Got LMP mismatch error from peer: "
@@ -1057,11 +1057,12 @@ void
PeerMessageQueue::UpdateExchangeStatus(TrackedPeer* peer,
}
}
-void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer&
prev_peer_state,
- const ConsensusStatusPB& status) {
+void PeerMessageQueue::PromoteIfNeeded(const TrackedPeer& peer,
+ const ConsensusStatusPB& status,
+ int64_t prev_last_received_index) {
DCHECK(queue_lock_.is_locked());
if (queue_state_.mode != PeerMessageQueue::LEADER ||
- peer->last_exchange_status != PeerStatus::OK) {
+ peer.last_exchange_status != PeerStatus::OK) {
return;
}
@@ -1069,7 +1070,7 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer,
const TrackedPeer& pre
// TrackedPeer data structure.
RaftPeerPB* peer_pb;
Status s =
GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
- peer->uuid(), &peer_pb);
+ peer.uuid(), &peer_pb);
if (s.ok() &&
peer_pb->member_type() == RaftPeerPB::NON_VOTER &&
peer_pb->attrs().promote()) {
@@ -1080,10 +1081,12 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer*
peer, const TrackedPeer& pre
// If we had never previously contacted this peer, wait until the second
// time we contact them to try to promote them.
- if (prev_peer_state.last_received.index() == 0) return;
+ if (prev_last_received_index == 0) {
+ return;
+ }
- int64_t last_batch_size = std::max<int64_t>(0, peer->last_received.index()
-
- prev_peer_state.last_received.index());
+ const int64_t last_batch_size = std::max<int64_t>(
+ 0, peer.last_received.index() - prev_last_received_index);
if (status.last_received_current_leader() == MinimumOpId() ||
status.last_received_current_leader().index() + last_batch_size
< queue_state_.committed_index) {
@@ -1094,7 +1097,7 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer,
const TrackedPeer& pre
// TODO(mpercy): Implement a SafeToPromote() check to ensure that we only
// try to promote a NON_VOTER to VOTER if we will be able to commit the
// resulting config change operation.
- NotifyObserversOfPeerToPromote(peer->uuid());
+ NotifyObserversOfPeerToPromote(peer.uuid());
}
}
@@ -1178,14 +1181,16 @@ bool PeerMessageQueue::ResponseFromPeer(const
std::string& peer_uuid,
DCHECK(status.has_last_received_current_leader());
DCHECK(status.has_last_committed_idx());
- // Take a snapshot of the previously-recorded peer state.
- const TrackedPeer prev_peer_state = *peer;
+ // Take a snapshot of a few fields from the previously-recorded peer state.
+ const OpId peer_last_received_op_id = peer->last_received;
+ const PeerStatus peer_last_exchange_status = peer->last_exchange_status;
// Update the peer's last exchange status based on the response.
// In this case, if there is a log matching property (LMP) mismatch, we
// want to immediately send another request as we attempt to sync the log
// offset between the local leader and the remote peer.
- UpdateExchangeStatus(peer, prev_peer_state, response,
&send_more_immediately);
+ UpdateExchangeStatus(
+ peer, peer_last_exchange_status, response, &send_more_immediately);
// If the peer is hosted on a server that is quiescing, note that now.
peer->remote_server_quiescing = response.has_server_quiescing() &&
@@ -1204,7 +1209,7 @@ bool PeerMessageQueue::ResponseFromPeer(const
std::string& peer_uuid,
peer->next_index = peer->last_received.index() + 1;
// Check if the peer is a NON_VOTER candidate ready for promotion.
- PromoteIfNeeded(peer, prev_peer_state, status);
+ PromoteIfNeeded(*peer, status, peer_last_received_op_id.index());
TransferLeadershipIfNeeded(*peer, status);
} else if (status.last_received_current_leader() != MinimumOpId()) {
@@ -1261,7 +1266,7 @@ bool PeerMessageQueue::ResponseFromPeer(const
std::string& peer_uuid,
// Advance the majority replicated index.
AdvanceQueueWatermark("majority_replicated",
&queue_state_.majority_replicated_index,
- /*replicated_before=*/
prev_peer_state.last_received,
+ /*replicated_before=*/ peer_last_received_op_id,
/*replicated_after=*/ peer->last_received,
/*num_peers_required=*/
queue_state_.majority_size_,
VOTER_REPLICAS,
@@ -1270,7 +1275,7 @@ bool PeerMessageQueue::ResponseFromPeer(const
std::string& peer_uuid,
// Advance the all replicated index.
AdvanceQueueWatermark("all_replicated",
&queue_state_.all_replicated_index,
- /*replicated_before=*/
prev_peer_state.last_received,
+ /*replicated_before=*/ peer_last_received_op_id,
/*replicated_after=*/ peer->last_received,
/*num_peers_required=*/ peers_map_.size(),
ALL_REPLICAS,
diff --git a/src/kudu/consensus/consensus_queue.h
b/src/kudu/consensus/consensus_queue.h
index c20429dcc..f67cf7548 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -465,13 +465,16 @@ class PeerMessageQueue {
// response. Sets 'lmp_mismatch' to true if the given response indicates
// there was a log-matching property mismatch on the remote, otherwise sets
// it to false.
- void UpdateExchangeStatus(TrackedPeer* peer, const TrackedPeer&
prev_peer_state,
- const ConsensusResponsePB& response, bool*
lmp_mismatch);
+ void UpdateExchangeStatus(TrackedPeer* peer,
+ PeerStatus prev_last_exchange_status,
+ const ConsensusResponsePB& response,
+ bool* lmp_mismatch);
// Check if the peer is a NON_VOTER candidate ready for promotion. If so,
// trigger promotion.
- void PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& prev_peer_state,
- const ConsensusStatusPB& status);
+ void PromoteIfNeeded(const TrackedPeer& peer,
+ const ConsensusStatusPB& status,
+ int64_t prev_last_received_index);
// If there is a graceful leadership change underway, notify queue observers
// to initiate leadership transfer to the specified peer under the following