This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 54db215511e84785a8649ba1e52911f8adfb11e4 Author: Andrew Wong <[email protected]> AuthorDate: Wed Jan 8 23:28:12 2020 -0800 KUDU-3011 p5: transfer leadership when quiescing This amends the behavior of quiescing such that when a tablet server is quiescing, it will transfer leadership to a caught-up follower as soon as it can. While in this state, unlike while in a graceful stepdown period, the tablet can still be written to, so as not to obstruct ongoing workloads. Tests are added to exercise: - The basic behavior: even without injecting any errors that might cause elections, a quiescing leader will relinquish leadership. - The behavior when there are followers being caught up. In such cases, the leader won't immediately relinquish leadership -- instead, it will wait for the followers to catch up before stepping down. - The behavior when being written to. The fact that a leader is quiescing shouldn't affect its ability to be written to. - The behavior of the PeerMessageQueue when responding to various peer responses. I also removed some election-causing injection in a couple of existing tests that was previously required to transfer leadership while quiescing. Note: right now, if all tablet servers are quiescing while a write workload is ongoing, a large number of StartElection requests will be sent from the leaders to the followers. A follow-up patch will address this. 
Change-Id: Idbf0716f5c9455f83ff5f6f601b0f5042f77d078 Reviewed-on: http://gerrit.cloudera.org:8080/15012 Reviewed-by: Adar Dembo <[email protected]> Reviewed-by: Alexey Serbin <[email protected]> Tested-by: Andrew Wong <[email protected]> --- src/kudu/consensus/consensus_peers-test.cc | 1 + src/kudu/consensus/consensus_queue-test.cc | 116 ++++++++++++- src/kudu/consensus/consensus_queue.cc | 20 ++- src/kudu/consensus/consensus_queue.h | 6 + src/kudu/consensus/raft_consensus.cc | 8 +- .../tablet_server_quiescing-itest.cc | 180 +++++++++++++++++++-- 6 files changed, 306 insertions(+), 25 deletions(-) diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc index 8ac75a8..49c9812 100644 --- a/src/kudu/consensus/consensus_peers-test.cc +++ b/src/kudu/consensus/consensus_peers-test.cc @@ -103,6 +103,7 @@ class ConsensusPeersTest : public KuduTest { FakeRaftPeerPB(kLeaderUuid), kTabletId, raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL), + /*server_quiescing*/nullptr, MinimumOpId(), MinimumOpId())); diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc index 094f2cc..092d475 100644 --- a/src/kudu/consensus/consensus_queue-test.cc +++ b/src/kudu/consensus/consensus_queue-test.cc @@ -17,16 +17,20 @@ #include "kudu/consensus/consensus_queue.h" -#include <cstddef> +#include <algorithm> +#include <atomic> #include <cstdint> +#include <deque> #include <memory> #include <ostream> #include <string> #include <vector> +#include <boost/optional/optional.hpp> #include <gflags/gflags.h> #include <glog/logging.h> #include <gtest/gtest.h> +#include <gtest/gtest_prod.h> #include "kudu/clock/clock.h" #include "kudu/clock/hybrid_clock.h" @@ -63,6 +67,9 @@ DECLARE_int32(consensus_max_batch_size_bytes); DECLARE_int32(follower_unavailable_considered_failed_sec); using kudu::consensus::HealthReportPB; +using std::atomic; +using std::deque; +using std::string; using std::unique_ptr; using 
std::vector; @@ -78,7 +85,8 @@ class ConsensusQueueTest : public KuduTest { ConsensusQueueTest() : schema_(GetSimpleTestSchema()), metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "queue-test")), - registry_(new log::LogAnchorRegistry) { + registry_(new log::LogAnchorRegistry), + quiescing_(false) { } virtual void SetUp() OVERRIDE { @@ -90,8 +98,8 @@ class ConsensusQueueTest : public KuduTest { fs_manager_.get(), kTestTablet, schema_, - 0, // schema_version - NULL, + /*schema_version*/0, + /*metric_entity*/nullptr, &log_)); clock_.reset(new clock::HybridClock()); ASSERT_OK(clock_->Init()); @@ -109,6 +117,7 @@ class ConsensusQueueTest : public KuduTest { FakeRaftPeerPB(kLeaderUuid), kTestTablet, raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL), + &quiescing_, replicated_opid, committed_opid)); } @@ -233,8 +242,105 @@ class ConsensusQueueTest : public KuduTest { gscoped_ptr<PeerMessageQueue> queue_; scoped_refptr<log::LogAnchorRegistry> registry_; unique_ptr<clock::Clock> clock_; + atomic<bool> quiescing_; }; +// Observer of a PeerMessageQueue that tracks the notifications sent to +// observers. +class SimpleObserver : public PeerMessageQueueObserver { + public: + SimpleObserver() = default; + + void NotifyPeerToStartElection(const string& peer_uuid) override { + peers_to_start_election_.emplace_back(peer_uuid); + } + + // Other notifications aren't implemented. Just no-op. + void NotifyCommitIndex(int64_t /*commit_index*/) override {} + void NotifyTermChange(int64_t /*term*/) override {} + void NotifyFailedFollower(const string& /*peer_uuid*/, int64_t /*term*/, + const string& /*reason*/) override {} + void NotifyPeerToPromote(const string& /*peer_uuid*/) override {} + void NotifyPeerHealthChange() override {} + + private: + FRIEND_TEST(ConsensusQueueTest, TestTransferLeadershipWhenAppropriate); + + // The following track the notifications sent in chronological order. 
+ deque<string> peers_to_start_election_; +}; + +// Test that the leader consensus queue will only attempt to trigger elections +// when appropriate. +TEST_F(ConsensusQueueTest, TestTransferLeadershipWhenAppropriate) { + SimpleObserver observer; + queue_->RegisterObserver(&observer); + RaftConfigPB config = BuildRaftConfigPBForTests(/*num_voters*/2); + queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, config); + RaftPeerPB follower = MakePeer(kPeerUuid, RaftPeerPB::VOTER); + queue_->TrackPeer(follower); + + AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10); + WaitForLocalPeerToAckIndex(10); + + ConsensusResponsePB peer_response; + peer_response.set_responder_term(1); + peer_response.set_responder_uuid(kPeerUuid); + SetLastReceivedAndLastCommitted(&peer_response, MakeOpId(1, 9), MinimumOpId().index()); + + int elections_so_far = 0; + // Simulates receiving the peer's response and checks that, upon receiving + // it, the PeerMessageQueue either did or didn't notify that the peer should + // start an election. + auto verify_elections = [&] (bool election_happened) { + ASSERT_TRUE(queue_->ResponseFromPeer(kPeerUuid, peer_response)); + // Notifications are communicated via the Raft threadpool, so wait for any + // such notifying tasks to finish. + raft_pool_->Wait(); + if (election_happened) { + elections_so_far++; + } + ASSERT_EQ(elections_so_far, observer.peers_to_start_election_.size()); + }; + // We haven't begun watching for a successor yet and our conditions aren't + // met for this peer to become a leader. + NO_FATALS(verify_elections(/*election_happened*/false)); + + // Even after waiting for a successor, this peer isn't ready yet. + queue_->BeginWatchForSuccessor(boost::none); + NO_FATALS(verify_elections(/*election_happened*/false)); + + // Once the peer says it's gotten the last-appended op, we should be good to + // transfer leadership to it. 
+ SetLastReceivedAndLastCommitted(&peer_response, MakeOpId(1, 10), MinimumOpId().index()); + NO_FATALS(verify_elections(/*election_happened*/true)); + + // After we've triggered our election, we shouldn't trigger another. + NO_FATALS(verify_elections(/*election_happened*/false)); + + // And if we try to step down but specify a different peer, we also won't try + // electing the peer in-hand. + queue_->BeginWatchForSuccessor(boost::make_optional<string>("different-peer")); + NO_FATALS(verify_elections(/*election_happened*/false)); + + // Even if we begin quiescing, because we're looking for a specific + // successor, we shouldn't see an election. + quiescing_ = true; + NO_FATALS(verify_elections(/*election_happened*/false)); + + // If we stop watching for that successor and we're quiescing, we'll trigger + // elections. + queue_->EndWatchForSuccessor(); + for (int i = 0; i < 3; i++) { + NO_FATALS(verify_elections(/*election_happened*/true)); + } + + // If the peer weren't a voter, we would also not trigger elections. + config.mutable_peers(1)->set_member_type(RaftPeerPB::NON_VOTER); + queue_->SetLeaderMode(10, 1, config); + NO_FATALS(verify_elections(/*election_happened*/false)); +} + // Tests that the queue is able to track a peer when it starts tracking a peer // after the initial message in the queue. In particular this creates a queue // with several messages and then starts to track a peer whose watermark @@ -928,7 +1034,7 @@ TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) { AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10); WaitForLocalPeerToAckIndex(10); - // The committed_index should be MinimumOpId() since UpdateFollowerCommittedIndex + // The committed_index should be MinimumOpId() since UpdateFollowerWatermarks // has not been called. 
ASSERT_EQ(0, queue_->GetCommittedIndex()); diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc index a8aaa8f..3c6373f 100644 --- a/src/kudu/consensus/consensus_queue.cc +++ b/src/kudu/consensus/consensus_queue.cc @@ -17,6 +17,7 @@ #include "kudu/consensus/consensus_queue.h" #include <algorithm> +#include <atomic> #include <cstdint> #include <functional> #include <memory> @@ -80,6 +81,7 @@ DECLARE_int64(rpc_max_message_size); using kudu::log::Log; using kudu::pb_util::SecureDebugString; using kudu::pb_util::SecureShortDebugString; +using std::atomic; using std::string; using std::unique_ptr; using std::unordered_map; @@ -174,9 +176,11 @@ PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_ent RaftPeerPB local_peer_pb, string tablet_id, unique_ptr<ThreadPoolToken> raft_pool_observers_token, + const atomic<bool>* server_quiescing, OpId last_locally_replicated, const OpId& last_locally_committed) : raft_pool_observers_token_(std::move(raft_pool_observers_token)), + server_quiescing_(server_quiescing), local_peer_pb_(std::move(local_peer_pb)), tablet_id_(std::move(tablet_id)), successor_watch_in_progress_(false), @@ -1065,16 +1069,24 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& pre void PeerMessageQueue::TransferLeadershipIfNeeded(const TrackedPeer& peer, const ConsensusStatusPB& status) { DCHECK(queue_lock_.is_locked()); - if (!successor_watch_in_progress_) { + bool server_quiescing = server_quiescing_ && *server_quiescing_; + // Only transfer leadership if the local peer has begun looking for a + // successor, or if the server is quiescing. Otherwise, exit early. + if (!successor_watch_in_progress_ && !server_quiescing) { return; } - if (designated_successor_uuid_ && peer.uuid() != designated_successor_uuid_.get()) { + // Do some basic sanity checks that we can actually transfer leadership to + // the given peer. 
+ if (queue_state_.mode != PeerMessageQueue::LEADER || + peer.last_exchange_status != PeerStatus::OK || + local_peer_pb_.permanent_uuid() == peer.uuid()) { return; } - if (queue_state_.mode != PeerMessageQueue::LEADER || - peer.last_exchange_status != PeerStatus::OK) { + // If looking for a specific successor, ignore peers as appropriate. + if (successor_watch_in_progress_ && + designated_successor_uuid_ && peer.uuid() != designated_successor_uuid_.get()) { return; } diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h index d2257bc..430a313 100644 --- a/src/kudu/consensus/consensus_queue.h +++ b/src/kudu/consensus/consensus_queue.h @@ -16,6 +16,7 @@ // under the License. #pragma once +#include <atomic> #include <cstdint> #include <functional> #include <ostream> @@ -186,6 +187,7 @@ class PeerMessageQueue { RaftPeerPB local_peer_pb, std::string tablet_id, std::unique_ptr<ThreadPoolToken> raft_pool_observers_token, + const std::atomic<bool>* server_quiescing, OpId last_locally_replicated, const OpId& last_locally_committed); @@ -545,6 +547,10 @@ class PeerMessageQueue { // The pool token which executes observer notifications. std::unique_ptr<ThreadPoolToken> raft_pool_observers_token_; + // Shared boolean that indicates whether the server is quiescing, in which + // case leadership should be transferred away from this peer. + const std::atomic<bool>* server_quiescing_; + // PB containing identifying information about the local peer. 
const RaftPeerPB local_peer_pb_; diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index 618e542..4f99b71 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -277,6 +277,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info, local_peer_pb_, options_.tablet_id, raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL), + server_ctx_.quiescing, info.last_id, info.last_committed_id)); @@ -913,11 +914,12 @@ void RaftConsensus::NotifyPeerToPromote(const string& peer_uuid) { } void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid) { - LOG(INFO) << "Instructing follower " << peer_uuid << " to start an election"; + const auto& log_prefix = LogPrefixThreadSafe(); + LOG(INFO) << log_prefix << ": Instructing follower " << peer_uuid << " to start an election"; WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryStartElectionOnPeerTask, shared_from_this(), peer_uuid)), - LogPrefixThreadSafe() + "Unable to start TryStartElectionOnPeerTask"); + log_prefix + "Unable to start TryStartElectionOnPeerTask"); } void RaftConsensus::NotifyPeerHealthChange() { @@ -1002,7 +1004,7 @@ void RaftConsensus::TryStartElectionOnPeerTask(const string& peer_uuid) { return; } LOG_WITH_PREFIX_UNLOCKED(INFO) << "Signalling peer " << peer_uuid - << "to start an election"; + << " to start an election"; WARN_NOT_OK(peer_manager_->StartElection(peer_uuid), Substitute("unable to start election on peer $0", peer_uuid)); } diff --git a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc index bf4fcec..70c4eb9 100644 --- a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc +++ b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc @@ -36,6 +36,7 @@ #include "kudu/integration-tests/internal_mini_cluster-itest-base.h" #include "kudu/integration-tests/test_workload.h" #include 
"kudu/mini-cluster/internal_mini_cluster.h" +#include "kudu/tablet/metadata.pb.h" #include "kudu/tserver/mini_tablet_server.h" #include "kudu/tserver/scanners.h" #include "kudu/tserver/tablet_server.h" @@ -50,6 +51,8 @@ DECLARE_bool(enable_leader_failure_detection); DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader); DECLARE_double(leader_failure_max_missed_heartbeat_periods); DECLARE_int32(consensus_inject_latency_ms_in_notifications); +DECLARE_int32(tablet_copy_download_file_inject_latency_ms); +DECLARE_int32(tablet_copy_transfer_chunk_size_bytes); DECLARE_int32(scanner_default_batch_size_bytes); DECLARE_int32(scanner_ttl_ms); DECLARE_int32(raft_heartbeat_interval_ms); @@ -137,16 +140,16 @@ TEST_F(TServerQuiescingITest, TestQuiescingServerDoesntTriggerElections) { LOG(INFO) << Substitute("Quiescing ts $0", ts->uuid()); *ts->server()->mutable_quiescing() = true; - // Cause a bunch of elections. - FLAGS_leader_failure_max_missed_heartbeat_periods = 1; - FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms; - // Soon enough, elections will occur, and our quiescing server will cease to // be leader. ASSERT_EVENTUALLY([&] { ASSERT_EQ(0, ts->server()->num_raft_leaders()->value()); }); + // Cause a bunch of elections. + FLAGS_leader_failure_max_missed_heartbeat_periods = 1; + FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms; + // When we stop quiescing the server, we should eventually see some // leadership return to the server. *ts->server()->mutable_quiescing() = false; @@ -155,11 +158,36 @@ TEST_F(TServerQuiescingITest, TestQuiescingServerDoesntTriggerElections) { }); } +// Test that after quiescing a tablet's leader, leadership will be transferred +// elsewhere. 
+TEST_F(TServerQuiescingITest, TestQuiescingLeaderTransfersLeadership) { + const int kNumReplicas = 3; + NO_FATALS(StartCluster(kNumReplicas)); + vector<string> tablet_ids; + NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids)); + string tablet_id = tablet_ids[0]; + + const MonoDelta kTimeout = MonoDelta::FromSeconds(10); + TServerDetails* leader_details; + ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details)); + + // Start quiescing the leader. + const auto& orig_leader_uuid = leader_details->uuid(); + auto* leader_ts = cluster_->mini_tablet_server_by_uuid(orig_leader_uuid); + *leader_ts->server()->mutable_quiescing() = true; + + // The leader tserver will relinquish leadership soon enough. + ASSERT_EVENTUALLY([&] { + ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details)); + ASSERT_NE(orig_leader_uuid, leader_details->uuid()); + }); +} + // Test that even if a majority of replicas are quiescing, a tablet is still // able to elect a leader. TEST_F(TServerQuiescingITest, TestMajorityQuiescingElectsLeader) { const int kNumReplicas = 3; - FLAGS_raft_heartbeat_interval_ms = 50; + FLAGS_raft_heartbeat_interval_ms = 100; NO_FATALS(StartCluster(kNumReplicas)); vector<string> tablet_ids; NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids)); @@ -170,10 +198,6 @@ TEST_F(TServerQuiescingITest, TestMajorityQuiescingElectsLeader) { *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true; } - // Cause a bunch of elections. - FLAGS_leader_failure_max_missed_heartbeat_periods = 1; - FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms; - // Eventually the first tserver will be elected leader. 
const MonoDelta kTimeout = MonoDelta::FromSeconds(10); TServerDetails* leader_details; @@ -234,10 +258,6 @@ TEST_F(TServerQuiescingITest, TestDoesntAllowNewScansLeadersOnly) { rw_workload->Setup(); rw_workload->Start(); - // Inject a bunch of leader elections to stress leadership changes. - FLAGS_leader_failure_max_missed_heartbeat_periods = 1; - FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms; - // Wait for the scans to begin. MiniTabletServer* ts = nullptr; ASSERT_EVENTUALLY([&] { @@ -263,6 +283,74 @@ TEST_F(TServerQuiescingITest, TestDoesntAllowNewScansLeadersOnly) { NO_FATALS(rw_workload->StopAndJoin()); } +// Test that when all followers are behind (e.g. because the others are down), +// the leader, even while quiescing, will remain leader. +TEST_F(TServerQuiescingITest, TestQuiesceLeaderWhileFollowersCatchingUp) { + const int kNumReplicas = 3; + FLAGS_raft_heartbeat_interval_ms = 100; + NO_FATALS(StartCluster(kNumReplicas)); + auto rw_workload = CreateFaultIntolerantRWWorkload(); + rw_workload->set_num_tablets(1); + rw_workload->Setup(); + rw_workload->Start(); + while (rw_workload->rows_inserted() < 10000) { + SleepFor(MonoDelta::FromMilliseconds(50)); + } + TServerDetails* leader_details; + const auto kTimeout = MonoDelta::FromSeconds(10); + const string tablet_id = cluster_->mini_tablet_server(0)->ListTablets()[0]; + ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details)); + const string leader_uuid = leader_details->uuid(); + + // Slow down tablet copies so our leader will be catching up followers long + // enough for us to observe. + FLAGS_tablet_copy_transfer_chunk_size_bytes = 512; + FLAGS_tablet_copy_download_file_inject_latency_ms = 500; + + // Stop our writes and delete the replicas on the follower servers, setting + // them up for tablet copies. 
+ NO_FATALS(rw_workload->StopAndJoin()); + for (const auto& ts_and_details : ts_map_) { + const auto& ts_uuid = ts_and_details.first; + if (ts_uuid != leader_uuid) { + const auto* ts_details = ts_and_details.second; + ASSERT_OK(DeleteTablet(ts_details, tablet_id, + tablet::TabletDataState::TABLET_DATA_TOMBSTONED, + kTimeout)); + ASSERT_EVENTUALLY([&] { + vector<string> running_tablets; + ASSERT_OK(ListRunningTabletIds(ts_details, kTimeout, &running_tablets)); + ASSERT_EQ(0, running_tablets.size()); + }); + } + } + // Quiesce the leader and wait for a bit. While the leader is catching up + // replicas, it shouldn't relinquish leadership. + auto* leader_ts = cluster_->mini_tablet_server_by_uuid(leader_uuid); + *leader_ts->server()->mutable_quiescing() = true; + SleepFor(MonoDelta::FromSeconds(3)); + ASSERT_EQ(1, leader_ts->server()->num_raft_leaders()->value()); + ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details)); + ASSERT_EQ(leader_uuid, leader_details->uuid()); + + // Once we let the copy finish, the leader should relinquish leadership. 
+ FLAGS_tablet_copy_download_file_inject_latency_ms = 0; + FLAGS_tablet_copy_transfer_chunk_size_bytes = 4 * 1024 * 1024; + for (const auto& ts_and_details : ts_map_) { + ASSERT_EVENTUALLY([&] { + vector<string> running_tablets; + ASSERT_OK(ListRunningTabletIds(ts_and_details.second, kTimeout, &running_tablets)); + ASSERT_EQ(1, running_tablets.size()); + }); + } + ASSERT_EVENTUALLY([&] { + ASSERT_EQ(0, leader_ts->server()->num_raft_leaders()->value()); + TServerDetails* new_leader_details; + ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &new_leader_details)); + ASSERT_NE(leader_uuid, new_leader_details->uuid()); + }); +} + class TServerQuiescingParamITest : public TServerQuiescingITest, public testing::WithParamInterface<int> {}; @@ -394,6 +482,72 @@ TEST_P(TServerQuiescingParamITest, TestScansRetry) { } } +// Test that when all the tablet servers hosting a replica are quiescing, we +// can still write (assuming a leader had previously been elected). +TEST_P(TServerQuiescingParamITest, TestWriteWhileAllQuiescing) { + const int kNumReplicas = GetParam(); + NO_FATALS(StartCluster(kNumReplicas)); + auto start_write_workload = [&] { + // Start up a workload with some writes, with no write error tolerance. + unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get())); + workload->set_num_replicas(kNumReplicas); + workload->set_num_write_threads(3); + workload->set_num_tablets(1); + workload->Setup(); + workload->Start(); + return workload; + }; + auto first_workload = start_write_workload(); + string tablet_id; + ASSERT_EVENTUALLY([&] { + vector<string> tablet_ids; + tablet_ids = cluster_->mini_tablet_server(0)->ListTablets(); + ASSERT_EQ(1, tablet_ids.size()); + tablet_id = tablet_ids[0]; + }); + + TServerDetails* leader_details; + const auto kLeaderTimeout = MonoDelta::FromSeconds(10); + ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kLeaderTimeout, &leader_details)); + + // Now quiesce all the tablet servers. 
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) { + *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true; + } + + // We should continue to write uninterrupted. + int start_rows = first_workload->rows_inserted(); + ASSERT_EVENTUALLY([&] { + ASSERT_GT(first_workload->rows_inserted(), start_rows + 1000); + }); +} + +TEST_P(TServerQuiescingParamITest, TestAbruptStepdownWhileAllQuiescing) { + SKIP_IF_SLOW_NOT_ALLOWED(); + + const int kNumReplicas = GetParam(); + NO_FATALS(StartCluster(kNumReplicas)); + vector<string> tablet_ids; + NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids)); + + TServerDetails* leader_details; + const auto kLeaderTimeout = MonoDelta::FromSeconds(10); + const auto& tablet_id = tablet_ids[0]; + ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kLeaderTimeout, &leader_details)); + + // Now quiesce all the tablet servers. + for (int i = 0; i < cluster_->num_tablet_servers(); i++) { + *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true; + } + // Once we've stepped down, while quiescing, no new leader should be elected. + // Wait extra long to be sure. + ASSERT_OK(LeaderStepDown(leader_details, tablet_id, kLeaderTimeout)); + MonoDelta election_timeout = MonoDelta::FromMilliseconds( + 2 * FLAGS_raft_heartbeat_interval_ms * FLAGS_leader_failure_max_missed_heartbeat_periods); + Status s = FindTabletLeader(ts_map_, tablet_id, election_timeout, &leader_details); + ASSERT_TRUE(s.IsTimedOut()) << s.ToString(); +} + INSTANTIATE_TEST_CASE_P(NumReplicas, TServerQuiescingParamITest, ::testing::Values(1, 3)); } // namespace itest
