This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch branch-1.18.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 9e0c2e5390ebd88f87702ccba4a9992357d0bb2b Author: Alexey Serbin <[email protected]> AuthorDate: Thu Feb 27 18:19:12 2025 -0800 [consensus] fix result status handling This patch addresses sloppy result handling of functions/methods that return Status under the src/kudu/consensus directory. In addition, this patch introduces a new unsafe runtime flag --raft_allow_committed_pending_index_gap that's now used by the TsRecoveryITestDeathTest.RecoverFromOpIdOverflow test scenario only. The test scenario was reliant on the sloppy result status handling of the call to PendingRounds::SetInitialCommittedOpId() in RaftConsensus::Start() that's been addressed in this patch. It's the path of least resistance compared with updating test scaffolding and the test scenario itself, but it also has the benefit of providing a flag to enable the former result status handling approach in RaftConsensus::Start() when setting last committed OpId for a tablet's Raft consensus implementation. Frankly, I don't think anybody will ever need to re-enable the original bogus behavior of ignoring non-OK result status, but it doesn't hurt to have such an option just in case I'm missing something here. 
Change-Id: I6db113c58c6b453bf3cd290ee0dfdd184b63b999 Reviewed-on: http://gerrit.cloudera.org:8080/22646 Reviewed-by: Abhishek Chennaka <[email protected]> Tested-by: Alexey Serbin <[email protected]> (cherry picked from commit bde9cbad115deb1cd689eb583a1c7a8d3440b156) Reviewed-on: http://gerrit.cloudera.org:8080/22752 --- src/kudu/consensus/consensus_peers-test.cc | 14 +++++++------- src/kudu/consensus/consensus_peers.cc | 14 +++++++------- src/kudu/consensus/consensus_queue-test.cc | 10 +++++----- src/kudu/consensus/consensus_queue.cc | 5 ++--- src/kudu/consensus/consensus_queue.h | 2 +- src/kudu/consensus/log-test.cc | 24 ++++++++++++------------ src/kudu/consensus/log.cc | 5 ++--- src/kudu/consensus/log_anchor_registry.cc | 5 +++-- src/kudu/consensus/log_anchor_registry.h | 2 +- src/kudu/consensus/log_cache-test.cc | 24 ++++++++++++------------ src/kudu/consensus/peer_manager.cc | 19 ++++++++++++------- src/kudu/consensus/pending_rounds.cc | 11 ++++++++++- src/kudu/consensus/raft_consensus.cc | 12 ++++++++---- src/kudu/consensus/raft_consensus_quorum-test.cc | 6 +++--- src/kudu/consensus/time_manager-test.cc | 4 ++-- src/kudu/integration-tests/ts_recovery-itest.cc | 8 +++++++- 16 files changed, 94 insertions(+), 71 deletions(-) diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc index 02e2ec89c..236f730f9 100644 --- a/src/kudu/consensus/consensus_peers-test.cc +++ b/src/kudu/consensus/consensus_peers-test.cc @@ -208,7 +208,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) { AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), 1, 20); // signal the peer there are requests pending. - remote_peer->SignalRequest(); + ASSERT_OK(remote_peer->SignalRequest()); // now wait on the status of the last operation // this will complete once the peer has logged all // requests. 
@@ -240,8 +240,8 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) { OpId first = MakeOpId(0, 1); - remote_peer1->SignalRequest(); - remote_peer2->SignalRequest(); + ASSERT_OK(remote_peer1->SignalRequest()); + ASSERT_OK(remote_peer2->SignalRequest()); // Now wait for the message to be replicated, this should succeed since // majority = 2 and only one peer was delayed. The majority is made up @@ -268,7 +268,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) { ASSERT_LT(message_queue_->GetCommittedIndex(), 2); // Signal one of the two remote peers. - remote_peer1->SignalRequest(); + ASSERT_OK(remote_peer1->SignalRequest()); // We should now be able to wait for it to replicate, since two peers (a majority) // have replicated the message. WaitForCommitIndex(2); @@ -308,7 +308,7 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) { // Add an op to the queue and start sending requests to the peer. AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), 1, 1); - peer->SignalRequest(true); + ASSERT_OK(peer->SignalRequest(true)); // We should be able to close the peer even though it has more data pending. peer->Close(); @@ -346,7 +346,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) { mock_proxy->set_update_response(initial_resp); AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), 1, 1); - peer->SignalRequest(true); + ASSERT_OK(peer->SignalRequest(true)); // Now wait for the message to be replicated, this should succeed since // the local (leader) peer always acks and the follower also acked this time. @@ -361,7 +361,7 @@ TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) { // Add a bunch of messages to the queue. 
for (int i = 2; i <= 100; i++) { AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), i, 1); - peer->SignalRequest(false); + ASSERT_OK(peer->SignalRequest(false)); SleepFor(MonoDelta::FromMilliseconds(2)); } diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc index 349a34ced..469c229d7 100644 --- a/src/kudu/consensus/consensus_peers.cc +++ b/src/kudu/consensus/consensus_peers.cc @@ -151,12 +151,12 @@ void Peer::Init() { // Capture a weak_ptr reference into the functor so it can safely handle // outliving the peer. - weak_ptr<Peer> w = shared_from_this(); + weak_ptr<Peer> w_this = shared_from_this(); heartbeater_ = PeriodicTimer::Create( messenger_, - [w]() { - if (auto p = w.lock()) { - p->SignalRequest(true); + [w_this = std::move(w_this)]() { + if (auto p = w_this.lock()) { + WARN_NOT_OK(p->SignalRequest(true), "SignalRequest failed"); } }, MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms)); @@ -170,7 +170,7 @@ Status Peer::SignalRequest(bool even_if_queue_empty) { // the implementation of SendNextRequest() checks for 'closed_' and // 'request_pending_' on its own. if (PREDICT_FALSE(closed_)) { - return Status::IllegalState("Peer was closed."); + return Status::IllegalState("peer closed"); } // Only allow one request at a time. No sense waking up the @@ -181,8 +181,8 @@ Status Peer::SignalRequest(bool even_if_queue_empty) { // Capture a weak_ptr reference into the submitted functor so that we can // safely handle the functor outliving its peer. 
- weak_ptr<Peer> w_this = shared_from_this(); - return raft_pool_token_->Submit([even_if_queue_empty, w_this]() { + weak_ptr<Peer> w_this(shared_from_this()); + return raft_pool_token_->Submit([even_if_queue_empty, w_this = std::move(w_this)]() { if (auto p = w_this.lock()) { p->SendNextRequest(even_if_queue_empty); } diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc index 0a64c1df1..56d6cb511 100644 --- a/src/kudu/consensus/consensus_queue-test.cc +++ b/src/kudu/consensus/consensus_queue-test.cc @@ -131,7 +131,7 @@ class ConsensusQueueTest : public KuduTest { } void TearDown() override { - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); queue_->Close(); } @@ -910,7 +910,7 @@ TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) { // Append a bunch of messages and update as if they were also appeneded to the leader. queue_->UpdateLastIndexAppendedToLeader(10); AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10); - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); // Now rewrite some of the operations and wait for the log to append. 
Synchronizer synch; @@ -995,14 +995,14 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog) for (int i = 31; i <= 53; i++) { if (i <= 45) { - AppendReplicateMsg(72, i, 1024); + ASSERT_OK(AppendReplicateMsg(72, i, 1024)); continue; } if (i <= 51) { - AppendReplicateMsg(73, i, 1024); + ASSERT_OK(AppendReplicateMsg(73, i, 1024)); continue; } - AppendReplicateMsg(76, i, 1024); + ASSERT_OK(AppendReplicateMsg(76, i, 1024)); } WaitForLocalPeerToAckIndex(53); diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc index c3d546989..c4551ea6a 100644 --- a/src/kudu/consensus/consensus_queue.cc +++ b/src/kudu/consensus/consensus_queue.cc @@ -1467,14 +1467,13 @@ void PeerMessageQueue::RegisterObserver(PeerMessageQueueObserver* observer) { } } -Status PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver* observer) { +void PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver* observer) { std::lock_guard lock(queue_lock_); auto iter = std::find(observers_.begin(), observers_.end(), observer); if (iter == observers_.end()) { - return Status::NotFound("Can't find observer."); + return; } observers_.erase(iter); - return Status::OK(); } bool PeerMessageQueue::IsOpInLog(const OpId& desired_op) const { diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h index e25e5958f..16d1f7ed3 100644 --- a/src/kudu/consensus/consensus_queue.h +++ b/src/kudu/consensus/consensus_queue.h @@ -346,7 +346,7 @@ class PeerMessageQueue { void RegisterObserver(PeerMessageQueueObserver* observer); - Status UnRegisterObserver(PeerMessageQueueObserver* observer); + void UnRegisterObserver(PeerMessageQueueObserver* observer); struct Metrics { // Keeps track of the number of ops. 
that are completed by a majority but still need diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc index 4ea6e0bc8..61852d88c 100644 --- a/src/kudu/consensus/log-test.cc +++ b/src/kudu/consensus/log-test.cc @@ -137,7 +137,7 @@ class LogTest : public LogTestBase { LogSegmentHeaderPB header; header.set_sequence_number(sequence_number); header.set_tablet_id(kTestTablet); - SchemaToPB(GetSimpleTestSchema(), header.mutable_schema()); + RETURN_NOT_OK(SchemaToPB(GetSimpleTestSchema(), header.mutable_schema())); LogSegmentFooterPB footer; footer.set_num_entries(10); @@ -189,7 +189,7 @@ TEST_P(LogTestOptionalCompression, TestMultipleEntriesInABatch) { opid.set_term(1); opid.set_index(1); - AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2); + ASSERT_OK(AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2)); // RollOver() the batch so that we have a properly formed footer. ASSERT_OK(log_->AllocateSegmentAndRollOverForTests()); @@ -245,7 +245,7 @@ TEST_P(LogTestOptionalCompression, TestFsync) { opid.set_term(0); opid.set_index(1); - AppendNoOp(&opid); + ASSERT_OK(AppendNoOp(&opid)); ASSERT_OK(log_->Close()); } @@ -258,14 +258,14 @@ TEST_P(LogTestOptionalCompression, TestSizeIsMaintained) { ASSERT_OK(BuildLog()); OpId opid = MakeOpId(0, 1); - AppendNoOp(&opid); + ASSERT_OK(AppendNoOp(&opid)); SegmentSequence segments; log_->reader()->GetSegmentsSnapshot(&segments); int64_t orig_size = segments[0]->file_size(); ASSERT_GT(orig_size, 0); - AppendNoOp(&opid); + ASSERT_OK(AppendNoOp(&opid)); log_->reader()->GetSegmentsSnapshot(&segments); int64_t new_size = segments[0]->file_size(); @@ -283,7 +283,7 @@ TEST_P(LogTestOptionalCompression, TestLogNotTrimmed) { opid.set_term(0); opid.set_index(1); - AppendNoOp(&opid); + ASSERT_OK(AppendNoOp(&opid)); SegmentSequence segments; log_->reader()->GetSegmentsSnapshot(&segments); @@ -1153,12 +1153,12 @@ TEST_F(LogTest, TestAutoStopIdleAppendThread) { // after the append long enough for the append thread to 
shut itself down // again. ASSERT_EVENTUALLY([&]() { - AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2); - ASSERT_TRUE(log_->append_thread_active_for_tests()); - debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; - ASSERT_GT(log_->segment_allocator_.active_segment_->compress_buf_.capacity(), - faststring::kInitialCapacity); - }); + ASSERT_OK(AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2)); + ASSERT_TRUE(log_->append_thread_active_for_tests()); + debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; + ASSERT_GT(log_->segment_allocator_.active_segment_->compress_buf_.capacity(), + faststring::kInitialCapacity); + }); // After some time, the append thread should shut itself down. ASSERT_EVENTUALLY([&]() { ASSERT_FALSE(log_->append_thread_active_for_tests()); diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc index 8ae2c174c..eced64a26 100644 --- a/src/kudu/consensus/log.cc +++ b/src/kudu/consensus/log.cc @@ -889,8 +889,7 @@ Status Log::AsyncAppendCommit(const consensus::CommitMsg& commit_msg, unique_ptr<LogEntryBatch> entry_batch = CreateBatchFromPB( COMMIT, batch_pb, std::move(callback)); entry->unsafe_arena_release_commit(); - AsyncAppend(std::move(entry_batch)); - return Status::OK(); + return AsyncAppend(std::move(entry_batch)); } Status Log::WriteBatch(LogEntryBatch* entry_batch) { @@ -1034,7 +1033,7 @@ Status Log::WaitUntilAllFlushed() { Synchronizer s; unique_ptr<LogEntryBatch> reserved_entry_batch = CreateBatchFromPB(FLUSH_MARKER, entry_batch, s.AsStatusCallback()); - AsyncAppend(std::move(reserved_entry_batch)); + RETURN_NOT_OK(AsyncAppend(std::move(reserved_entry_batch))); return s.Wait(); } diff --git a/src/kudu/consensus/log_anchor_registry.cc b/src/kudu/consensus/log_anchor_registry.cc index 0a9ebfd6c..782c5a601 100644 --- a/src/kudu/consensus/log_anchor_registry.cc +++ b/src/kudu/consensus/log_anchor_registry.cc @@ -155,13 +155,14 @@ MinLogIndexAnchorer::~MinLogIndexAnchorer() { CHECK_OK(ReleaseAnchor()); } -void 
MinLogIndexAnchorer::AnchorIfMinimum(int64_t log_index) { +Status MinLogIndexAnchorer::AnchorIfMinimum(int64_t log_index) { std::lock_guard l(lock_); if (log_index < minimum_log_index_ || PREDICT_FALSE(minimum_log_index_ == kInvalidOpIdIndex)) { minimum_log_index_ = log_index; - registry_->RegisterOrUpdate(minimum_log_index_, owner_, &anchor_); + return registry_->RegisterOrUpdate(minimum_log_index_, owner_, &anchor_); } + return Status::OK(); } Status MinLogIndexAnchorer::ReleaseAnchor() { diff --git a/src/kudu/consensus/log_anchor_registry.h b/src/kudu/consensus/log_anchor_registry.h index 6ccb40f1c..46e337767 100644 --- a/src/kudu/consensus/log_anchor_registry.h +++ b/src/kudu/consensus/log_anchor_registry.h @@ -135,7 +135,7 @@ class MinLogIndexAnchorer { // If op_id is less than the minimum index registered so far, or if no indexes // are currently registered, anchor on 'log_index'. - void AnchorIfMinimum(int64_t log_index); + Status AnchorIfMinimum(int64_t log_index); // Un-anchors the earliest index (which is the only one tracked). // If no minimum is known (no anchor registered), returns OK. 
diff --git a/src/kudu/consensus/log_cache-test.cc b/src/kudu/consensus/log_cache-test.cc index 1292591a6..b0220dd62 100644 --- a/src/kudu/consensus/log_cache-test.cc +++ b/src/kudu/consensus/log_cache-test.cc @@ -105,7 +105,7 @@ class LogCacheTest : public KuduTest { } void TearDown() override { - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); } void CloseAndReopenCache(const OpId& preceding_id) { @@ -153,7 +153,7 @@ TEST_F(LogCacheTest, TestAppendAndGetMessages) { ASSERT_OK(AppendReplicateMessagesToCache(1, 100)); ASSERT_EQ(100, cache_->metrics_.log_cache_num_ops->value()); ASSERT_GE(cache_->metrics_.log_cache_size->value(), 500); - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); vector<ReplicateRefPtr> messages; OpId preceding; @@ -197,7 +197,7 @@ TEST_F(LogCacheTest, TestAlwaysYieldsAtLeastOneMessage) { // Append several large ops to the cache ASSERT_OK(AppendReplicateMessagesToCache(1, 4, kPayloadSize)); - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); // We should get one of them, even though we only ask for 100 bytes vector<ReplicateRefPtr> messages; @@ -218,7 +218,7 @@ TEST_F(LogCacheTest, TestAlwaysYieldsAtLeastOneMessage) { TEST_F(LogCacheTest, TestCacheEdgeCases) { // Append 1 message to the cache ASSERT_OK(AppendReplicateMessagesToCache(1, 1)); - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); std::vector<ReplicateRefPtr> messages; OpId preceding; @@ -263,7 +263,7 @@ TEST_F(LogCacheTest, TestMemoryLimit) { const int kPayloadSize = 400 * 1024; // Limit should not be violated. ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize)); - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); ASSERT_EQ(1, cache_->num_cached_ops()); // Verify the size is right. It's not exactly kPayloadSize because of in-memory @@ -274,7 +274,7 @@ TEST_F(LogCacheTest, TestMemoryLimit) { // Add another operation which fits under the 1MB limit. 
ASSERT_OK(AppendReplicateMessagesToCache(2, 1, kPayloadSize)); - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); ASSERT_EQ(2, cache_->num_cached_ops()); int size_with_two_msgs = cache_->BytesUsed(); @@ -287,7 +287,7 @@ TEST_F(LogCacheTest, TestMemoryLimit) { // Verify that we have trimmed by appending a message that would // otherwise be rejected, since the cache max size limit is 2MB. ASSERT_OK(AppendReplicateMessagesToCache(3, 1, kPayloadSize)); - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); ASSERT_EQ(2, cache_->num_cached_ops()); ASSERT_EQ(size_with_two_msgs, cache_->BytesUsed()); @@ -318,7 +318,7 @@ TEST_F(LogCacheTest, TestGlobalMemoryLimit) { // Should succeed, but only end up caching one of the two ops because of the global limit. ASSERT_OK(AppendReplicateMessagesToCache(1, 2, kPayloadSize)); - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); ASSERT_EQ(1, cache_->num_cached_ops()); ASSERT_LE(cache_->BytesUsed(), 1024 * 1024); @@ -339,7 +339,7 @@ TEST_F(LogCacheTest, TestReplaceMessages) { ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize)); } - log_->WaitUntilAllFlushed(); + ASSERT_OK(log_->WaitUntilAllFlushed()); EXPECT_EQ(size_with_one_msg, tracker->consumption()); EXPECT_EQ(Substitute("Pinned index: 2, LogCacheStats(num_ops=1, bytes=$0)", @@ -356,17 +356,17 @@ TEST_F(LogCacheTest, TestTruncation) { }; // Append 1 through 3. - AppendReplicateMessagesToCache(1, 3, 100); + ASSERT_OK(AppendReplicateMessagesToCache(1, 3, 100)); for (auto mode : {TRUNCATE_BY_APPEND, TRUNCATE_EXPLICITLY}) { SCOPED_TRACE(mode == TRUNCATE_BY_APPEND ? "by append" : "explicitly"); // Append messages 4 through 10. 
- AppendReplicateMessagesToCache(4, 7, 100); + ASSERT_OK(AppendReplicateMessagesToCache(4, 7, 100)); ASSERT_EQ(10, cache_->metrics_.log_cache_num_ops->value()); switch (mode) { case TRUNCATE_BY_APPEND: - AppendReplicateMessagesToCache(3, 1, 100); + ASSERT_OK(AppendReplicateMessagesToCache(3, 1, 100)); break; case TRUNCATE_EXPLICITLY: cache_->TruncateOpsAfter(3); diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc index f09eb443d..be43243a9 100644 --- a/src/kudu/consensus/peer_manager.cc +++ b/src/kudu/consensus/peer_manager.cc @@ -89,14 +89,19 @@ void PeerManager::UpdateRaftConfig(const RaftConfigPB& config) { void PeerManager::SignalRequest(bool force_if_queue_empty) { std::lock_guard lock(lock_); for (auto iter = peers_.begin(); iter != peers_.end();) { - Status s = (*iter).second->SignalRequest(force_if_queue_empty); - if (PREDICT_FALSE(!s.ok())) { - LOG(WARNING) << GetLogPrefix() - << "Peer was closed, removing from peers. Peer: " - << SecureShortDebugString((*iter).second->peer_pb()); - peers_.erase(iter++); - } else { + const auto s = iter->second->SignalRequest(force_if_queue_empty); + if (PREDICT_TRUE(s.ok())) { ++iter; + continue; + } + const auto& peer_info = SecureShortDebugString(iter->second->peer_pb()); + if (!s.IsIllegalState()) { + WARN_NOT_OK(s, Substitute("$0: SignalRequest failed at peer $1", + GetLogPrefix(), peer_info)); + } else { + WARN_NOT_OK(s, Substitute("$0: SignalRequest failed, removing peer $1", + GetLogPrefix(), peer_info)); + peers_.erase(iter++); } } } diff --git a/src/kudu/consensus/pending_rounds.cc b/src/kudu/consensus/pending_rounds.cc index 3e1c25302..984495550 100644 --- a/src/kudu/consensus/pending_rounds.cc +++ b/src/kudu/consensus/pending_rounds.cc @@ -21,6 +21,7 @@ #include <ostream> #include <utility> +#include <gflags/gflags.h> #include <glog/logging.h> #include "kudu/consensus/consensus.pb.h" @@ -31,11 +32,18 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" 
#include "kudu/util/debug-util.h" +#include "kudu/util/flag_tags.h" #include "kudu/util/logging.h" #include "kudu/util/pb_util.h" #include "kudu/util/status.h" #include "kudu/util/thread_restrictions.h" +DEFINE_bool(raft_allow_committed_pending_index_gap, false, + "Allow a gap between last committed and first pending operation " + "in the queue when starting Raft consensus. For testing only!"); +TAG_FLAG(raft_allow_committed_pending_index_gap, runtime); +TAG_FLAG(raft_allow_committed_pending_index_gap, unsafe); + using kudu::pb_util::SecureShortDebugString; using std::string; using strings::Substitute; @@ -195,7 +203,8 @@ Status PendingRounds::SetInitialCommittedOpId(const OpId& committed_op) { if (!pending_ops_.empty()) { int64_t first_pending_index = pending_ops_.begin()->first; if (committed_op.index() < first_pending_index) { - if (committed_op.index() != first_pending_index - 1) { + if (PREDICT_TRUE(!FLAGS_raft_allow_committed_pending_index_gap) && + committed_op.index() != first_pending_index - 1) { return Status::Corruption(Substitute( "pending operations should start at first operation " "after the committed operation (committed=$0, first pending=$1)", diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 5ec7aea89..0b65d2121 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -365,7 +365,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info, // Set the initial committed opid for the PendingRounds only after // appending any uncommitted replicate messages to the queue. - pending_->SetInitialCommittedOpId(info.last_committed_id); + RETURN_NOT_OK(pending_->SetInitialCommittedOpId(info.last_committed_id)); // If this is the first term expire the FD immediately so that we have a // fast first election, otherwise we just let the timer expire normally. 
@@ -748,8 +748,11 @@ Status RaftConsensus::BecomeReplicaUnlocked(optional<MonoDelta> fd_delta) { // Deregister ourselves from the queue. We no longer need to track what gets // replicated since we're stepping down. + // + // NOTE: if observer isn't registered at this point yet, that's OK because + // in one of use cases Start() also invokes BecomeReplicaUnlocked() queue_->UnRegisterObserver(this); - bool was_leader = queue_->IsInLeaderMode(); + const bool was_leader = queue_->IsInLeaderMode(); queue_->SetNonLeaderMode(cmeta_->ActiveConfig()); if (was_leader && server_ctx_.num_leaders) { server_ctx_.num_leaders->IncrementBy(-1); @@ -908,7 +911,7 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) { return; } - pending_->AdvanceCommittedIndex(commit_index); + CHECK_OK(pending_->AdvanceCommittedIndex(commit_index)); if (cmeta_->active_role() == RaftPeerPB::LEADER) { peer_manager_->SignalRequest(false); @@ -2737,7 +2740,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu // will bump to term 2 when it gets the vote rejection, such that its // next pre-election (for term 3) would succeed. 
if (result.highest_voter_term > CurrentTermUnlocked()) { - HandleTermAdvanceUnlocked(result.highest_voter_term); + WARN_NOT_OK(HandleTermAdvanceUnlocked(result.highest_voter_term), + "error while handling term advanced beyond the current one"); } LOG_WITH_PREFIX_UNLOCKED(INFO) diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc index ee613e1fd..14cdeb455 100644 --- a/src/kudu/consensus/raft_consensus_quorum-test.cc +++ b/src/kudu/consensus/raft_consensus_quorum-test.cc @@ -422,7 +422,7 @@ class RaftConsensusQuorumTest : public KuduTest { ASSERT_OK(WaitForReplicate(round.get())); last_op_id->CopyFrom(round->id()); if (commit_mode == COMMIT_ONE_BY_ONE) { - CommitDummyMessage(leader_idx, round.get(), commit_sync); + ASSERT_OK(CommitDummyMessage(leader_idx, round.get(), commit_sync)); } rounds->push_back(round); } @@ -445,7 +445,7 @@ class RaftConsensusQuorumTest : public KuduTest { void GatherLogEntries(int idx, const scoped_refptr<Log>& log, LogEntries* entries) { ASSERT_OK(log->WaitUntilAllFlushed()); - log->Close(); + ASSERT_OK(log->Close()); shared_ptr<LogReader> log_reader; ASSERT_OK(log::LogReader::Open(fs_managers_[idx].get(), /*index*/nullptr, @@ -757,7 +757,7 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) { // After we release the locks the operation should replicate to all replicas // and we commit. 
ASSERT_OK(WaitForReplicate(round.get())); - CommitDummyMessage(kLeaderIdx, round.get()); + ASSERT_OK(CommitDummyMessage(kLeaderIdx, round.get())); // Assert that everything was ok WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx); diff --git a/src/kudu/consensus/time_manager-test.cc b/src/kudu/consensus/time_manager-test.cc index da37c83c2..33c39b6bf 100644 --- a/src/kudu/consensus/time_manager-test.cc +++ b/src/kudu/consensus/time_manager-test.cc @@ -171,8 +171,8 @@ TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) { // In leader mode calling MessageReceivedFromLeader() should cause a CHECK failure. EXPECT_DEATH({ - time_manager_->MessageReceivedFromLeader(message); - }, "Cannot receive messages from a leader in leader mode."); + ASSERT_OK(time_manager_->MessageReceivedFromLeader(message)); + }, "Cannot receive messages from a leader in leader mode."); // .. as should AdvanceSafeTime() EXPECT_DEATH({ diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc index 4f61cf391..51cd66880 100644 --- a/src/kudu/integration-tests/ts_recovery-itest.cc +++ b/src/kudu/integration-tests/ts_recovery-itest.cc @@ -571,7 +571,13 @@ INSTANTIATE_TEST_SUITE_P(BlockManagerType, TsRecoveryITestDeathTest, TEST_P(TsRecoveryITestDeathTest, RecoverFromOpIdOverflow) { // Create the initial tablet files on disk, then shut down the cluster so we // can meddle with the WAL. - NO_FATALS(StartClusterOneTs()); + NO_FATALS(StartClusterOneTs( + // To simplify test scaffolding, this scenario relies on lenient handling + // of operation indices read from the tablet's WAL upon bootstrapping. + // With regular guards in place, an inconsistency would be detected when + // setting committed OpId for PendingRounds by RaftConsensus::Start(). + { "--raft_allow_committed_pending_index_gap" } + )); TestWorkload workload(cluster_.get()); workload.set_num_replicas(1); workload.Setup();
