This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit e28b1c65b325b469eb624fda4cda3f7960d2a9eb Author: Bankim Bhavsar <[email protected]> AuthorDate: Mon Dec 21 15:50:38 2020 -0800 [consensus] Allow sending status-only request messages to FAILED peer This change adds the ability for a leader to send status-only messages to a peer even if it's in FAILED_UNRECOVERABLE state. This ability is turned off by default and controlled via a PeerMessageQueue parameter. Without this change when the system catalog is copied externally the new master remains in FAILED_UNRECOVERABLE state and doesn't get promoted to being a VOTER despite the system catalog being up to date. The procedure for end-to-end testing that hooks up masters to use this Raft config is a separate change. Change-Id: I229cc739c1b5ec7b11ce05d5e6b1b8e9d654d6f7 Reviewed-on: http://gerrit.cloudera.org:8080/16899 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> --- src/kudu/consensus/consensus_queue-test.cc | 56 ++++++++++++++++++++++++++++-- src/kudu/consensus/consensus_queue.cc | 33 +++++++++++++++--- src/kudu/consensus/consensus_queue.h | 6 +++- src/kudu/consensus/raft_consensus.cc | 3 +- src/kudu/consensus/raft_consensus.h | 4 +++ 5 files changed, 92 insertions(+), 10 deletions(-) diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc index 8c56737..5b6f359 100644 --- a/src/kudu/consensus/consensus_queue-test.cc +++ b/src/kudu/consensus/consensus_queue-test.cc @@ -23,6 +23,7 @@ #include <memory> #include <ostream> #include <string> +#include <unordered_map> #include <vector> #include <boost/optional/optional.hpp> @@ -56,6 +57,7 @@ #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/pb_util.h" +#include "kudu/util/slice.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -63,6 +65,7 @@ DECLARE_int32(consensus_max_batch_size_bytes); DECLARE_int32(follower_unavailable_considered_failed_sec); 
+DECLARE_double(consensus_fail_log_read_ops); using kudu::consensus::HealthReportPB; using std::atomic; @@ -87,7 +90,8 @@ class ConsensusQueueTest : public KuduTest { metric_entity_tablet_(METRIC_ENTITY_tablet.Instantiate( &metric_registry_, "consensus-queue-test::tablet")), registry_(new log::LogAnchorRegistry), - quiescing_(false) { + quiescing_(false), + allow_status_msg_for_failed_peer_(false) { } virtual void SetUp() OVERRIDE { @@ -121,7 +125,8 @@ class ConsensusQueueTest : public KuduTest { raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL), &quiescing_, replicated_opid, - committed_opid)); + committed_opid, + &allow_status_msg_for_failed_peer_)); } virtual void TearDown() OVERRIDE { @@ -176,7 +181,7 @@ class ConsensusQueueTest : public KuduTest { response->mutable_status()->Clear(); } - // Like the above but uses the last received index as the commtited index. + // Like the above but uses the last received index as the committed index. void UpdatePeerWatermarkToOp(ConsensusRequestPB* request, ConsensusResponsePB* response, const OpId& last_received, @@ -246,6 +251,7 @@ class ConsensusQueueTest : public KuduTest { scoped_refptr<log::LogAnchorRegistry> registry_; unique_ptr<clock::Clock> clock_; atomic<bool> quiescing_; + bool allow_status_msg_for_failed_peer_; }; // Observer of a PeerMessageQueue that tracks the notifications sent to @@ -361,6 +367,50 @@ TEST_F(ConsensusQueueTest, TestTransferLeadershipWhenAppropriate) { NO_FATALS(verify_elections(/*election_happened*/true)); } +// Test that verifies status-only request messages are sent to the peer even if it's in +// FAILED_UNRECOVERABLE state. 
+TEST_F(ConsensusQueueTest, TestStatusMessagesToFailedUnrecoverablePeer) { + allow_status_msg_for_failed_peer_ = true; + RaftConfigPB config = BuildRaftConfigPBForTests(/*num_voters*/2); + queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, config); + + AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10); + WaitForLocalPeerToAckIndex(10); + + RaftPeerPB follower = MakePeer(kPeerUuid, RaftPeerPB::VOTER); + queue_->TrackPeer(follower); + + ConsensusRequestPB request; + ConsensusResponsePB response; + response.set_responder_uuid(kPeerUuid); + + // Send request to a new peer. + vector<ReplicateRefPtr> refs; + bool needs_tablet_copy; + ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy)); + ASSERT_FALSE(needs_tablet_copy); + ASSERT_EQ(0, request.ops_size()); + + SetLastReceivedAndLastCommitted(&response, MinimumOpId()); + bool send_more_immediately = queue_->ResponseFromPeer(response.responder_uuid(), response); + ASSERT_TRUE(send_more_immediately) << "Queue still had requests pending"; + + // Inject failure to read log messages. This will put the peer in FAILED_UNRECOVERABLE state. + FLAGS_consensus_fail_log_read_ops = 1.0; + Status s = queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy); + ASSERT_TRUE(s.IsNotFound()); + ASSERT_EQ("INJECTED FAILURE", s.message().ToString()); + auto health_map = queue_->ReportHealthOfPeers(); + ASSERT_NE(health_map.count(kPeerUuid), 0); + auto actual_health = health_map[kPeerUuid].overall_health(); + ASSERT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, actual_health); + + // Verify status-only message can be sent. + ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, &needs_tablet_copy)); + ASSERT_FALSE(needs_tablet_copy); + ASSERT_EQ(0, request.ops_size()); +} + // Tests that the queue is able to track a peer when it starts tracking a peer // after the initial message in the queue. 
In particular this creates a queue // with several messages and then starts to track a peer whose watermark diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc index 05aa800..eecaecb 100644 --- a/src/kudu/consensus/consensus_queue.cc +++ b/src/kudu/consensus/consensus_queue.cc @@ -29,6 +29,7 @@ #include <boost/optional/optional.hpp> #include <boost/optional/optional_io.hpp> #include <gflags/gflags.h> +#include <google/protobuf/stubs/port.h> #include "kudu/common/common.pb.h" #include "kudu/common/timestamp.h" @@ -71,6 +72,10 @@ DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0, TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden); TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe); +DEFINE_double(consensus_fail_log_read_ops, 0.0, + "Fraction of the time when reading from the log cache will fail"); +TAG_FLAG(consensus_fail_log_read_ops, hidden); + DECLARE_bool(raft_prepare_replacement_before_eviction); DECLARE_bool(safe_time_advancement_without_writes); DECLARE_int32(consensus_rpc_timeout_ms); @@ -177,7 +182,8 @@ PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_ent unique_ptr<ThreadPoolToken> raft_pool_observers_token, const atomic<bool>* server_quiescing, OpId last_locally_replicated, - const OpId& last_locally_committed) + const OpId& last_locally_committed, + const bool* allow_status_msg_for_failed_peer) : raft_pool_observers_token_(std::move(raft_pool_observers_token)), server_quiescing_(server_quiescing), local_peer_pb_(std::move(local_peer_pb)), @@ -185,7 +191,8 @@ PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_ent successor_watch_in_progress_(false), log_cache_(metric_entity, std::move(log), local_peer_pb_.permanent_uuid(), tablet_id_), metrics_(metric_entity), - time_manager_(time_manager) { + time_manager_(time_manager), + allow_status_msg_for_failed_peer_(allow_status_msg_for_failed_peer) { 
DCHECK(local_peer_pb_.has_permanent_uuid()); DCHECK(local_peer_pb_.has_last_known_addr()); DCHECK(last_locally_replicated.IsInitialized()); @@ -687,10 +694,19 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid, } *needs_tablet_copy = false; + // Should we send log messages to the peer? // If we've never communicated with the peer, we don't know what messages to - // send, so we'll send a status-only request. Otherwise, we grab requests - // from the log starting at the last_received point. - if (peer_copy.last_exchange_status != PeerStatus::NEW) { + // send, so we'll send a status-only request instead. + // + // There are cases where it's beneficial to send status-only messages to a peer + // in FAILED_UNRECOVERABLE state because they help the peer transition out of that + // state, e.g. after the system catalog has been copied externally when adding a + // new master. In such a case, don't try to send log messages, which are expected to + // fail because the log has been GC'ed; send a status-only message instead. + if (peer_copy.last_exchange_status != PeerStatus::NEW && + (allow_status_msg_for_failed_peer_ == nullptr || !*allow_status_msg_for_failed_peer_ || + peer_copy.wal_catchup_possible)) { + // We grab requests from the log starting at the last_received point. // The batch of messages to send to the peer. vector<ReplicateRefPtr> messages; @@ -701,6 +717,13 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid, max_batch_size, &messages, &preceding_id); + + // Inject a failure to simulate a follower that fell behind after the leader GC'ed its logs. + if (PREDICT_FALSE(fault_injection::MaybeTrue(FLAGS_consensus_fail_log_read_ops))) { + wal_catchup_failure = true; + return Status::NotFound("INJECTED FAILURE"); + } + if (PREDICT_FALSE(!s.ok())) { // It's normal to have a NotFound() here if a follower falls behind where // the leader has GCed its logs.
The follower replica will hang around diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h index af8dfdb..c71640c 100644 --- a/src/kudu/consensus/consensus_queue.h +++ b/src/kudu/consensus/consensus_queue.h @@ -192,7 +192,8 @@ class PeerMessageQueue { std::unique_ptr<ThreadPoolToken> raft_pool_observers_token, const std::atomic<bool>* server_quiescing, OpId last_locally_replicated, - const OpId& last_locally_committed); + const OpId& last_locally_committed, + const bool* allow_status_msg_for_failed_peer = nullptr); // Changes the queue to leader mode, meaning it tracks majority replicated // operations and notifies observers when those change. @@ -372,6 +373,7 @@ class PeerMessageQueue { FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex); FRIEND_TEST(ConsensusQueueTest, TestQueueMovesWatermarksBackward); FRIEND_TEST(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics); + FRIEND_TEST(ConsensusQueueTest, TestStatusMessagesToFailedUnrecoverablePeer); FRIEND_TEST(ConsensusQueueUnitTest, PeerHealthStatus); FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty); @@ -579,6 +581,8 @@ class PeerMessageQueue { Metrics metrics_; TimeManager* time_manager_; + + const bool* allow_status_msg_for_failed_peer_; }; // The interface between RaftConsensus and the PeerMessageQueue. diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 50fcd0d..0daf78a 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -278,7 +278,8 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info, raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL), server_ctx_.quiescing, info.last_id, - info.last_committed_id)); + info.last_committed_id, + server_ctx_.allow_status_msg_for_failed_peer)); // A manager for the set of peers that actually send the operations both remotely // and to the local wal. 
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h index d418b44..743789e 100644 --- a/src/kudu/consensus/raft_consensus.h +++ b/src/kudu/consensus/raft_consensus.h @@ -86,6 +86,10 @@ struct ServerContext { // Threadpool on which to run Raft tasks. ThreadPool* raft_pool; + + // Shared boolean (may be null) indicating whether Raft consensus should continue sending + // status-only request messages to a peer even if the peer is considered failed. + const bool* allow_status_msg_for_failed_peer = nullptr; }; struct ConsensusOptions {
