This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit ee22ddcc734ab4947218c670d5cfddd61fe90fbb
Author: Alexey Serbin <[email protected]>
AuthorDate: Wed Sep 18 09:52:44 2019 -0700

    [consensus] KUDU-2947 fix voting in case of slow WAL

    Before this patch, a follower replica could grant a 'yes' vote right
    after processing recent Raft transactions even if the currently
    established leader was alive and well, in cases when persisting a Raft
    transaction in the WAL took longer than the leader election timeout.
    That could lead to multiple successive election rounds even when there
    was no actual reason to re-elect leader replicas.

    This patch also adds a test to verify the behavior of follower tablet
    replicas under the conditions described above. The test was failing
    before the patch and is now passing.

    This is a follow-up to 4870ef20b9a0c729c1d7f16c016c7d91b193e46f.

    Change-Id: I7c061b498e727a1a11e94e03c55530eeebfdf8dd
    Reviewed-on: http://gerrit.cloudera.org:8080/14260
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <[email protected]>
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/consensus/raft_consensus.cc     |  24 ++-
 src/kudu/consensus/raft_consensus.h      |   6 +
 .../raft_consensus_election-itest.cc     | 162 ++++++++++++++++++++-
 3 files changed, 188 insertions(+), 4 deletions(-)

diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index a7b2057..358fa77 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -1436,7 +1436,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     queue_->UpdateLastIndexAppendedToLeader(request->last_idx_appended_to_leader());
 
     // Also prohibit voting for anyone for the minimum election timeout.
-    withhold_votes_until_ = MonoTime::Now() + MinimumElectionTimeout();
+    WithholdVotesUnlocked();
 
     // 1 - Early commit pending (and committed) transactions
 
@@ -1596,9 +1596,15 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       s = log_synchronizer.WaitFor(
           MonoDelta::FromMilliseconds(FLAGS_raft_heartbeat_interval_ms));
       // If just waiting for our log append to finish lets snooze the timer.
-      // We don't want to fire leader election because we're waiting on our own log.
+      // We don't want to fire a leader election or accept vote requests
+      // because we're still processing the Raft message from the leader,
+      // waiting on our own log.
       if (s.IsTimedOut()) {
         SnoozeFailureDetector();
+        {
+          LockGuard l(lock_);
+          WithholdVotesUnlocked();
+        }
       }
     } while (s.IsTimedOut());
     RETURN_NOT_OK(s);
@@ -1723,9 +1729,15 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request,
   //     will change to eject the abandoned node, but until that point, we don't want the
   //     abandoned follower to disturb the other nodes.
   //
+  // 3) Other dynamic scenarios with a stale former leader
+  //    This is a generalization of case 1. It's possible that a stale former
+  //    leader detects it's no longer the leader at some point, but a majority
+  //    of replicas has already elected a new leader.
+  //
   // See also https://ramcloud.stanford.edu/~ongaro/thesis.pdf
   // section 4.2.3.
-  if (!request->ignore_live_leader() && MonoTime::Now() < withhold_votes_until_) {
+  if (PREDICT_TRUE(!request->ignore_live_leader()) &&
+      MonoTime::Now() < withhold_votes_until_) {
     return RequestVoteRespondLeaderIsAlive(request, response);
   }
 
@@ -2848,6 +2860,12 @@ void RaftConsensus::SnoozeFailureDetector(boost::optional<string> reason_for_log
   }
 }
 
+void RaftConsensus::WithholdVotesUnlocked() {
+  DCHECK(lock_.is_locked());
+  withhold_votes_until_ = std::max(withhold_votes_until_,
+                                   MonoTime::Now() + MinimumElectionTimeout());
+}
+
 MonoDelta RaftConsensus::MinimumElectionTimeout() const {
   int32_t failure_timeout = FLAGS_leader_failure_max_missed_heartbeat_periods *
       FLAGS_raft_heartbeat_interval_ms;

diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index de12806..b2afafb 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -643,6 +643,12 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   void SnoozeFailureDetector(boost::optional<std::string> reason_for_log = boost::none,
                              boost::optional<MonoDelta> delta = boost::none);
 
+  // Update the vote-withholding interval, bumping it up by the minimum
+  // election timeout, i.e. 'FLAGS_raft_heartbeat_interval_ms' *
+  // 'FLAGS_leader_failure_max_missed_heartbeat_periods' milliseconds.
+  // This method is safe to call even on a leader replica.
+  void WithholdVotesUnlocked();
+
   // Return the minimum election timeout. Due to backoff and random
   // jitter, election timeouts may be longer than this.
   MonoDelta MinimumElectionTimeout() const;

diff --git a/src/kudu/integration-tests/raft_consensus_election-itest.cc b/src/kudu/integration-tests/raft_consensus_election-itest.cc
index 63c2ecf..96c459c 100644
--- a/src/kudu/integration-tests/raft_consensus_election-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_election-itest.cc
@@ -29,6 +29,8 @@
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/consensus/opid_util.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -61,16 +63,21 @@ METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
 METRIC_DECLARE_gauge_int64(raft_term);
 
 using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::COMMITTED_OPID;
 using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::MakeOpId;
+using kudu::consensus::OpId;
 using kudu::consensus::RaftPeerPB;
 using kudu::itest::AddServer;
 using kudu::itest::GetConsensusState;
+using kudu::itest::GetLastOpIdForReplica;
 using kudu::itest::GetReplicaStatusAndCheckIfLeader;
 using kudu::itest::LeaderStepDown;
 using kudu::itest::RemoveServer;
+using kudu::itest::RequestVote;
 using kudu::itest::StartElection;
-using kudu::itest::TabletServerMap;
 using kudu::itest::TServerDetails;
+using kudu::itest::TabletServerMap;
 using kudu::itest::WaitUntilLeader;
 using kudu::itest::WriteSimpleTestRow;
 using kudu::pb_util::SecureShortDebugString;
@@ -564,6 +571,159 @@ TEST_F(RaftConsensusElectionITest, TombstonedVoteAfterFailedTabletCopy) {
   workload.StopAndJoin();
 }
 
+// A test scenario to verify that a disruptive server doesn't start needless
+// elections when it takes a long time to replicate Raft transactions
+// from the leader to follower replicas (e.g., due to slow WAL IO).
+TEST_F(RaftConsensusElectionITest, DisruptiveServerAndSlowWAL) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  // Shorten the heartbeat interval for faster test run time.
+  const auto kHeartbeatIntervalMs = 200;
+  const auto kMaxMissedHeartbeatPeriods = 3;
+  const vector<string> ts_flags {
+    Substitute("--raft_heartbeat_interval_ms=$0", kHeartbeatIntervalMs),
+    Substitute("--leader_failure_max_missed_heartbeat_periods=$0",
+               kMaxMissedHeartbeatPeriods),
+  };
+  NO_FATALS(BuildAndStart(ts_flags));
+
+  // Sanity check: this scenario assumes there are 3 tablet servers. The test
+  // table is created with RF=FLAGS_num_replicas.
+  ASSERT_EQ(3, FLAGS_num_replicas);
+  ASSERT_EQ(3, tablet_servers_.size());
+
+  // A convenience array to access each tablet server as TServerDetails.
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_EQ(cluster_->num_tablet_servers(), tservers.size());
+
+  // The leadership might fluctuate before shutting down the third tablet
+  // server, so the ASSERT_EVENTUALLY() below is for those rare cases.
+  //
+  // However, after one of the tablet servers is shut down, the leadership
+  // should not fluctuate because:
+  //   1) only two voters out of three are alive
+  //   2) the current leader should not be disturbed by any rogue votes --
+  //      that's the whole premise of this test scenario
+  //
+  // So, for this scenario the leadership can fluctuate only if Raft
+  // heartbeats sent from the leader to follower replicas are significantly
+  // delayed or dropped. However, the minicluster components send heartbeats
+  // via the loopback interface, so no real network layer that might
+  // significantly delay heartbeats is involved. Also, the consensus RPC
+  // queues should not overflow in this scenario because the number of
+  // consensus RPCs is relatively low.
+  TServerDetails* leader_tserver = nullptr;
+  TServerDetails* other_tserver = nullptr;
+  TServerDetails* shutdown_tserver = nullptr;
+  ASSERT_EVENTUALLY([&] {
+    // This is a clean-up in case of retry.
+    if (shutdown_tserver != nullptr) {
+      auto* ts = cluster_->tablet_server_by_uuid(shutdown_tserver->uuid());
+      if (ts->IsShutdown()) {
+        ASSERT_OK(ts->Restart());
+      }
+    }
+    for (auto idx = 0; idx < cluster_->num_tablet_servers(); ++idx) {
+      auto* ts = cluster_->tablet_server(idx);
+      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency", "false"));
+    }
+    leader_tserver = nullptr;
+    other_tserver = nullptr;
+    shutdown_tserver = nullptr;
+
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader_tserver));
+    ASSERT_OK(WriteSimpleTestRow(leader_tserver, tablet_id_,
+                                 RowOperationsPB::UPSERT, 0, 0, "", kTimeout));
+    OpId op_id;
+    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader_tserver,
+                                    consensus::COMMITTED_OPID, kTimeout,
+                                    &op_id));
+    ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_,
+                                    op_id.index()));
+    // Shut down one tablet server that doesn't host the leader replica of the
+    // target tablet and inject WAL latency into the others.
+    for (const auto& server : tservers) {
+      auto* ts = cluster_->tablet_server_by_uuid(server->uuid());
+      if (server->uuid() != leader_tserver->uuid() && shutdown_tserver == nullptr) {
+        shutdown_tserver = server;
+        continue;
+      }
+      if (server->uuid() != leader_tserver->uuid() && other_tserver == nullptr) {
+        other_tserver = server;
+      }
+      // For this scenario it's important to reserve some inactivity intervals
+      // for the follower between processing Raft messages from the leader.
+      // If a vote request arrives while the replica is busy processing
+      // a Raft message from the leader, it is rejected with a 'busy' status
+      // before the vote-withholding interval is evaluated.
+      const auto mult = (server->uuid() == leader_tserver->uuid()) ? 2 : 1;
+      const auto latency_ms = mult *
+          kHeartbeatIntervalMs * kMaxMissedHeartbeatPeriods;
+      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency_ms_mean",
+                                  std::to_string(latency_ms)));
+      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency_ms_stddev", "0"));
+      ASSERT_OK(cluster_->SetFlag(ts, "log_inject_latency", "true"));
+    }
+
+    // Shut down the third tablet server.
+    cluster_->tablet_server_by_uuid(shutdown_tserver->uuid())->Shutdown();
+
+    // Sanity check: make sure the leadership hasn't changed since the leader
+    // was determined.
+    TServerDetails* current_leader;
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &current_leader));
+    ASSERT_EQ(cluster_->tablet_server_index_by_uuid(leader_tserver->uuid()),
+              cluster_->tablet_server_index_by_uuid(current_leader->uuid()));
+  });
+
+  // Get the Raft term from the established leader.
+  ConsensusStatePB cstate;
+  ASSERT_OK(GetConsensusState(leader_tserver, tablet_id_, kTimeout,
+                              consensus::EXCLUDE_HEALTH_REPORT, &cstate));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.set_timeout_allowed(true);
+  workload.set_network_error_allowed(true);
+  workload.set_already_present_allowed(true);
+  workload.set_num_read_threads(0);
+  workload.set_num_write_threads(1);
+  workload.set_write_batch_size(1);
+  // Leave a 'space' for the artificial vote requests (see below) to arrive
+  // while there is no activity on RaftConsensus::Update().
+  workload.set_write_interval_millis(kHeartbeatIntervalMs);
+  workload.Setup();
+  workload.Start();
+
+  // Issue rogue vote requests, imitating a disruptive tablet replica.
+  const auto& shutdown_server_uuid = shutdown_tserver->uuid();
+  const auto next_term = cstate.current_term() + 1;
+  const auto targets = { leader_tserver, other_tserver };
+  for (auto i = 0; i < 100; ++i) {
+    SleepFor(MonoDelta::FromMilliseconds(kHeartbeatIntervalMs / 4));
+    for (const auto* ts : targets) {
+      auto s = RequestVote(ts, tablet_id_, shutdown_server_uuid,
+                           next_term, MakeOpId(next_term + i, 0),
+                           /*ignore_live_leader=*/ false,
+                           /*is_pre_election=*/ false,
+                           kTimeout);
+      // Neither the leader nor the follower replica should grant a 'yes' vote
+      // since the healthy leader is there and doing well, sending Raft
+      // transactions to be replicated.
+      ASSERT_TRUE(s.IsInvalidArgument() || s.IsServiceUnavailable())
+          << s.ToString();
+      ASSERT_STR_MATCHES(s.ToString(),
+                         "("
+                         "because replica is either leader or "
+                         "believes a valid leader to be alive"
+                         "|"
+                         "because replica is already servicing an update "
+                         "from a current leader or another vote"
+                         ")");
+    }
+  }
+}
+
 } // namespace tserver
 } // namespace kudu
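
For readers skimming the diff, the following is a minimal, self-contained sketch of
the vote-withholding idea the patch applies. The VoteWithholdingSketch type and its
millisecond-based clock are hypothetical simplifications for illustration only, not
Kudu's RaftConsensus API; in the actual code the deadline is withhold_votes_until_
and the bump happens in WithholdVotesUnlocked().

#include <algorithm>
#include <cstdint>
#include <iostream>

// Hypothetical, simplified replica state: any processing of a message from an
// established leader, including waiting on the replica's own slow WAL append,
// pushes the "withhold votes until" deadline forward; vote requests arriving
// before that deadline are rejected unless the candidate explicitly asks to
// ignore a live leader.
class VoteWithholdingSketch {
 public:
  explicit VoteWithholdingSketch(int64_t min_election_timeout_ms)
      : min_election_timeout_ms_(min_election_timeout_ms) {}

  // Called while handling a Raft message from the leader, or while still
  // waiting on the local WAL append triggered by such a message.
  // The deadline only moves forward, never backward.
  void WithholdVotes(int64_t now_ms) {
    withhold_votes_until_ms_ =
        std::max(withhold_votes_until_ms_, now_ms + min_election_timeout_ms_);
  }

  // Returns true if a vote request arriving at 'now_ms' should be rejected
  // with a "leader is alive" style response.
  bool ShouldRejectVote(int64_t now_ms, bool ignore_live_leader) const {
    return !ignore_live_leader && now_ms < withhold_votes_until_ms_;
  }

 private:
  const int64_t min_election_timeout_ms_;
  int64_t withhold_votes_until_ms_ = 0;
};

int main() {
  // With a 200 ms heartbeat interval and 3 allowed missed periods (matching
  // the flags used by the test above), the minimum election timeout is 600 ms.
  VoteWithholdingSketch replica(600);
  replica.WithholdVotes(/*now_ms=*/1000);  // e.g. a slow WAL append began

  std::cout << std::boolalpha
            << replica.ShouldRejectVote(1500, /*ignore_live_leader=*/false)  // true
            << '\n'
            << replica.ShouldRejectVote(1700, /*ignore_live_leader=*/false)  // false
            << '\n';
  return 0;
}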
