This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e28b1c65b325b469eb624fda4cda3f7960d2a9eb
Author: Bankim Bhavsar <[email protected]>
AuthorDate: Mon Dec 21 15:50:38 2020 -0800

    [consensus] Allow sending status-only request messages to FAILED peer
    
    This change adds the ability for a leader to send status-only
    messages to a peer even if it's in FAILED_UNRECOVERABLE state.
    This ability is turned off by default and controlled via
    a PeerMessageQueue parameter.
    
    Without this change, when the system catalog is copied externally,
    the new master remains in FAILED_UNRECOVERABLE state and doesn't get
    promoted to being a VOTER despite the system catalog being up to date.
    The procedure for end-to-end testing that hooks up masters to use
    this Raft config is a separate change.
    
    Change-Id: I229cc739c1b5ec7b11ce05d5e6b1b8e9d654d6f7
    Reviewed-on: http://gerrit.cloudera.org:8080/16899
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/consensus/consensus_queue-test.cc | 56 ++++++++++++++++++++++++++++--
 src/kudu/consensus/consensus_queue.cc      | 33 +++++++++++++++---
 src/kudu/consensus/consensus_queue.h       |  6 +++-
 src/kudu/consensus/raft_consensus.cc       |  3 +-
 src/kudu/consensus/raft_consensus.h        |  4 +++
 5 files changed, 92 insertions(+), 10 deletions(-)

diff --git a/src/kudu/consensus/consensus_queue-test.cc 
b/src/kudu/consensus/consensus_queue-test.cc
index 8c56737..5b6f359 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -56,6 +57,7 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -63,6 +65,7 @@
 
 DECLARE_int32(consensus_max_batch_size_bytes);
 DECLARE_int32(follower_unavailable_considered_failed_sec);
+DECLARE_double(consensus_fail_log_read_ops);
 
 using kudu::consensus::HealthReportPB;
 using std::atomic;
@@ -87,7 +90,8 @@ class ConsensusQueueTest : public KuduTest {
         metric_entity_tablet_(METRIC_ENTITY_tablet.Instantiate(
             &metric_registry_, "consensus-queue-test::tablet")),
         registry_(new log::LogAnchorRegistry),
-        quiescing_(false) {
+        quiescing_(false),
+        allow_status_msg_for_failed_peer_(false) {
   }
 
   virtual void SetUp() OVERRIDE {
@@ -121,7 +125,8 @@ class ConsensusQueueTest : public KuduTest {
         raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
         &quiescing_,
         replicated_opid,
-        committed_opid));
+        committed_opid,
+        &allow_status_msg_for_failed_peer_));
   }
 
   virtual void TearDown() OVERRIDE {
@@ -176,7 +181,7 @@ class ConsensusQueueTest : public KuduTest {
     response->mutable_status()->Clear();
   }
 
-  // Like the above but uses the last received index as the commtited index.
+  // Like the above but uses the last received index as the committed index.
   void UpdatePeerWatermarkToOp(ConsensusRequestPB* request,
                                ConsensusResponsePB* response,
                                const OpId& last_received,
@@ -246,6 +251,7 @@ class ConsensusQueueTest : public KuduTest {
   scoped_refptr<log::LogAnchorRegistry> registry_;
   unique_ptr<clock::Clock> clock_;
   atomic<bool> quiescing_;
+  bool allow_status_msg_for_failed_peer_;
 };
 
 // Observer of a PeerMessageQueue that tracks the notifications sent to
@@ -361,6 +367,50 @@ TEST_F(ConsensusQueueTest, 
TestTransferLeadershipWhenAppropriate) {
   NO_FATALS(verify_elections(/*election_happened*/true));
 }
 
+// Test that verifies status-only request messages are sent to the peer even 
if it's in
+// FAILED_UNRECOVERABLE state.
+TEST_F(ConsensusQueueTest, TestStatusMessagesToFailedUnrecoverablePeer) {
+  allow_status_msg_for_failed_peer_ = true;
+  RaftConfigPB config = BuildRaftConfigPBForTests(/*num_voters*/2);
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, config);
+
+  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
+  WaitForLocalPeerToAckIndex(10);
+
+  RaftPeerPB follower = MakePeer(kPeerUuid, RaftPeerPB::VOTER);
+  queue_->TrackPeer(follower);
+
+  ConsensusRequestPB request;
+  ConsensusResponsePB response;
+  response.set_responder_uuid(kPeerUuid);
+
+  // Send request to a new peer.
+  vector<ReplicateRefPtr> refs;
+  bool needs_tablet_copy;
+  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, 
&needs_tablet_copy));
+  ASSERT_FALSE(needs_tablet_copy);
+  ASSERT_EQ(0, request.ops_size());
+
+  SetLastReceivedAndLastCommitted(&response, MinimumOpId());
+  bool send_more_immediately = 
queue_->ResponseFromPeer(response.responder_uuid(), response);
+  ASSERT_TRUE(send_more_immediately) << "Queue still had requests pending";
+
+  // Inject failure to read log messages. This will put the peer in 
FAILED_UNRECOVERABLE state.
+  FLAGS_consensus_fail_log_read_ops = 1.0;
+  Status s = queue_->RequestForPeer(kPeerUuid, &request, &refs, 
&needs_tablet_copy);
+  ASSERT_TRUE(s.IsNotFound());
+  ASSERT_EQ("INJECTED FAILURE", s.message().ToString());
+  auto health_map = queue_->ReportHealthOfPeers();
+  ASSERT_NE(health_map.count(kPeerUuid), 0);
+  auto actual_health = health_map[kPeerUuid].overall_health();
+  ASSERT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, actual_health);
+
+  // Verify status-only message can be sent.
+  ASSERT_OK(queue_->RequestForPeer(kPeerUuid, &request, &refs, 
&needs_tablet_copy));
+  ASSERT_FALSE(needs_tablet_copy);
+  ASSERT_EQ(0, request.ops_size());
+}
+
 // Tests that the queue is able to track a peer when it starts tracking a peer
 // after the initial message in the queue. In particular this creates a queue
 // with several messages and then starts to track a peer whose watermark
diff --git a/src/kudu/consensus/consensus_queue.cc 
b/src/kudu/consensus/consensus_queue.cc
index 05aa800..eecaecb 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -29,6 +29,7 @@
 #include <boost/optional/optional.hpp>
 #include <boost/optional/optional_io.hpp>
 #include <gflags/gflags.h>
+#include <google/protobuf/stubs/port.h>
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/timestamp.h"
@@ -71,6 +72,10 @@ DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0,
 TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden);
 TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe);
 
+DEFINE_double(consensus_fail_log_read_ops, 0.0,
+              "Fraction of the time when reading from the log cache will 
fail");
+TAG_FLAG(consensus_fail_log_read_ops, hidden);
+
 DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_bool(safe_time_advancement_without_writes);
 DECLARE_int32(consensus_rpc_timeout_ms);
@@ -177,7 +182,8 @@ PeerMessageQueue::PeerMessageQueue(const 
scoped_refptr<MetricEntity>& metric_ent
                                    unique_ptr<ThreadPoolToken> 
raft_pool_observers_token,
                                    const atomic<bool>* server_quiescing,
                                    OpId last_locally_replicated,
-                                   const OpId& last_locally_committed)
+                                   const OpId& last_locally_committed,
+                                   const bool* 
allow_status_msg_for_failed_peer)
     : raft_pool_observers_token_(std::move(raft_pool_observers_token)),
       server_quiescing_(server_quiescing),
       local_peer_pb_(std::move(local_peer_pb)),
@@ -185,7 +191,8 @@ PeerMessageQueue::PeerMessageQueue(const 
scoped_refptr<MetricEntity>& metric_ent
       successor_watch_in_progress_(false),
       log_cache_(metric_entity, std::move(log), 
local_peer_pb_.permanent_uuid(), tablet_id_),
       metrics_(metric_entity),
-      time_manager_(time_manager) {
+      time_manager_(time_manager),
+      allow_status_msg_for_failed_peer_(allow_status_msg_for_failed_peer) {
   DCHECK(local_peer_pb_.has_permanent_uuid());
   DCHECK(local_peer_pb_.has_last_known_addr());
   DCHECK(last_locally_replicated.IsInitialized());
@@ -687,10 +694,19 @@ Status PeerMessageQueue::RequestForPeer(const string& 
uuid,
   }
   *needs_tablet_copy = false;
 
+  // Should we send log messages to the peer?
   // If we've never communicated with the peer, we don't know what messages to
-  // send, so we'll send a status-only request. Otherwise, we grab requests
-  // from the log starting at the last_received point.
-  if (peer_copy.last_exchange_status != PeerStatus::NEW) {
+  // send, so we'll send a status-only request instead.
+  //
+  // There are cases where it's beneficial to send status-only messages to a 
peer
+  // in FAILED_UNRECOVERABLE state as it helps the peer transition out of the
+  // FAILED_UNRECOVERABLE state, case in point external system catalog copy 
that's done when
+  // adding a new master. In such a case don't try to send log messages which 
is expected
+  // to fail because the log has been GC'ed but instead send status-only 
message.
+  if (peer_copy.last_exchange_status != PeerStatus::NEW &&
+      (allow_status_msg_for_failed_peer_ == nullptr || 
!*allow_status_msg_for_failed_peer_ ||
+       peer_copy.wal_catchup_possible)) {
+    // We grab requests from the log starting at the last_received point.
 
     // The batch of messages to send to the peer.
     vector<ReplicateRefPtr> messages;
@@ -701,6 +717,13 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                   max_batch_size,
                                   &messages,
                                   &preceding_id);
+
+    // Inject failure to simulate follower falling behind and the leader has 
GC'ed its logs.
+    if 
(PREDICT_FALSE(fault_injection::MaybeTrue(FLAGS_consensus_fail_log_read_ops))) {
+      wal_catchup_failure = true;
+      return Status::NotFound("INJECTED FAILURE");
+    }
+
     if (PREDICT_FALSE(!s.ok())) {
       // It's normal to have a NotFound() here if a follower falls behind where
       // the leader has GCed its logs. The follower replica will hang around
diff --git a/src/kudu/consensus/consensus_queue.h 
b/src/kudu/consensus/consensus_queue.h
index af8dfdb..c71640c 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -192,7 +192,8 @@ class PeerMessageQueue {
                    std::unique_ptr<ThreadPoolToken> raft_pool_observers_token,
                    const std::atomic<bool>* server_quiescing,
                    OpId last_locally_replicated,
-                   const OpId& last_locally_committed);
+                   const OpId& last_locally_committed,
+                   const bool* allow_status_msg_for_failed_peer = nullptr);
 
   // Changes the queue to leader mode, meaning it tracks majority replicated
   // operations and notifies observers when those change.
@@ -372,6 +373,7 @@ class PeerMessageQueue {
   FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex);
   FRIEND_TEST(ConsensusQueueTest, TestQueueMovesWatermarksBackward);
   FRIEND_TEST(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics);
+  FRIEND_TEST(ConsensusQueueTest, TestStatusMessagesToFailedUnrecoverablePeer);
   FRIEND_TEST(ConsensusQueueUnitTest, PeerHealthStatus);
   FRIEND_TEST(RaftConsensusQuorumTest, 
TestReplicasEnforceTheLogMatchingProperty);
 
@@ -579,6 +581,8 @@ class PeerMessageQueue {
   Metrics metrics_;
 
   TimeManager* time_manager_;
+
+  const bool* allow_status_msg_for_failed_peer_;
 };
 
 // The interface between RaftConsensus and the PeerMessageQueue.
diff --git a/src/kudu/consensus/raft_consensus.cc 
b/src/kudu/consensus/raft_consensus.cc
index 50fcd0d..0daf78a 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -278,7 +278,8 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& 
info,
       raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL),
       server_ctx_.quiescing,
       info.last_id,
-      info.last_committed_id));
+      info.last_committed_id,
+      server_ctx_.allow_status_msg_for_failed_peer));
 
   // A manager for the set of peers that actually send the operations both 
remotely
   // and to the local wal.
diff --git a/src/kudu/consensus/raft_consensus.h 
b/src/kudu/consensus/raft_consensus.h
index d418b44..743789e 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -86,6 +86,10 @@ struct ServerContext {
 
   // Threadpool on which to run Raft tasks.
   ThreadPool* raft_pool;
+
+  // Shared boolean indicating whether Raft consensus should continue sending 
request messages
+  // even if a peer is considered as failed.
+  const bool* allow_status_msg_for_failed_peer = nullptr;
 };
 
 struct ConsensusOptions {

Reply via email to