This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new bde9cbad1 [consensus] fix result status handling
bde9cbad1 is described below

commit bde9cbad115deb1cd689eb583a1c7a8d3440b156
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Feb 27 18:19:12 2025 -0800

    [consensus] fix result status handling
    
    This patch addresses sloppy result handling of functions/methods
    that return Status under the src/kudu/consensus directory.
    
    In addition, this patch introduces a new unsafe runtime flag
    --raft_allow_committed_pending_index_gap that's now used by the
    TsRecoveryITestDeathTest.RecoverFromOpIdOverflow test scenario only.
    The test scenario was reliant on the sloppy result status handling
    of the call to PendingRounds::SetInitialCommittedOpId() in
    RaftConsensus::Start() that's been addressed in this patch.  It's
    the path of least resistance compared with updating test scaffolding
    and the test scenario itself, but it also has the benefit of providing
    a flag to enable the former result status handling approach in
    RaftConsensus::Start() when setting last committed OpId for a tablet's
    Raft consensus implementation.  Frankly, I don't think anybody will ever
    need to re-enable the original bogus behavior of ignoring non-OK result
    status, but it doesn't hurt to have such an option just in case
    I'm missing something here.
    
    Change-Id: I6db113c58c6b453bf3cd290ee0dfdd184b63b999
    Reviewed-on: http://gerrit.cloudera.org:8080/22646
    Reviewed-by: Abhishek Chennaka <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
---
 src/kudu/consensus/consensus_peers-test.cc       | 14 +++++++-------
 src/kudu/consensus/consensus_peers.cc            | 14 +++++++-------
 src/kudu/consensus/consensus_queue-test.cc       | 10 +++++-----
 src/kudu/consensus/consensus_queue.cc            |  5 ++---
 src/kudu/consensus/consensus_queue.h             |  2 +-
 src/kudu/consensus/log-test.cc                   | 24 ++++++++++++------------
 src/kudu/consensus/log.cc                        |  5 ++---
 src/kudu/consensus/log_anchor_registry.cc        |  5 +++--
 src/kudu/consensus/log_anchor_registry.h         |  2 +-
 src/kudu/consensus/log_cache-test.cc             | 24 ++++++++++++------------
 src/kudu/consensus/peer_manager.cc               | 19 ++++++++++++-------
 src/kudu/consensus/pending_rounds.cc             | 11 ++++++++++-
 src/kudu/consensus/raft_consensus.cc             | 12 ++++++++----
 src/kudu/consensus/raft_consensus_quorum-test.cc |  6 +++---
 src/kudu/consensus/time_manager-test.cc          |  4 ++--
 src/kudu/integration-tests/ts_recovery-itest.cc  |  8 +++++++-
 16 files changed, 94 insertions(+), 71 deletions(-)

diff --git a/src/kudu/consensus/consensus_peers-test.cc 
b/src/kudu/consensus/consensus_peers-test.cc
index 02e2ec89c..236f730f9 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -208,7 +208,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
   AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), 1, 20);
 
   // signal the peer there are requests pending.
-  remote_peer->SignalRequest();
+  ASSERT_OK(remote_peer->SignalRequest());
   // now wait on the status of the last operation
   // this will complete once the peer has logged all
   // requests.
@@ -240,8 +240,8 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
 
   OpId first = MakeOpId(0, 1);
 
-  remote_peer1->SignalRequest();
-  remote_peer2->SignalRequest();
+  ASSERT_OK(remote_peer1->SignalRequest());
+  ASSERT_OK(remote_peer2->SignalRequest());
 
   // Now wait for the message to be replicated, this should succeed since
   // majority = 2 and only one peer was delayed. The majority is made up
@@ -268,7 +268,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
   ASSERT_LT(message_queue_->GetCommittedIndex(), 2);
 
   // Signal one of the two remote peers.
-  remote_peer1->SignalRequest();
+  ASSERT_OK(remote_peer1->SignalRequest());
   // We should now be able to wait for it to replicate, since two peers (a 
majority)
   // have replicated the message.
   WaitForCommitIndex(2);
@@ -308,7 +308,7 @@ TEST_F(ConsensusPeersTest, 
TestCloseWhenRemotePeerDoesntMakeProgress) {
 
   // Add an op to the queue and start sending requests to the peer.
   AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), 1, 1);
-  peer->SignalRequest(true);
+  ASSERT_OK(peer->SignalRequest(true));
 
   // We should be able to close the peer even though it has more data pending.
   peer->Close();
@@ -346,7 +346,7 @@ TEST_F(ConsensusPeersTest, 
TestDontSendOneRpcPerWriteWhenPeerIsDown) {
   mock_proxy->set_update_response(initial_resp);
 
   AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), 1, 1);
-  peer->SignalRequest(true);
+  ASSERT_OK(peer->SignalRequest(true));
 
   // Now wait for the message to be replicated, this should succeed since
   // the local (leader) peer always acks and the follower also acked this time.
@@ -361,7 +361,7 @@ TEST_F(ConsensusPeersTest, 
TestDontSendOneRpcPerWriteWhenPeerIsDown) {
   // Add a bunch of messages to the queue.
   for (int i = 2; i <= 100; i++) {
     AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), i, 1);
-    peer->SignalRequest(false);
+    ASSERT_OK(peer->SignalRequest(false));
     SleepFor(MonoDelta::FromMilliseconds(2));
   }
 
diff --git a/src/kudu/consensus/consensus_peers.cc 
b/src/kudu/consensus/consensus_peers.cc
index 55d521e16..0dd24958d 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -151,12 +151,12 @@ void Peer::Init() {
 
   // Capture a weak_ptr reference into the functor so it can safely handle
   // outliving the peer.
-  weak_ptr<Peer> w = shared_from_this();
+  weak_ptr<Peer> w_this = shared_from_this();
   heartbeater_ = PeriodicTimer::Create(
       messenger_,
-      [w]() {
-        if (auto p = w.lock()) {
-          p->SignalRequest(true);
+      [w_this = std::move(w_this)]() {
+        if (auto p = w_this.lock()) {
+          WARN_NOT_OK(p->SignalRequest(true), "SignalRequest failed");
         }
       },
       MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
@@ -170,7 +170,7 @@ Status Peer::SignalRequest(bool even_if_queue_empty) {
   // the implementation of SendNextRequest() checks for 'closed_' and
   // 'request_pending_' on its own.
   if (PREDICT_FALSE(closed_)) {
-    return Status::IllegalState("Peer was closed.");
+    return Status::IllegalState("peer closed");
   }
 
   // Only allow one request at a time. No sense waking up the
@@ -181,8 +181,8 @@ Status Peer::SignalRequest(bool even_if_queue_empty) {
 
   // Capture a weak_ptr reference into the submitted functor so that we can
   // safely handle the functor outliving its peer.
-  weak_ptr<Peer> w_this = shared_from_this();
-  return raft_pool_token_->Submit([even_if_queue_empty, w_this]() {
+  weak_ptr<Peer> w_this(shared_from_this());
+  return raft_pool_token_->Submit([even_if_queue_empty, w_this = 
std::move(w_this)]() {
     if (auto p = w_this.lock()) {
       p->SendNextRequest(even_if_queue_empty);
     }
diff --git a/src/kudu/consensus/consensus_queue-test.cc 
b/src/kudu/consensus/consensus_queue-test.cc
index 0a64c1df1..56d6cb511 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -131,7 +131,7 @@ class ConsensusQueueTest : public KuduTest {
   }
 
   void TearDown() override {
-    log_->WaitUntilAllFlushed();
+    ASSERT_OK(log_->WaitUntilAllFlushed());
     queue_->Close();
   }
 
@@ -910,7 +910,7 @@ TEST_F(ConsensusQueueTest, 
TestQueueMovesWatermarksBackward) {
   // Append a bunch of messages and update as if they were also appeneded to 
the leader.
   queue_->UpdateLastIndexAppendedToLeader(10);
   AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
-  log_->WaitUntilAllFlushed();
+  ASSERT_OK(log_->WaitUntilAllFlushed());
 
   // Now rewrite some of the operations and wait for the log to append.
   Synchronizer synch;
@@ -995,14 +995,14 @@ TEST_F(ConsensusQueueTest, 
TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
 
   for (int i = 31; i <= 53; i++) {
     if (i <= 45) {
-      AppendReplicateMsg(72, i, 1024);
+      ASSERT_OK(AppendReplicateMsg(72, i, 1024));
       continue;
     }
     if (i <= 51) {
-      AppendReplicateMsg(73, i, 1024);
+      ASSERT_OK(AppendReplicateMsg(73, i, 1024));
       continue;
     }
-    AppendReplicateMsg(76, i, 1024);
+    ASSERT_OK(AppendReplicateMsg(76, i, 1024));
   }
 
   WaitForLocalPeerToAckIndex(53);
diff --git a/src/kudu/consensus/consensus_queue.cc 
b/src/kudu/consensus/consensus_queue.cc
index 543e88d2b..def546acf 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -1467,14 +1467,13 @@ void 
PeerMessageQueue::RegisterObserver(PeerMessageQueueObserver* observer) {
   }
 }
 
-Status PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver* 
observer) {
+void PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver* observer) {
   std::lock_guard lock(queue_lock_);
   auto iter = std::find(observers_.begin(), observers_.end(), observer);
   if (iter == observers_.end()) {
-    return Status::NotFound("Can't find observer.");
+    return;
   }
   observers_.erase(iter);
-  return Status::OK();
 }
 
 bool PeerMessageQueue::IsOpInLog(const OpId& desired_op) const {
diff --git a/src/kudu/consensus/consensus_queue.h 
b/src/kudu/consensus/consensus_queue.h
index e25e5958f..16d1f7ed3 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -346,7 +346,7 @@ class PeerMessageQueue {
 
   void RegisterObserver(PeerMessageQueueObserver* observer);
 
-  Status UnRegisterObserver(PeerMessageQueueObserver* observer);
+  void UnRegisterObserver(PeerMessageQueueObserver* observer);
 
   struct Metrics {
     // Keeps track of the number of ops. that are completed by a majority but 
still need
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index 4ea6e0bc8..61852d88c 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -137,7 +137,7 @@ class LogTest : public LogTestBase {
     LogSegmentHeaderPB header;
     header.set_sequence_number(sequence_number);
     header.set_tablet_id(kTestTablet);
-    SchemaToPB(GetSimpleTestSchema(), header.mutable_schema());
+    RETURN_NOT_OK(SchemaToPB(GetSimpleTestSchema(), header.mutable_schema()));
 
     LogSegmentFooterPB footer;
     footer.set_num_entries(10);
@@ -189,7 +189,7 @@ TEST_P(LogTestOptionalCompression, 
TestMultipleEntriesInABatch) {
   opid.set_term(1);
   opid.set_index(1);
 
-  AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2);
+  ASSERT_OK(AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2));
 
   // RollOver() the batch so that we have a properly formed footer.
   ASSERT_OK(log_->AllocateSegmentAndRollOverForTests());
@@ -245,7 +245,7 @@ TEST_P(LogTestOptionalCompression, TestFsync) {
   opid.set_term(0);
   opid.set_index(1);
 
-  AppendNoOp(&opid);
+  ASSERT_OK(AppendNoOp(&opid));
 
   ASSERT_OK(log_->Close());
 }
@@ -258,14 +258,14 @@ TEST_P(LogTestOptionalCompression, TestSizeIsMaintained) {
   ASSERT_OK(BuildLog());
 
   OpId opid = MakeOpId(0, 1);
-  AppendNoOp(&opid);
+  ASSERT_OK(AppendNoOp(&opid));
 
   SegmentSequence segments;
   log_->reader()->GetSegmentsSnapshot(&segments);
   int64_t orig_size = segments[0]->file_size();
   ASSERT_GT(orig_size, 0);
 
-  AppendNoOp(&opid);
+  ASSERT_OK(AppendNoOp(&opid));
 
   log_->reader()->GetSegmentsSnapshot(&segments);
   int64_t new_size = segments[0]->file_size();
@@ -283,7 +283,7 @@ TEST_P(LogTestOptionalCompression, TestLogNotTrimmed) {
   opid.set_term(0);
   opid.set_index(1);
 
-  AppendNoOp(&opid);
+  ASSERT_OK(AppendNoOp(&opid));
 
   SegmentSequence segments;
   log_->reader()->GetSegmentsSnapshot(&segments);
@@ -1153,12 +1153,12 @@ TEST_F(LogTest, TestAutoStopIdleAppendThread) {
   // after the append long enough for the append thread to shut itself down
   // again.
   ASSERT_EVENTUALLY([&]() {
-      AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2);
-      ASSERT_TRUE(log_->append_thread_active_for_tests());
-      debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
-      
ASSERT_GT(log_->segment_allocator_.active_segment_->compress_buf_.capacity(),
-                faststring::kInitialCapacity);
-    });
+    ASSERT_OK(AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2));
+    ASSERT_TRUE(log_->append_thread_active_for_tests());
+    debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+    
ASSERT_GT(log_->segment_allocator_.active_segment_->compress_buf_.capacity(),
+              faststring::kInitialCapacity);
+  });
   // After some time, the append thread should shut itself down.
   ASSERT_EVENTUALLY([&]() {
       ASSERT_FALSE(log_->append_thread_active_for_tests());
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 422e6e771..0de97b678 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -892,8 +892,7 @@ Status Log::AsyncAppendCommit(const consensus::CommitMsg& 
commit_msg,
   unique_ptr<LogEntryBatch> entry_batch = CreateBatchFromPB(
       COMMIT, batch_pb, std::move(callback));
   entry->unsafe_arena_release_commit();
-  AsyncAppend(std::move(entry_batch));
-  return Status::OK();
+  return AsyncAppend(std::move(entry_batch));
 }
 
 Status Log::WriteBatch(LogEntryBatch* entry_batch) {
@@ -1037,7 +1036,7 @@ Status Log::WaitUntilAllFlushed() {
   Synchronizer s;
   unique_ptr<LogEntryBatch> reserved_entry_batch =
       CreateBatchFromPB(FLUSH_MARKER, entry_batch, s.AsStatusCallback());
-  AsyncAppend(std::move(reserved_entry_batch));
+  RETURN_NOT_OK(AsyncAppend(std::move(reserved_entry_batch)));
   return s.Wait();
 }
 
diff --git a/src/kudu/consensus/log_anchor_registry.cc 
b/src/kudu/consensus/log_anchor_registry.cc
index 0a9ebfd6c..782c5a601 100644
--- a/src/kudu/consensus/log_anchor_registry.cc
+++ b/src/kudu/consensus/log_anchor_registry.cc
@@ -155,13 +155,14 @@ MinLogIndexAnchorer::~MinLogIndexAnchorer() {
   CHECK_OK(ReleaseAnchor());
 }
 
-void MinLogIndexAnchorer::AnchorIfMinimum(int64_t log_index) {
+Status MinLogIndexAnchorer::AnchorIfMinimum(int64_t log_index) {
   std::lock_guard l(lock_);
   if (log_index < minimum_log_index_ ||
       PREDICT_FALSE(minimum_log_index_ == kInvalidOpIdIndex)) {
     minimum_log_index_ = log_index;
-    registry_->RegisterOrUpdate(minimum_log_index_, owner_, &anchor_);
+    return registry_->RegisterOrUpdate(minimum_log_index_, owner_, &anchor_);
   }
+  return Status::OK();
 }
 
 Status MinLogIndexAnchorer::ReleaseAnchor() {
diff --git a/src/kudu/consensus/log_anchor_registry.h 
b/src/kudu/consensus/log_anchor_registry.h
index 6ccb40f1c..46e337767 100644
--- a/src/kudu/consensus/log_anchor_registry.h
+++ b/src/kudu/consensus/log_anchor_registry.h
@@ -135,7 +135,7 @@ class MinLogIndexAnchorer {
 
   // If op_id is less than the minimum index registered so far, or if no 
indexes
   // are currently registered, anchor on 'log_index'.
-  void AnchorIfMinimum(int64_t log_index);
+  Status AnchorIfMinimum(int64_t log_index);
 
   // Un-anchors the earliest index (which is the only one tracked).
   // If no minimum is known (no anchor registered), returns OK.
diff --git a/src/kudu/consensus/log_cache-test.cc 
b/src/kudu/consensus/log_cache-test.cc
index 1292591a6..b0220dd62 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -105,7 +105,7 @@ class LogCacheTest : public KuduTest {
   }
 
   void TearDown() override {
-    log_->WaitUntilAllFlushed();
+    ASSERT_OK(log_->WaitUntilAllFlushed());
   }
 
   void CloseAndReopenCache(const OpId& preceding_id) {
@@ -153,7 +153,7 @@ TEST_F(LogCacheTest, TestAppendAndGetMessages) {
   ASSERT_OK(AppendReplicateMessagesToCache(1, 100));
   ASSERT_EQ(100, cache_->metrics_.log_cache_num_ops->value());
   ASSERT_GE(cache_->metrics_.log_cache_size->value(), 500);
-  log_->WaitUntilAllFlushed();
+  ASSERT_OK(log_->WaitUntilAllFlushed());
 
   vector<ReplicateRefPtr> messages;
   OpId preceding;
@@ -197,7 +197,7 @@ TEST_F(LogCacheTest, TestAlwaysYieldsAtLeastOneMessage) {
 
   // Append several large ops to the cache
   ASSERT_OK(AppendReplicateMessagesToCache(1, 4, kPayloadSize));
-  log_->WaitUntilAllFlushed();
+  ASSERT_OK(log_->WaitUntilAllFlushed());
 
   // We should get one of them, even though we only ask for 100 bytes
   vector<ReplicateRefPtr> messages;
@@ -218,7 +218,7 @@ TEST_F(LogCacheTest, TestAlwaysYieldsAtLeastOneMessage) {
 TEST_F(LogCacheTest, TestCacheEdgeCases) {
   // Append 1 message to the cache
   ASSERT_OK(AppendReplicateMessagesToCache(1, 1));
-  log_->WaitUntilAllFlushed();
+  ASSERT_OK(log_->WaitUntilAllFlushed());
 
   std::vector<ReplicateRefPtr> messages;
   OpId preceding;
@@ -263,7 +263,7 @@ TEST_F(LogCacheTest, TestMemoryLimit) {
   const int kPayloadSize = 400 * 1024;
   // Limit should not be violated.
   ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize));
-  log_->WaitUntilAllFlushed();
+  ASSERT_OK(log_->WaitUntilAllFlushed());
   ASSERT_EQ(1, cache_->num_cached_ops());
 
   // Verify the size is right. It's not exactly kPayloadSize because of 
in-memory
@@ -274,7 +274,7 @@ TEST_F(LogCacheTest, TestMemoryLimit) {
 
   // Add another operation which fits under the 1MB limit.
   ASSERT_OK(AppendReplicateMessagesToCache(2, 1, kPayloadSize));
-  log_->WaitUntilAllFlushed();
+  ASSERT_OK(log_->WaitUntilAllFlushed());
   ASSERT_EQ(2, cache_->num_cached_ops());
 
   int size_with_two_msgs = cache_->BytesUsed();
@@ -287,7 +287,7 @@ TEST_F(LogCacheTest, TestMemoryLimit) {
   // Verify that we have trimmed by appending a message that would
   // otherwise be rejected, since the cache max size limit is 2MB.
   ASSERT_OK(AppendReplicateMessagesToCache(3, 1, kPayloadSize));
-  log_->WaitUntilAllFlushed();
+  ASSERT_OK(log_->WaitUntilAllFlushed());
   ASSERT_EQ(2, cache_->num_cached_ops());
   ASSERT_EQ(size_with_two_msgs, cache_->BytesUsed());
 
@@ -318,7 +318,7 @@ TEST_F(LogCacheTest, TestGlobalMemoryLimit) {
 
   // Should succeed, but only end up caching one of the two ops because of the 
global limit.
   ASSERT_OK(AppendReplicateMessagesToCache(1, 2, kPayloadSize));
-  log_->WaitUntilAllFlushed();
+  ASSERT_OK(log_->WaitUntilAllFlushed());
 
   ASSERT_EQ(1, cache_->num_cached_ops());
   ASSERT_LE(cache_->BytesUsed(), 1024 * 1024);
@@ -339,7 +339,7 @@ TEST_F(LogCacheTest, TestReplaceMessages) {
     ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize));
   }
 
-  log_->WaitUntilAllFlushed();
+  ASSERT_OK(log_->WaitUntilAllFlushed());
 
   EXPECT_EQ(size_with_one_msg, tracker->consumption());
   EXPECT_EQ(Substitute("Pinned index: 2, LogCacheStats(num_ops=1, bytes=$0)",
@@ -356,17 +356,17 @@ TEST_F(LogCacheTest, TestTruncation) {
   };
 
   // Append 1 through 3.
-  AppendReplicateMessagesToCache(1, 3, 100);
+  ASSERT_OK(AppendReplicateMessagesToCache(1, 3, 100));
 
   for (auto mode : {TRUNCATE_BY_APPEND, TRUNCATE_EXPLICITLY}) {
     SCOPED_TRACE(mode == TRUNCATE_BY_APPEND ? "by append" : "explicitly");
     // Append messages 4 through 10.
-    AppendReplicateMessagesToCache(4, 7, 100);
+    ASSERT_OK(AppendReplicateMessagesToCache(4, 7, 100));
     ASSERT_EQ(10, cache_->metrics_.log_cache_num_ops->value());
 
     switch (mode) {
       case TRUNCATE_BY_APPEND:
-        AppendReplicateMessagesToCache(3, 1, 100);
+        ASSERT_OK(AppendReplicateMessagesToCache(3, 1, 100));
         break;
       case TRUNCATE_EXPLICITLY:
         cache_->TruncateOpsAfter(3);
diff --git a/src/kudu/consensus/peer_manager.cc 
b/src/kudu/consensus/peer_manager.cc
index f09eb443d..be43243a9 100644
--- a/src/kudu/consensus/peer_manager.cc
+++ b/src/kudu/consensus/peer_manager.cc
@@ -89,14 +89,19 @@ void PeerManager::UpdateRaftConfig(const RaftConfigPB& 
config) {
 void PeerManager::SignalRequest(bool force_if_queue_empty) {
   std::lock_guard lock(lock_);
   for (auto iter = peers_.begin(); iter != peers_.end();) {
-    Status s = (*iter).second->SignalRequest(force_if_queue_empty);
-    if (PREDICT_FALSE(!s.ok())) {
-      LOG(WARNING) << GetLogPrefix()
-                   << "Peer was closed, removing from peers. Peer: "
-                   << SecureShortDebugString((*iter).second->peer_pb());
-      peers_.erase(iter++);
-    } else {
+    const auto s = iter->second->SignalRequest(force_if_queue_empty);
+    if (PREDICT_TRUE(s.ok())) {
       ++iter;
+      continue;
+    }
+    const auto& peer_info = SecureShortDebugString(iter->second->peer_pb());
+    if (!s.IsIllegalState()) {
+      WARN_NOT_OK(s, Substitute("$0: SignalRequest failed at peer $1",
+                                GetLogPrefix(), peer_info));
+    } else {
+      WARN_NOT_OK(s, Substitute("$0: SignalRequest failed, removing peer $1",
+                                GetLogPrefix(), peer_info));
+      peers_.erase(iter++);
     }
   }
 }
diff --git a/src/kudu/consensus/pending_rounds.cc 
b/src/kudu/consensus/pending_rounds.cc
index 3e1c25302..984495550 100644
--- a/src/kudu/consensus/pending_rounds.cc
+++ b/src/kudu/consensus/pending_rounds.cc
@@ -21,6 +21,7 @@
 #include <ostream>
 #include <utility>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/consensus/consensus.pb.h"
@@ -31,11 +32,18 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/debug-util.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/thread_restrictions.h"
 
+DEFINE_bool(raft_allow_committed_pending_index_gap, false,
+            "Allow a gap between last committed and first pending operation "
+            "in the queue when starting Raft consensus. For testing only!");
+TAG_FLAG(raft_allow_committed_pending_index_gap, runtime);
+TAG_FLAG(raft_allow_committed_pending_index_gap, unsafe);
+
 using kudu::pb_util::SecureShortDebugString;
 using std::string;
 using strings::Substitute;
@@ -195,7 +203,8 @@ Status PendingRounds::SetInitialCommittedOpId(const OpId& 
committed_op) {
   if (!pending_ops_.empty()) {
     int64_t first_pending_index = pending_ops_.begin()->first;
     if (committed_op.index() < first_pending_index) {
-      if (committed_op.index() != first_pending_index - 1) {
+      if (PREDICT_TRUE(!FLAGS_raft_allow_committed_pending_index_gap) &&
+          committed_op.index() != first_pending_index - 1) {
         return Status::Corruption(Substitute(
             "pending operations should start at first operation "
             "after the committed operation (committed=$0, first pending=$1)",
diff --git a/src/kudu/consensus/raft_consensus.cc 
b/src/kudu/consensus/raft_consensus.cc
index 6c232c690..9fcef781f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -365,7 +365,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& 
info,
 
     // Set the initial committed opid for the PendingRounds only after
     // appending any uncommitted replicate messages to the queue.
-    pending_->SetInitialCommittedOpId(info.last_committed_id);
+    RETURN_NOT_OK(pending_->SetInitialCommittedOpId(info.last_committed_id));
 
     // If this is the first term expire the FD immediately so that we have a
     // fast first election, otherwise we just let the timer expire normally.
@@ -748,8 +748,11 @@ Status 
RaftConsensus::BecomeReplicaUnlocked(optional<MonoDelta> fd_delta) {
 
   // Deregister ourselves from the queue. We no longer need to track what gets
   // replicated since we're stepping down.
+  //
+  // NOTE: if observer isn't registered at this point yet, that's OK because
+  //       in one of use cases Start() also invokes BecomeReplicaUnlocked()
   queue_->UnRegisterObserver(this);
-  bool was_leader = queue_->IsInLeaderMode();
+  const bool was_leader = queue_->IsInLeaderMode();
   queue_->SetNonLeaderMode(cmeta_->ActiveConfig());
   if (was_leader && server_ctx_.num_leaders) {
     server_ctx_.num_leaders->IncrementBy(-1);
@@ -908,7 +911,7 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) 
{
     return;
   }
 
-  pending_->AdvanceCommittedIndex(commit_index);
+  CHECK_OK(pending_->AdvanceCommittedIndex(commit_index));
 
   if (cmeta_->active_role() == RaftPeerPB::LEADER) {
     peer_manager_->SignalRequest(false);
@@ -2737,7 +2740,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason 
reason, const ElectionResu
     // will bump to term 2 when it gets the vote rejection, such that its
     // next pre-election (for term 3) would succeed.
     if (result.highest_voter_term > CurrentTermUnlocked()) {
-      HandleTermAdvanceUnlocked(result.highest_voter_term);
+      WARN_NOT_OK(HandleTermAdvanceUnlocked(result.highest_voter_term),
+                  "error while handling term advanced beyond the current one");
     }
 
     LOG_WITH_PREFIX_UNLOCKED(INFO)
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc 
b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 67ebe7458..b2eb93465 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -422,7 +422,7 @@ class RaftConsensusQuorumTest : public KuduTest {
       ASSERT_OK(WaitForReplicate(round.get()));
       last_op_id->CopyFrom(round->id());
       if (commit_mode == COMMIT_ONE_BY_ONE) {
-        CommitDummyMessage(leader_idx, round.get(), commit_sync);
+        ASSERT_OK(CommitDummyMessage(leader_idx, round.get(), commit_sync));
       }
       rounds->push_back(round);
     }
@@ -445,7 +445,7 @@ class RaftConsensusQuorumTest : public KuduTest {
   void GatherLogEntries(int idx, const scoped_refptr<Log>& log,
                         LogEntries* entries) {
     ASSERT_OK(log->WaitUntilAllFlushed());
-    log->Close();
+    ASSERT_OK(log->Close());
     shared_ptr<LogReader> log_reader;
     ASSERT_OK(log::LogReader::Open(fs_managers_[idx].get(),
                                    /*index*/nullptr,
@@ -757,7 +757,7 @@ TEST_F(RaftConsensusQuorumTest, 
TestConsensusStopsIfAMajorityFallsBehind) {
   // After we release the locks the operation should replicate to all replicas
   // and we commit.
   ASSERT_OK(WaitForReplicate(round.get()));
-  CommitDummyMessage(kLeaderIdx, round.get());
+  ASSERT_OK(CommitDummyMessage(kLeaderIdx, round.get()));
 
   // Assert that everything was ok
   WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx);
diff --git a/src/kudu/consensus/time_manager-test.cc 
b/src/kudu/consensus/time_manager-test.cc
index da37c83c2..33c39b6bf 100644
--- a/src/kudu/consensus/time_manager-test.cc
+++ b/src/kudu/consensus/time_manager-test.cc
@@ -171,8 +171,8 @@ TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
 
   // In leader mode calling MessageReceivedFromLeader() should cause a CHECK 
failure.
   EXPECT_DEATH({
-     time_manager_->MessageReceivedFromLeader(message);
-    }, "Cannot receive messages from a leader in leader mode.");
+    ASSERT_OK(time_manager_->MessageReceivedFromLeader(message));
+  }, "Cannot receive messages from a leader in leader mode.");
 
   // .. as should AdvanceSafeTime()
   EXPECT_DEATH({
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc 
b/src/kudu/integration-tests/ts_recovery-itest.cc
index 4f61cf391..51cd66880 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -571,7 +571,13 @@ INSTANTIATE_TEST_SUITE_P(BlockManagerType, 
TsRecoveryITestDeathTest,
 TEST_P(TsRecoveryITestDeathTest, RecoverFromOpIdOverflow) {
   // Create the initial tablet files on disk, then shut down the cluster so we
   // can meddle with the WAL.
-  NO_FATALS(StartClusterOneTs());
+  NO_FATALS(StartClusterOneTs(
+      // To simplify test scaffolding, this scenario relies on lenient handling
+      // of operation indices read from the tablet's WAL upon bootstrapping.
+      // With regular guards in place, an inconsistency would be detected when
+      // setting committed OpId for PendingRounds by RaftConsensus::Start().
+      { "--raft_allow_committed_pending_index_gap" }
+  ));
   TestWorkload workload(cluster_.get());
   workload.set_num_replicas(1);
   workload.Setup();

Reply via email to