Repository: kudu Updated Branches: refs/heads/master f28e8bb81 -> 74aa53f13
[consensus] fix message on unprepared dedup ops A minor clean-up on RaftConsensus::UpdateReplica() aiming to fix the warning message on the unprepared operations from deduplicated request. Change-Id: Ib9871068743b27720f839797ba6aa6f23cf03a7a Reviewed-on: http://gerrit.cloudera.org:8080/11982 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b37f264f Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b37f264f Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b37f264f Branch: refs/heads/master Commit: b37f264fbb4f798ee9743ed87e145cd92a7fc63d Parents: f28e8bb Author: Alexey Serbin <[email protected]> Authored: Thu Nov 22 00:17:42 2018 -0800 Committer: Alexey Serbin <[email protected]> Committed: Mon Nov 26 14:23:26 2018 +0000 ---------------------------------------------------------------------- src/kudu/consensus/raft_consensus.cc | 50 ++++++++++++++----------------- 1 file changed, 22 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/b37f264f/src/kudu/consensus/raft_consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index fc3fdca..d71f8e9 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -21,6 +21,7 @@ #include <cmath> #include <cstdint> #include <functional> +#include <iterator> #include <memory> #include <mutex> #include <ostream> @@ -1391,7 +1392,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, // The deduplicated request. LeaderRequest deduped_req; - + auto& messages = deduped_req.messages; { ThreadRestrictions::AssertWaitAllowed(); LockGuard l(lock_); @@ -1403,7 +1404,6 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, deduped_req.leader_uuid = request->caller_uuid(); RETURN_NOT_OK(CheckLeaderRequestUnlocked(request, response, &deduped_req)); - if (response->status().has_error()) { // We had an error, like an invalid term, we still fill the response. FillConsensusResponseOKUnlocked(response); @@ -1440,7 +1440,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, // 1. As many pending transactions as we can, except... // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and... // 3. ...the leader's committed index is always our upper bound. - int64_t early_apply_up_to = std::min<int64_t>({ + const int64_t early_apply_up_to = std::min({ pending_->GetLastPendingTransactionOpId().index(), deduped_req.preceding_opid->index(), request->committed_index()}); @@ -1456,13 +1456,9 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, // 2 - Enqueue the prepares - TRACE("Triggering prepare for $0 ops", deduped_req.messages.size()); - - Status prepare_status; - auto iter = deduped_req.messages.begin(); - - if (PREDICT_TRUE(!deduped_req.messages.empty())) { + TRACE("Triggering prepare for $0 ops", messages.size()); + if (PREDICT_TRUE(!messages.empty())) { // This request contains at least one message, and is likely to increase // our memory pressure. double capacity_pct; @@ -1482,12 +1478,14 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, } } - while (iter != deduped_req.messages.end()) { + Status prepare_status; + auto iter = messages.begin(); + while (iter != messages.end()) { prepare_status = StartFollowerTransactionUnlocked(*iter); if (PREDICT_FALSE(!prepare_status.ok())) { break; } - // TODO(dralves) Without leader leases this shouldn't be a allowed to fail. + // TODO(dralves) Without leader leases this shouldn't be allowed to fail. // Once we have that functionality we'll have to revisit this. CHECK_OK(time_manager_->MessageReceivedFromLeader(*(*iter)->get())); ++iter; @@ -1497,22 +1495,18 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, // to perform cleanup, namely trimming deduped_req.messages to only contain the messages // that were actually prepared, and deleting the other ones since we've taken ownership // when we first deduped. - if (iter != deduped_req.messages.end()) { - bool need_to_warn = true; - while (iter != deduped_req.messages.end()) { - ReplicateRefPtr msg = (*iter); - iter = deduped_req.messages.erase(iter); - if (need_to_warn) { - need_to_warn = false; - LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Could not prepare transaction for op " - << msg->get()->id() << " and following " << deduped_req.messages.size() << - " ops. Status for this op: " << prepare_status.ToString(); - } - } + if (iter != messages.end()) { + LOG_WITH_PREFIX_UNLOCKED(WARNING) << Substitute( + "Could not prepare transaction for op '$0' and following $1 ops. " + "Status for this op: $2", + (*iter)->get()->id().ShortDebugString(), + std::distance(iter, messages.end()) - 1, + prepare_status.ToString()); + iter = messages.erase(iter, messages.end()); // If this is empty, it means we couldn't prepare a single de-duped message. There is nothing // else we can do. The leader will detect this and retry later. - if (deduped_req.messages.empty()) { + if (messages.empty()) { string msg = Substitute("Rejecting Update request from peer $0 for term $1. " "Could not prepare a single transaction due to: $2", request->caller_uuid(), @@ -1538,14 +1532,14 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, // 3 - Enqueue the writes. // Now that we've triggered the prepares enqueue the operations to be written // to the WAL. - if (PREDICT_TRUE(!deduped_req.messages.empty())) { - last_from_leader = deduped_req.messages.back()->get()->id(); + if (PREDICT_TRUE(!messages.empty())) { + last_from_leader = messages.back()->get()->id(); // Trigger the log append asap, if fsync() is on this might take a while // and we can't reply until this is done. // // Since we've prepared, we need to be able to append (or we risk trying to apply // later something that wasn't logged). We crash if we can't. - CHECK_OK(queue_->AppendOperations(deduped_req.messages, sync_status_cb)); + CHECK_OK(queue_->AppendOperations(messages, sync_status_cb)); } else { last_from_leader = *deduped_req.preceding_opid; } @@ -1585,7 +1579,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, // We'll re-acquire it before we update the state again. // Update the last replicated op id - if (!deduped_req.messages.empty()) { + if (!messages.empty()) { // 5 - We wait for the writes to be durable.
