This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new bde9cbad1 [consensus] fix result status handling
bde9cbad1 is described below
commit bde9cbad115deb1cd689eb583a1c7a8d3440b156
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Feb 27 18:19:12 2025 -0800
[consensus] fix result status handling
This patch addresses sloppy result handling of functions/methods
that return Status under the src/kudu/consensus directory.
In addition, this patch introduces a new unsafe runtime flag
--raft_allow_committed_pending_index_gap that's now used by the
TsRecoveryITestDeathTest.RecoverFromOpIdOverflow test scenario only.
The test scenario was reliant on the sloppy result status handling
of the call to PendingRounds::SetInitialCommittedOpId() in
RaftConsensus::Start() that's been addressed in this patch. It's
the path of least resistance compared with updating test scaffolding
and the test scenario itself, but it also has a benefit of providing
a flag to enable the former result status handling approach in
RaftConsensus::Start() when setting last committed OpId for a tablet's
Raft consensus implementation. Frankly, I don't think anybody will ever
need to re-enable the original bogus behavior of ignoring non-OK result
status, but it doesn't hurt to have such an option just in case
I'm missing something here.
Change-Id: I6db113c58c6b453bf3cd290ee0dfdd184b63b999
Reviewed-on: http://gerrit.cloudera.org:8080/22646
Reviewed-by: Abhishek Chennaka <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
src/kudu/consensus/consensus_peers-test.cc | 14 +++++++-------
src/kudu/consensus/consensus_peers.cc | 14 +++++++-------
src/kudu/consensus/consensus_queue-test.cc | 10 +++++-----
src/kudu/consensus/consensus_queue.cc | 5 ++---
src/kudu/consensus/consensus_queue.h | 2 +-
src/kudu/consensus/log-test.cc | 24 ++++++++++++------------
src/kudu/consensus/log.cc | 5 ++---
src/kudu/consensus/log_anchor_registry.cc | 5 +++--
src/kudu/consensus/log_anchor_registry.h | 2 +-
src/kudu/consensus/log_cache-test.cc | 24 ++++++++++++------------
src/kudu/consensus/peer_manager.cc | 19 ++++++++++++-------
src/kudu/consensus/pending_rounds.cc | 11 ++++++++++-
src/kudu/consensus/raft_consensus.cc | 12 ++++++++----
src/kudu/consensus/raft_consensus_quorum-test.cc | 6 +++---
src/kudu/consensus/time_manager-test.cc | 4 ++--
src/kudu/integration-tests/ts_recovery-itest.cc | 8 +++++++-
16 files changed, 94 insertions(+), 71 deletions(-)
diff --git a/src/kudu/consensus/consensus_peers-test.cc
b/src/kudu/consensus/consensus_peers-test.cc
index 02e2ec89c..236f730f9 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -208,7 +208,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), 1, 20);
// signal the peer there are requests pending.
- remote_peer->SignalRequest();
+ ASSERT_OK(remote_peer->SignalRequest());
// now wait on the status of the last operation
// this will complete once the peer has logged all
// requests.
@@ -240,8 +240,8 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
OpId first = MakeOpId(0, 1);
- remote_peer1->SignalRequest();
- remote_peer2->SignalRequest();
+ ASSERT_OK(remote_peer1->SignalRequest());
+ ASSERT_OK(remote_peer2->SignalRequest());
// Now wait for the message to be replicated, this should succeed since
// majority = 2 and only one peer was delayed. The majority is made up
@@ -268,7 +268,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
ASSERT_LT(message_queue_->GetCommittedIndex(), 2);
// Signal one of the two remote peers.
- remote_peer1->SignalRequest();
+ ASSERT_OK(remote_peer1->SignalRequest());
// We should now be able to wait for it to replicate, since two peers (a
majority)
// have replicated the message.
WaitForCommitIndex(2);
@@ -308,7 +308,7 @@ TEST_F(ConsensusPeersTest,
TestCloseWhenRemotePeerDoesntMakeProgress) {
// Add an op to the queue and start sending requests to the peer.
AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), 1, 1);
- peer->SignalRequest(true);
+ ASSERT_OK(peer->SignalRequest(true));
// We should be able to close the peer even though it has more data pending.
peer->Close();
@@ -346,7 +346,7 @@ TEST_F(ConsensusPeersTest,
TestDontSendOneRpcPerWriteWhenPeerIsDown) {
mock_proxy->set_update_response(initial_resp);
AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), 1, 1);
- peer->SignalRequest(true);
+ ASSERT_OK(peer->SignalRequest(true));
// Now wait for the message to be replicated, this should succeed since
// the local (leader) peer always acks and the follower also acked this time.
@@ -361,7 +361,7 @@ TEST_F(ConsensusPeersTest,
TestDontSendOneRpcPerWriteWhenPeerIsDown) {
// Add a bunch of messages to the queue.
for (int i = 2; i <= 100; i++) {
AppendReplicateMessagesToQueue(message_queue_.get(), clock_.get(), i, 1);
- peer->SignalRequest(false);
+ ASSERT_OK(peer->SignalRequest(false));
SleepFor(MonoDelta::FromMilliseconds(2));
}
diff --git a/src/kudu/consensus/consensus_peers.cc
b/src/kudu/consensus/consensus_peers.cc
index 55d521e16..0dd24958d 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -151,12 +151,12 @@ void Peer::Init() {
// Capture a weak_ptr reference into the functor so it can safely handle
// outliving the peer.
- weak_ptr<Peer> w = shared_from_this();
+ weak_ptr<Peer> w_this = shared_from_this();
heartbeater_ = PeriodicTimer::Create(
messenger_,
- [w]() {
- if (auto p = w.lock()) {
- p->SignalRequest(true);
+ [w_this = std::move(w_this)]() {
+ if (auto p = w_this.lock()) {
+ WARN_NOT_OK(p->SignalRequest(true), "SignalRequest failed");
}
},
MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
@@ -170,7 +170,7 @@ Status Peer::SignalRequest(bool even_if_queue_empty) {
// the implementation of SendNextRequest() checks for 'closed_' and
// 'request_pending_' on its own.
if (PREDICT_FALSE(closed_)) {
- return Status::IllegalState("Peer was closed.");
+ return Status::IllegalState("peer closed");
}
// Only allow one request at a time. No sense waking up the
@@ -181,8 +181,8 @@ Status Peer::SignalRequest(bool even_if_queue_empty) {
// Capture a weak_ptr reference into the submitted functor so that we can
// safely handle the functor outliving its peer.
- weak_ptr<Peer> w_this = shared_from_this();
- return raft_pool_token_->Submit([even_if_queue_empty, w_this]() {
+ weak_ptr<Peer> w_this(shared_from_this());
+ return raft_pool_token_->Submit([even_if_queue_empty, w_this =
std::move(w_this)]() {
if (auto p = w_this.lock()) {
p->SendNextRequest(even_if_queue_empty);
}
diff --git a/src/kudu/consensus/consensus_queue-test.cc
b/src/kudu/consensus/consensus_queue-test.cc
index 0a64c1df1..56d6cb511 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -131,7 +131,7 @@ class ConsensusQueueTest : public KuduTest {
}
void TearDown() override {
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
queue_->Close();
}
@@ -910,7 +910,7 @@ TEST_F(ConsensusQueueTest,
TestQueueMovesWatermarksBackward) {
// Append a bunch of messages and update as if they were also appeneded to
the leader.
queue_->UpdateLastIndexAppendedToLeader(10);
AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
// Now rewrite some of the operations and wait for the log to append.
Synchronizer synch;
@@ -995,14 +995,14 @@ TEST_F(ConsensusQueueTest,
TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
for (int i = 31; i <= 53; i++) {
if (i <= 45) {
- AppendReplicateMsg(72, i, 1024);
+ ASSERT_OK(AppendReplicateMsg(72, i, 1024));
continue;
}
if (i <= 51) {
- AppendReplicateMsg(73, i, 1024);
+ ASSERT_OK(AppendReplicateMsg(73, i, 1024));
continue;
}
- AppendReplicateMsg(76, i, 1024);
+ ASSERT_OK(AppendReplicateMsg(76, i, 1024));
}
WaitForLocalPeerToAckIndex(53);
diff --git a/src/kudu/consensus/consensus_queue.cc
b/src/kudu/consensus/consensus_queue.cc
index 543e88d2b..def546acf 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -1467,14 +1467,13 @@ void
PeerMessageQueue::RegisterObserver(PeerMessageQueueObserver* observer) {
}
}
-Status PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver*
observer) {
+void PeerMessageQueue::UnRegisterObserver(PeerMessageQueueObserver* observer) {
std::lock_guard lock(queue_lock_);
auto iter = std::find(observers_.begin(), observers_.end(), observer);
if (iter == observers_.end()) {
- return Status::NotFound("Can't find observer.");
+ return;
}
observers_.erase(iter);
- return Status::OK();
}
bool PeerMessageQueue::IsOpInLog(const OpId& desired_op) const {
diff --git a/src/kudu/consensus/consensus_queue.h
b/src/kudu/consensus/consensus_queue.h
index e25e5958f..16d1f7ed3 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -346,7 +346,7 @@ class PeerMessageQueue {
void RegisterObserver(PeerMessageQueueObserver* observer);
- Status UnRegisterObserver(PeerMessageQueueObserver* observer);
+ void UnRegisterObserver(PeerMessageQueueObserver* observer);
struct Metrics {
// Keeps track of the number of ops. that are completed by a majority but
still need
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index 4ea6e0bc8..61852d88c 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -137,7 +137,7 @@ class LogTest : public LogTestBase {
LogSegmentHeaderPB header;
header.set_sequence_number(sequence_number);
header.set_tablet_id(kTestTablet);
- SchemaToPB(GetSimpleTestSchema(), header.mutable_schema());
+ RETURN_NOT_OK(SchemaToPB(GetSimpleTestSchema(), header.mutable_schema()));
LogSegmentFooterPB footer;
footer.set_num_entries(10);
@@ -189,7 +189,7 @@ TEST_P(LogTestOptionalCompression,
TestMultipleEntriesInABatch) {
opid.set_term(1);
opid.set_index(1);
- AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2);
+ ASSERT_OK(AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2));
// RollOver() the batch so that we have a properly formed footer.
ASSERT_OK(log_->AllocateSegmentAndRollOverForTests());
@@ -245,7 +245,7 @@ TEST_P(LogTestOptionalCompression, TestFsync) {
opid.set_term(0);
opid.set_index(1);
- AppendNoOp(&opid);
+ ASSERT_OK(AppendNoOp(&opid));
ASSERT_OK(log_->Close());
}
@@ -258,14 +258,14 @@ TEST_P(LogTestOptionalCompression, TestSizeIsMaintained) {
ASSERT_OK(BuildLog());
OpId opid = MakeOpId(0, 1);
- AppendNoOp(&opid);
+ ASSERT_OK(AppendNoOp(&opid));
SegmentSequence segments;
log_->reader()->GetSegmentsSnapshot(&segments);
int64_t orig_size = segments[0]->file_size();
ASSERT_GT(orig_size, 0);
- AppendNoOp(&opid);
+ ASSERT_OK(AppendNoOp(&opid));
log_->reader()->GetSegmentsSnapshot(&segments);
int64_t new_size = segments[0]->file_size();
@@ -283,7 +283,7 @@ TEST_P(LogTestOptionalCompression, TestLogNotTrimmed) {
opid.set_term(0);
opid.set_index(1);
- AppendNoOp(&opid);
+ ASSERT_OK(AppendNoOp(&opid));
SegmentSequence segments;
log_->reader()->GetSegmentsSnapshot(&segments);
@@ -1153,12 +1153,12 @@ TEST_F(LogTest, TestAutoStopIdleAppendThread) {
// after the append long enough for the append thread to shut itself down
// again.
ASSERT_EVENTUALLY([&]() {
- AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2);
- ASSERT_TRUE(log_->append_thread_active_for_tests());
- debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
-
ASSERT_GT(log_->segment_allocator_.active_segment_->compress_buf_.capacity(),
- faststring::kInitialCapacity);
- });
+ ASSERT_OK(AppendNoOpsToLogSync(clock_.get(), log_.get(), &opid, 2));
+ ASSERT_TRUE(log_->append_thread_active_for_tests());
+ debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+
ASSERT_GT(log_->segment_allocator_.active_segment_->compress_buf_.capacity(),
+ faststring::kInitialCapacity);
+ });
// After some time, the append thread should shut itself down.
ASSERT_EVENTUALLY([&]() {
ASSERT_FALSE(log_->append_thread_active_for_tests());
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 422e6e771..0de97b678 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -892,8 +892,7 @@ Status Log::AsyncAppendCommit(const consensus::CommitMsg&
commit_msg,
unique_ptr<LogEntryBatch> entry_batch = CreateBatchFromPB(
COMMIT, batch_pb, std::move(callback));
entry->unsafe_arena_release_commit();
- AsyncAppend(std::move(entry_batch));
- return Status::OK();
+ return AsyncAppend(std::move(entry_batch));
}
Status Log::WriteBatch(LogEntryBatch* entry_batch) {
@@ -1037,7 +1036,7 @@ Status Log::WaitUntilAllFlushed() {
Synchronizer s;
unique_ptr<LogEntryBatch> reserved_entry_batch =
CreateBatchFromPB(FLUSH_MARKER, entry_batch, s.AsStatusCallback());
- AsyncAppend(std::move(reserved_entry_batch));
+ RETURN_NOT_OK(AsyncAppend(std::move(reserved_entry_batch)));
return s.Wait();
}
diff --git a/src/kudu/consensus/log_anchor_registry.cc
b/src/kudu/consensus/log_anchor_registry.cc
index 0a9ebfd6c..782c5a601 100644
--- a/src/kudu/consensus/log_anchor_registry.cc
+++ b/src/kudu/consensus/log_anchor_registry.cc
@@ -155,13 +155,14 @@ MinLogIndexAnchorer::~MinLogIndexAnchorer() {
CHECK_OK(ReleaseAnchor());
}
-void MinLogIndexAnchorer::AnchorIfMinimum(int64_t log_index) {
+Status MinLogIndexAnchorer::AnchorIfMinimum(int64_t log_index) {
std::lock_guard l(lock_);
if (log_index < minimum_log_index_ ||
PREDICT_FALSE(minimum_log_index_ == kInvalidOpIdIndex)) {
minimum_log_index_ = log_index;
- registry_->RegisterOrUpdate(minimum_log_index_, owner_, &anchor_);
+ return registry_->RegisterOrUpdate(minimum_log_index_, owner_, &anchor_);
}
+ return Status::OK();
}
Status MinLogIndexAnchorer::ReleaseAnchor() {
diff --git a/src/kudu/consensus/log_anchor_registry.h
b/src/kudu/consensus/log_anchor_registry.h
index 6ccb40f1c..46e337767 100644
--- a/src/kudu/consensus/log_anchor_registry.h
+++ b/src/kudu/consensus/log_anchor_registry.h
@@ -135,7 +135,7 @@ class MinLogIndexAnchorer {
// If op_id is less than the minimum index registered so far, or if no
indexes
// are currently registered, anchor on 'log_index'.
- void AnchorIfMinimum(int64_t log_index);
+ Status AnchorIfMinimum(int64_t log_index);
// Un-anchors the earliest index (which is the only one tracked).
// If no minimum is known (no anchor registered), returns OK.
diff --git a/src/kudu/consensus/log_cache-test.cc
b/src/kudu/consensus/log_cache-test.cc
index 1292591a6..b0220dd62 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -105,7 +105,7 @@ class LogCacheTest : public KuduTest {
}
void TearDown() override {
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
}
void CloseAndReopenCache(const OpId& preceding_id) {
@@ -153,7 +153,7 @@ TEST_F(LogCacheTest, TestAppendAndGetMessages) {
ASSERT_OK(AppendReplicateMessagesToCache(1, 100));
ASSERT_EQ(100, cache_->metrics_.log_cache_num_ops->value());
ASSERT_GE(cache_->metrics_.log_cache_size->value(), 500);
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
vector<ReplicateRefPtr> messages;
OpId preceding;
@@ -197,7 +197,7 @@ TEST_F(LogCacheTest, TestAlwaysYieldsAtLeastOneMessage) {
// Append several large ops to the cache
ASSERT_OK(AppendReplicateMessagesToCache(1, 4, kPayloadSize));
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
// We should get one of them, even though we only ask for 100 bytes
vector<ReplicateRefPtr> messages;
@@ -218,7 +218,7 @@ TEST_F(LogCacheTest, TestAlwaysYieldsAtLeastOneMessage) {
TEST_F(LogCacheTest, TestCacheEdgeCases) {
// Append 1 message to the cache
ASSERT_OK(AppendReplicateMessagesToCache(1, 1));
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
std::vector<ReplicateRefPtr> messages;
OpId preceding;
@@ -263,7 +263,7 @@ TEST_F(LogCacheTest, TestMemoryLimit) {
const int kPayloadSize = 400 * 1024;
// Limit should not be violated.
ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize));
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
ASSERT_EQ(1, cache_->num_cached_ops());
// Verify the size is right. It's not exactly kPayloadSize because of
in-memory
@@ -274,7 +274,7 @@ TEST_F(LogCacheTest, TestMemoryLimit) {
// Add another operation which fits under the 1MB limit.
ASSERT_OK(AppendReplicateMessagesToCache(2, 1, kPayloadSize));
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
ASSERT_EQ(2, cache_->num_cached_ops());
int size_with_two_msgs = cache_->BytesUsed();
@@ -287,7 +287,7 @@ TEST_F(LogCacheTest, TestMemoryLimit) {
// Verify that we have trimmed by appending a message that would
// otherwise be rejected, since the cache max size limit is 2MB.
ASSERT_OK(AppendReplicateMessagesToCache(3, 1, kPayloadSize));
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
ASSERT_EQ(2, cache_->num_cached_ops());
ASSERT_EQ(size_with_two_msgs, cache_->BytesUsed());
@@ -318,7 +318,7 @@ TEST_F(LogCacheTest, TestGlobalMemoryLimit) {
// Should succeed, but only end up caching one of the two ops because of the
global limit.
ASSERT_OK(AppendReplicateMessagesToCache(1, 2, kPayloadSize));
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
ASSERT_EQ(1, cache_->num_cached_ops());
ASSERT_LE(cache_->BytesUsed(), 1024 * 1024);
@@ -339,7 +339,7 @@ TEST_F(LogCacheTest, TestReplaceMessages) {
ASSERT_OK(AppendReplicateMessagesToCache(1, 1, kPayloadSize));
}
- log_->WaitUntilAllFlushed();
+ ASSERT_OK(log_->WaitUntilAllFlushed());
EXPECT_EQ(size_with_one_msg, tracker->consumption());
EXPECT_EQ(Substitute("Pinned index: 2, LogCacheStats(num_ops=1, bytes=$0)",
@@ -356,17 +356,17 @@ TEST_F(LogCacheTest, TestTruncation) {
};
// Append 1 through 3.
- AppendReplicateMessagesToCache(1, 3, 100);
+ ASSERT_OK(AppendReplicateMessagesToCache(1, 3, 100));
for (auto mode : {TRUNCATE_BY_APPEND, TRUNCATE_EXPLICITLY}) {
SCOPED_TRACE(mode == TRUNCATE_BY_APPEND ? "by append" : "explicitly");
// Append messages 4 through 10.
- AppendReplicateMessagesToCache(4, 7, 100);
+ ASSERT_OK(AppendReplicateMessagesToCache(4, 7, 100));
ASSERT_EQ(10, cache_->metrics_.log_cache_num_ops->value());
switch (mode) {
case TRUNCATE_BY_APPEND:
- AppendReplicateMessagesToCache(3, 1, 100);
+ ASSERT_OK(AppendReplicateMessagesToCache(3, 1, 100));
break;
case TRUNCATE_EXPLICITLY:
cache_->TruncateOpsAfter(3);
diff --git a/src/kudu/consensus/peer_manager.cc
b/src/kudu/consensus/peer_manager.cc
index f09eb443d..be43243a9 100644
--- a/src/kudu/consensus/peer_manager.cc
+++ b/src/kudu/consensus/peer_manager.cc
@@ -89,14 +89,19 @@ void PeerManager::UpdateRaftConfig(const RaftConfigPB&
config) {
void PeerManager::SignalRequest(bool force_if_queue_empty) {
std::lock_guard lock(lock_);
for (auto iter = peers_.begin(); iter != peers_.end();) {
- Status s = (*iter).second->SignalRequest(force_if_queue_empty);
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << GetLogPrefix()
- << "Peer was closed, removing from peers. Peer: "
- << SecureShortDebugString((*iter).second->peer_pb());
- peers_.erase(iter++);
- } else {
+ const auto s = iter->second->SignalRequest(force_if_queue_empty);
+ if (PREDICT_TRUE(s.ok())) {
++iter;
+ continue;
+ }
+ const auto& peer_info = SecureShortDebugString(iter->second->peer_pb());
+ if (!s.IsIllegalState()) {
+ WARN_NOT_OK(s, Substitute("$0: SignalRequest failed at peer $1",
+ GetLogPrefix(), peer_info));
+ } else {
+ WARN_NOT_OK(s, Substitute("$0: SignalRequest failed, removing peer $1",
+ GetLogPrefix(), peer_info));
+ peers_.erase(iter++);
}
}
}
diff --git a/src/kudu/consensus/pending_rounds.cc
b/src/kudu/consensus/pending_rounds.cc
index 3e1c25302..984495550 100644
--- a/src/kudu/consensus/pending_rounds.cc
+++ b/src/kudu/consensus/pending_rounds.cc
@@ -21,6 +21,7 @@
#include <ostream>
#include <utility>
+#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/consensus/consensus.pb.h"
@@ -31,11 +32,18 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/debug-util.h"
+#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/thread_restrictions.h"
+DEFINE_bool(raft_allow_committed_pending_index_gap, false,
+ "Allow a gap between last committed and first pending operation "
+ "in the queue when starting Raft consensus. For testing only!");
+TAG_FLAG(raft_allow_committed_pending_index_gap, runtime);
+TAG_FLAG(raft_allow_committed_pending_index_gap, unsafe);
+
using kudu::pb_util::SecureShortDebugString;
using std::string;
using strings::Substitute;
@@ -195,7 +203,8 @@ Status PendingRounds::SetInitialCommittedOpId(const OpId&
committed_op) {
if (!pending_ops_.empty()) {
int64_t first_pending_index = pending_ops_.begin()->first;
if (committed_op.index() < first_pending_index) {
- if (committed_op.index() != first_pending_index - 1) {
+ if (PREDICT_TRUE(!FLAGS_raft_allow_committed_pending_index_gap) &&
+ committed_op.index() != first_pending_index - 1) {
return Status::Corruption(Substitute(
"pending operations should start at first operation "
"after the committed operation (committed=$0, first pending=$1)",
diff --git a/src/kudu/consensus/raft_consensus.cc
b/src/kudu/consensus/raft_consensus.cc
index 6c232c690..9fcef781f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -365,7 +365,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo&
info,
// Set the initial committed opid for the PendingRounds only after
// appending any uncommitted replicate messages to the queue.
- pending_->SetInitialCommittedOpId(info.last_committed_id);
+ RETURN_NOT_OK(pending_->SetInitialCommittedOpId(info.last_committed_id));
// If this is the first term expire the FD immediately so that we have a
// fast first election, otherwise we just let the timer expire normally.
@@ -748,8 +748,11 @@ Status
RaftConsensus::BecomeReplicaUnlocked(optional<MonoDelta> fd_delta) {
// Deregister ourselves from the queue. We no longer need to track what gets
// replicated since we're stepping down.
+ //
+ // NOTE: if observer isn't registered at this point yet, that's OK because
+ // in one of use cases Start() also invokes BecomeReplicaUnlocked()
queue_->UnRegisterObserver(this);
- bool was_leader = queue_->IsInLeaderMode();
+ const bool was_leader = queue_->IsInLeaderMode();
queue_->SetNonLeaderMode(cmeta_->ActiveConfig());
if (was_leader && server_ctx_.num_leaders) {
server_ctx_.num_leaders->IncrementBy(-1);
@@ -908,7 +911,7 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index)
{
return;
}
- pending_->AdvanceCommittedIndex(commit_index);
+ CHECK_OK(pending_->AdvanceCommittedIndex(commit_index));
if (cmeta_->active_role() == RaftPeerPB::LEADER) {
peer_manager_->SignalRequest(false);
@@ -2737,7 +2740,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason
reason, const ElectionResu
// will bump to term 2 when it gets the vote rejection, such that its
// next pre-election (for term 3) would succeed.
if (result.highest_voter_term > CurrentTermUnlocked()) {
- HandleTermAdvanceUnlocked(result.highest_voter_term);
+ WARN_NOT_OK(HandleTermAdvanceUnlocked(result.highest_voter_term),
+ "error while handling term advanced beyond the current one");
}
LOG_WITH_PREFIX_UNLOCKED(INFO)
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc
b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 67ebe7458..b2eb93465 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -422,7 +422,7 @@ class RaftConsensusQuorumTest : public KuduTest {
ASSERT_OK(WaitForReplicate(round.get()));
last_op_id->CopyFrom(round->id());
if (commit_mode == COMMIT_ONE_BY_ONE) {
- CommitDummyMessage(leader_idx, round.get(), commit_sync);
+ ASSERT_OK(CommitDummyMessage(leader_idx, round.get(), commit_sync));
}
rounds->push_back(round);
}
@@ -445,7 +445,7 @@ class RaftConsensusQuorumTest : public KuduTest {
void GatherLogEntries(int idx, const scoped_refptr<Log>& log,
LogEntries* entries) {
ASSERT_OK(log->WaitUntilAllFlushed());
- log->Close();
+ ASSERT_OK(log->Close());
shared_ptr<LogReader> log_reader;
ASSERT_OK(log::LogReader::Open(fs_managers_[idx].get(),
/*index*/nullptr,
@@ -757,7 +757,7 @@ TEST_F(RaftConsensusQuorumTest,
TestConsensusStopsIfAMajorityFallsBehind) {
// After we release the locks the operation should replicate to all replicas
// and we commit.
ASSERT_OK(WaitForReplicate(round.get()));
- CommitDummyMessage(kLeaderIdx, round.get());
+ ASSERT_OK(CommitDummyMessage(kLeaderIdx, round.get()));
// Assert that everything was ok
WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx);
diff --git a/src/kudu/consensus/time_manager-test.cc
b/src/kudu/consensus/time_manager-test.cc
index da37c83c2..33c39b6bf 100644
--- a/src/kudu/consensus/time_manager-test.cc
+++ b/src/kudu/consensus/time_manager-test.cc
@@ -171,8 +171,8 @@ TEST_F(TimeManagerTest, TestTimeManagerLeaderMode) {
// In leader mode calling MessageReceivedFromLeader() should cause a CHECK
failure.
EXPECT_DEATH({
- time_manager_->MessageReceivedFromLeader(message);
- }, "Cannot receive messages from a leader in leader mode.");
+ ASSERT_OK(time_manager_->MessageReceivedFromLeader(message));
+ }, "Cannot receive messages from a leader in leader mode.");
// .. as should AdvanceSafeTime()
EXPECT_DEATH({
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc
b/src/kudu/integration-tests/ts_recovery-itest.cc
index 4f61cf391..51cd66880 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -571,7 +571,13 @@ INSTANTIATE_TEST_SUITE_P(BlockManagerType,
TsRecoveryITestDeathTest,
TEST_P(TsRecoveryITestDeathTest, RecoverFromOpIdOverflow) {
// Create the initial tablet files on disk, then shut down the cluster so we
// can meddle with the WAL.
- NO_FATALS(StartClusterOneTs());
+ NO_FATALS(StartClusterOneTs(
+ // To simplify test scaffolding, this scenario relies on lenient handling
+ // of operation indices read from the tablet's WAL upon bootstrapping.
+ // With regular guards in place, an inconsistency would be detected when
+ // setting committed OpId for PendingRounds by RaftConsensus::Start().
+ { "--raft_allow_committed_pending_index_gap" }
+ ));
TestWorkload workload(cluster_.get());
workload.set_num_replicas(1);
workload.Setup();