Repository: kudu Updated Branches: refs/heads/master 64f9ab34f -> bce1dd777
consensus: refactor tracking of received OpIds out of ReplicaState. The PeerMessageQueue class was already tracking the last appended OpId, so tracking it in ReplicaState was redundant and confusing. This removes a bunch of stuff from ReplicaState and adds just a little bit of new functionality to PeerMessageQueue: - TruncateOpsAfter() now takes an index instead of an OpId. The queue can already map the index to an OpId by asking the log. - Added a getter to expose the last OpId in the log back to RaftConsensus. - Changed OpId generation to happen in PeerMessageQueue. This was easy because it already knows the previous OpId and the current term. The 'last_received_cur_leader' tracking was moved into RaftConsensus itself, since it's just transient state tracking the RPC back-and-forths between a leader and a follower. This patch also removes raft_consensus-test, the mock-based testing for RaftConsensus. I found that maintaining this test was very difficult, in particular because now we rely on the fact that AppendOperations() is reflected in GetLastOpIdInLog(). With a mock PeerMessageQueue, this state update wasn't happening properly, and trying to reproduce that behavior in the mocks themselves felt like basically re-implementing the actual production code for the queue. I looked over the tests in this suite and I believe all of the cases are covered by various other tests (randomized and otherwise). I looped raft_consensus-itest 100 times[1], the Churny test case 1000 times[2], and exactly_once_writes-itest 1000 times[3]. Lastly, I was able to re-enable TestChurnyElections_WithNotificationLatency and loop it 500 times[4]. 
[1] http://dist-test.cloudera.org/job?job_id=todd.1474357631.30024 [2] http://dist-test.cloudera.org//job?job_id=todd.1474359004.2328 [3] http://dist-test.cloudera.org//job?job_id=todd.1474358436.31536 [4] http://dist-test.cloudera.org//job?job_id=todd.1474359250.4834 Change-Id: I81614d26328b0fbba37bf279f59717e05a07b816 Reviewed-on: http://gerrit.cloudera.org:8080/4476 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4bcbb4a4 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4bcbb4a4 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4bcbb4a4 Branch: refs/heads/master Commit: 4bcbb4a405c037162954cba218c2316e517cdbf3 Parents: 64f9ab3 Author: Todd Lipcon <[email protected]> Authored: Tue Sep 20 00:33:45 2016 -0700 Committer: Todd Lipcon <[email protected]> Committed: Wed Oct 5 21:41:26 2016 +0000 ---------------------------------------------------------------------- src/kudu/consensus/CMakeLists.txt | 1 - src/kudu/consensus/consensus_queue.cc | 20 +- src/kudu/consensus/consensus_queue.h | 14 +- src/kudu/consensus/raft_consensus-test.cc | 663 ------------------- src/kudu/consensus/raft_consensus.cc | 48 +- src/kudu/consensus/raft_consensus.h | 7 +- .../consensus/raft_consensus_quorum-test.cc | 9 +- src/kudu/consensus/raft_consensus_state.cc | 58 +- src/kudu/consensus/raft_consensus_state.h | 35 - .../integration-tests/raft_consensus-itest.cc | 7 +- 10 files changed, 61 insertions(+), 801 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt index 88f0c61..da4cc65 100644 --- a/src/kudu/consensus/CMakeLists.txt +++ b/src/kudu/consensus/CMakeLists.txt @@ 
-136,7 +136,6 @@ ADD_KUDU_TEST(mt-log-test) ADD_KUDU_TEST(quorum_util-test) ADD_KUDU_TEST(raft_consensus_quorum-test) ADD_KUDU_TEST(raft_consensus_state-test) -ADD_KUDU_TEST(raft_consensus-test) # Our current version of gmock overrides virtual functions without adding # the 'override' keyword which, since our move to c++11, make the compiler http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/consensus_queue.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc index b153bdd..6598662 100644 --- a/src/kudu/consensus/consensus_queue.cc +++ b/src/kudu/consensus/consensus_queue.cc @@ -291,9 +291,13 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs, return Status::OK(); } -void PeerMessageQueue::TruncateOpsAfter(const OpId& op) { +void PeerMessageQueue::TruncateOpsAfter(int64_t index) { DFAKE_SCOPED_LOCK(append_fake_lock_); // should not race with append. 
- + OpId op; + CHECK_OK_PREPEND(log_cache_.LookupOpId(index, &op), + Substitute("$0: cannot truncate ops after bad index $1", + LogPrefixUnlocked(), + index)); { std::unique_lock<simple_spinlock> lock(queue_lock_); queue_state_.last_appended = op; @@ -301,6 +305,18 @@ void PeerMessageQueue::TruncateOpsAfter(const OpId& op) { log_cache_.TruncateOpsAfter(op.index()); } +OpId PeerMessageQueue::GetLastOpIdInLog() const { + std::unique_lock<simple_spinlock> lock(queue_lock_); + return queue_state_.last_appended; +} + +OpId PeerMessageQueue::GetNextOpId() const { + std::unique_lock<simple_spinlock> lock(queue_lock_); + return MakeOpId(queue_state_.current_term, + queue_state_.last_appended.index() + 1); +} + + Status PeerMessageQueue::RequestForPeer(const string& uuid, ConsensusRequestPB* request, vector<ReplicateRefPtr>* msg_refs, http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/consensus_queue.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h index abcc089..c5fc98c 100644 --- a/src/kudu/consensus/consensus_queue.h +++ b/src/kudu/consensus/consensus_queue.h @@ -181,9 +181,17 @@ class PeerMessageQueue { virtual Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs, const StatusCallback& log_append_callback); - // Truncate all operations coming after 'op'. Following this, the 'last_appended' - // operation is reset to 'op', and the log cache will be truncated accordingly. - virtual void TruncateOpsAfter(const OpId& op); + // Truncate all operations coming after 'index'. Following this, the 'last_appended' + // operation is reset to the OpId with this index, and the log cache will be truncated + // accordingly. + virtual void TruncateOpsAfter(int64_t index); + + // Return the last OpId in the log. + // Note that this can move backwards after a truncation (TruncateOpsAfter). 
+ virtual OpId GetLastOpIdInLog() const; + + // Return the next OpId to be appended to the queue in the current term. + virtual OpId GetNextOpId() const; // Assembles a request for a peer, adding entries past 'op_id' up to // 'consensus_max_batch_size_bytes'. http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus-test.cc b/src/kudu/consensus/raft_consensus-test.cc deleted file mode 100644 index 147d626..0000000 --- a/src/kudu/consensus/raft_consensus-test.cc +++ /dev/null @@ -1,663 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include <gmock/gmock.h> -#include <gtest/gtest.h> -#include <memory> - -#include "kudu/common/schema.h" -#include "kudu/common/wire_protocol-test-util.h" -#include "kudu/consensus/consensus_peers.h" -#include "kudu/consensus/consensus-test-util.h" -#include "kudu/consensus/log.h" -#include "kudu/consensus/peer_manager.h" -#include "kudu/fs/fs_manager.h" -#include "kudu/gutil/stl_util.h" -#include "kudu/server/logical_clock.h" -#include "kudu/util/async_util.h" -#include "kudu/util/mem_tracker.h" -#include "kudu/util/metrics.h" -#include "kudu/util/test_macros.h" -#include "kudu/util/test_util.h" - -DECLARE_bool(enable_leader_failure_detection); - -METRIC_DECLARE_entity(tablet); - -using std::shared_ptr; -using std::string; -using std::unique_ptr; - -namespace kudu { -namespace consensus { - -using log::Log; -using log::LogOptions; -using ::testing::_; -using ::testing::AnyNumber; -using ::testing::AtLeast; -using ::testing::InSequence; -using ::testing::Invoke; -using ::testing::Mock; -using ::testing::Return; - -const char* kTestTablet = "TestTablet"; -const char* kLocalPeerUuid = "peer-0"; - -// A simple map to collect the results of a sequence of transactions. 
-typedef std::map<OpId, Status, OpIdCompareFunctor> StatusesMap; - -class MockQueue : public PeerMessageQueue { - public: - explicit MockQueue(const scoped_refptr<MetricEntity>& metric_entity, log::Log* log) - : PeerMessageQueue(metric_entity, log, FakeRaftPeerPB(kLocalPeerUuid), kTestTablet) {} - MOCK_METHOD1(Init, void(const OpId& locally_replicated_index)); - MOCK_METHOD3(SetLeaderMode, void(int64_t committed_opid, - int64_t current_term, - const RaftConfigPB& active_config)); - MOCK_METHOD0(SetNonLeaderMode, void()); - virtual Status AppendOperations(const vector<ReplicateRefPtr>& msgs, - const StatusCallback& callback) OVERRIDE { - return AppendOperationsMock(msgs, callback); - } - MOCK_METHOD2(AppendOperationsMock, Status(const vector<ReplicateRefPtr>& msgs, - const StatusCallback& callback)); - MOCK_METHOD1(TruncateOpsAfter, void(const OpId& op_id)); - MOCK_METHOD1(TrackPeer, void(const string&)); - MOCK_METHOD1(UntrackPeer, void(const string&)); - MOCK_METHOD4(RequestForPeer, Status(const std::string& uuid, - ConsensusRequestPB* request, - std::vector<ReplicateRefPtr>* msg_refs, - bool* needs_tablet_copy)); - MOCK_METHOD3(ResponseFromPeer, void(const std::string& peer_uuid, - const ConsensusResponsePB& response, - bool* more_pending)); - MOCK_METHOD0(Close, void()); -}; - -class MockPeerManager : public PeerManager { - public: - MockPeerManager() : PeerManager("", "", nullptr, nullptr, nullptr, nullptr) {} - MOCK_METHOD1(UpdateRaftConfig, Status(const consensus::RaftConfigPB& config)); - MOCK_METHOD1(SignalRequest, void(bool force_if_queue_empty)); - MOCK_METHOD0(Close, void()); -}; - -class RaftConsensusSpy : public RaftConsensus { - public: - typedef Callback<Status(const scoped_refptr<ConsensusRound>& round)> AppendCallback; - - RaftConsensusSpy(const ConsensusOptions& options, - unique_ptr<ConsensusMetadata> cmeta, - gscoped_ptr<PeerProxyFactory> proxy_factory, - gscoped_ptr<PeerMessageQueue> queue, - gscoped_ptr<PeerManager> peer_manager, - 
gscoped_ptr<ThreadPool> thread_pool, - const scoped_refptr<MetricEntity>& metric_entity, - const std::string& peer_uuid, - const scoped_refptr<server::Clock>& clock, - ReplicaTransactionFactory* txn_factory, - const scoped_refptr<log::Log>& log, - const shared_ptr<MemTracker>& parent_mem_tracker, - const Callback<void(const std::string& reason)>& mark_dirty_clbk) - : RaftConsensus(options, - std::move(cmeta), - std::move(proxy_factory), - std::move(queue), - std::move(peer_manager), - std::move(thread_pool), - metric_entity, - peer_uuid, - clock, - txn_factory, - log, - parent_mem_tracker, - mark_dirty_clbk) { - // These "aliases" allow us to count invocations and assert on them. - ON_CALL(*this, StartConsensusOnlyRoundUnlocked(_)) - .WillByDefault(Invoke(this, - &RaftConsensusSpy::StartNonLeaderConsensusRoundUnlockedConcrete)); - ON_CALL(*this, NonTxRoundReplicationFinished(_, _, _)) - .WillByDefault(Invoke(this, &RaftConsensusSpy::NonTxRoundReplicationFinishedConcrete)); - } - - MOCK_METHOD1(AppendNewRoundToQueueUnlocked, Status(const scoped_refptr<ConsensusRound>& round)); - Status AppendNewRoundToQueueUnlockedConcrete(const scoped_refptr<ConsensusRound>& round) { - return RaftConsensus::AppendNewRoundToQueueUnlocked(round); - } - - MOCK_METHOD1(StartConsensusOnlyRoundUnlocked, Status(const ReplicateRefPtr& msg)); - Status StartNonLeaderConsensusRoundUnlockedConcrete(const ReplicateRefPtr& msg) { - return RaftConsensus::StartConsensusOnlyRoundUnlocked(msg); - } - - MOCK_METHOD3(NonTxRoundReplicationFinished, void(ConsensusRound* round, - const StatusCallback& client_cb, - const Status& status)); - void NonTxRoundReplicationFinishedConcrete(ConsensusRound* round, - const StatusCallback& client_cb, - const Status& status) { - LOG(INFO) << "Committing round with opid " << round->id() - << " given Status " << status.ToString(); - RaftConsensus::NonTxRoundReplicationFinished(round, client_cb, status); - } - - private: - DISALLOW_COPY_AND_ASSIGN(RaftConsensusSpy); -}; 
- -void DoNothing(const string& s) { -} - -class RaftConsensusTest : public KuduTest { - public: - RaftConsensusTest() - : clock_(server::LogicalClock::CreateStartingAt(Timestamp(0))), - metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "raft-consensus-test")), - schema_(GetSimpleTestSchema()) { - FLAGS_enable_leader_failure_detection = false; - options_.tablet_id = kTestTablet; - } - - virtual void SetUp() OVERRIDE { - LogOptions options; - string test_path = GetTestPath("test-peer-root"); - - // TODO mock the Log too, since we're gonna mock the queue - // monitors and pretty much everything else. - fs_manager_.reset(new FsManager(env_.get(), test_path)); - CHECK_OK(fs_manager_->CreateInitialFileSystemLayout()); - CHECK_OK(fs_manager_->Open()); - CHECK_OK(Log::Open(LogOptions(), - fs_manager_.get(), - kTestTablet, - schema_, - 0, // schema_version - nullptr, - &log_)); - - queue_ = new MockQueue(metric_entity_, log_.get()); - peer_manager_ = new MockPeerManager; - txn_factory_.reset(new MockTransactionFactory); - - ON_CALL(*queue_, AppendOperationsMock(_, _)) - .WillByDefault(Invoke(this, &RaftConsensusTest::AppendToLog)); - } - - void SetUpConsensus(int64_t initial_term = consensus::kMinimumTerm, int num_peers = 1) { - config_ = BuildRaftConfigPBForTests(num_peers); - config_.set_opid_index(kInvalidOpIdIndex); - - gscoped_ptr<PeerProxyFactory> proxy_factory(new LocalTestPeerProxyFactory(nullptr)); - - string peer_uuid = config_.peers(num_peers - 1).permanent_uuid(); - - unique_ptr<ConsensusMetadata> cmeta; - CHECK_OK(ConsensusMetadata::Create(fs_manager_.get(), kTestTablet, peer_uuid, - config_, initial_term, &cmeta)); - - gscoped_ptr<ThreadPool> thread_pool; - CHECK_OK(ThreadPoolBuilder("raft-pool") .Build(&thread_pool)); - - consensus_.reset(new RaftConsensusSpy(options_, - std::move(cmeta), - std::move(proxy_factory), - gscoped_ptr<PeerMessageQueue>(queue_), - gscoped_ptr<PeerManager>(peer_manager_), - std::move(thread_pool), - metric_entity_, 
- peer_uuid, - clock_, - txn_factory_.get(), - log_.get(), - MemTracker::GetRootTracker(), - Bind(&DoNothing))); - - ON_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) - .WillByDefault(Invoke(this, &RaftConsensusTest::MockAppendNewRound)); - } - - Status AppendToLog(const vector<ReplicateRefPtr>& msgs, - const StatusCallback& callback) { - return log_->AsyncAppendReplicates(msgs, - Bind(LogAppendCallback, callback)); - } - - static void LogAppendCallback(const StatusCallback& callback, - const Status& s) { - CHECK_OK(s); - callback.Run(s); - } - - Status MockAppendNewRound(const scoped_refptr<ConsensusRound>& round) { - rounds_.push_back(round); - RETURN_NOT_OK(consensus_->AppendNewRoundToQueueUnlockedConcrete(round)); - LOG(INFO) << "Round append: " << round->id() << ", ReplicateMsg: " - << round->replicate_msg()->ShortDebugString(); - return Status::OK(); - } - - void SetUpGeneralExpectations() { - EXPECT_CALL(*peer_manager_, SignalRequest(_)) - .Times(AnyNumber()); - EXPECT_CALL(*peer_manager_, Close()) - .Times(AtLeast(1)); - EXPECT_CALL(*queue_, Close()) - .Times(1); - EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) - .Times(AnyNumber()); - } - - // Create a ConsensusRequestPB suitable to send to a peer. - ConsensusRequestPB MakeConsensusRequest(int64_t caller_term, - const string& caller_uuid, - const OpId& preceding_opid); - - // Add a single no-op with the given OpId to a ConsensusRequestPB. 
- void AddNoOpToConsensusRequest(ConsensusRequestPB* request, const OpId& noop_opid); - - scoped_refptr<ConsensusRound> AppendNoOpRound() { - ReplicateRefPtr replicate_ptr(make_scoped_refptr_replicate(new ReplicateMsg)); - replicate_ptr->get()->set_op_type(NO_OP); - replicate_ptr->get()->set_timestamp(clock_->Now().ToUint64()); - scoped_refptr<ConsensusRound> round(new ConsensusRound(consensus_.get(), replicate_ptr)); - round->SetConsensusReplicatedCallback( - Bind(&RaftConsensusSpy::NonTxRoundReplicationFinished, - Unretained(consensus_.get()), Unretained(round.get()), Bind(&DoNothingStatusCB))); - - CHECK_OK(consensus_->Replicate(round)); - LOG(INFO) << "Appended NO_OP round with opid " << round->id(); - return round; - } - - void DumpRounds() { - LOG(INFO) << "Dumping rounds..."; - for (const scoped_refptr<ConsensusRound>& round : rounds_) { - LOG(INFO) << "Round: OpId " << round->id() << ", ReplicateMsg: " - << round->replicate_msg()->ShortDebugString(); - } - } - - protected: - ConsensusOptions options_; - RaftConfigPB config_; - OpId initial_id_; - gscoped_ptr<FsManager> fs_manager_; - scoped_refptr<Log> log_; - gscoped_ptr<PeerProxyFactory> proxy_factory_; - scoped_refptr<server::Clock> clock_; - MetricRegistry metric_registry_; - scoped_refptr<MetricEntity> metric_entity_; - const Schema schema_; - scoped_refptr<RaftConsensusSpy> consensus_; - - vector<scoped_refptr<ConsensusRound> > rounds_; - - // Mocks. - // NOTE: both 'queue_' and 'peer_manager_' belong to 'consensus_' and may be deleted before - // the test is. 
- MockQueue* queue_; - MockPeerManager* peer_manager_; - gscoped_ptr<MockTransactionFactory> txn_factory_; -}; - -ConsensusRequestPB RaftConsensusTest::MakeConsensusRequest(int64_t caller_term, - const string& caller_uuid, - const OpId& preceding_opid) { - ConsensusRequestPB request; - request.set_caller_term(caller_term); - request.set_caller_uuid(caller_uuid); - request.set_tablet_id(kTestTablet); - request.set_all_replicated_index(0); - *request.mutable_preceding_id() = preceding_opid; - return request; -} - -void RaftConsensusTest::AddNoOpToConsensusRequest(ConsensusRequestPB* request, - const OpId& noop_opid) { - ReplicateMsg* noop_msg = request->add_ops(); - *noop_msg->mutable_id() = noop_opid; - noop_msg->set_op_type(NO_OP); - noop_msg->set_timestamp(clock_->Now().ToUint64()); - noop_msg->mutable_noop_request(); -} - -// Asserts that a ConsensusRound has an OpId set in its ReplicateMsg. -MATCHER(HasOpId, "") { return arg->id().IsInitialized(); } - -// These matchers assert that a Status object is of a certain type. -MATCHER(IsOk, "") { return arg.ok(); } -MATCHER(IsAborted, "") { return arg.IsAborted(); } - -// Tests that consensus is able to handle pending operations. It tests this in two ways: -// - It tests that consensus does the right thing with pending transactions from the the WAL. -// - It tests that when a follower gets promoted to leader it does the right thing -// with the pending operations. -TEST_F(RaftConsensusTest, TestPendingTransactions) { - SetUpConsensus(10); - - // Emulate a stateful system by having a bunch of operations in flight when consensus starts. - // Specifically we emulate we're on term 10, with 5 operations before the last known - // committed operation, 10.104, which should be committed immediately, and 5 operations after the - // last known committed operation, which should be pending but not yet committed. 
- ConsensusBootstrapInfo info; - info.last_id.set_term(10); - for (int i = 0; i < 10; i++) { - auto replicate = new ReplicateMsg(); - replicate->set_op_type(NO_OP); - info.last_id.set_index(100L + i); - replicate->mutable_id()->CopyFrom(info.last_id); - info.orphaned_replicates.push_back(replicate); - } - - info.last_committed_id.set_term(10); - info.last_committed_id.set_index(104); - - { - InSequence dummy; - // On start we expect 10 NO_OPs to be enqueued, with 5 of those having - // their commit continuation called immediately. - EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_)) - .Times(10); - - // Queue gets initted when the peer starts. - EXPECT_CALL(*queue_, Init(_)) - .Times(1); - } - - ASSERT_OK(consensus_->Start(info)); - - ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_)); - ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get())); - ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_)); - ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(consensus_.get())); - // Now we test what this peer does with the pending operations once it's elected leader. - { - InSequence dummy; - // Peer manager gets updated with the new set of peers to send stuff to. - EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_)) - .Times(1).WillOnce(Return(Status::OK())); - // The no-op should be appended to the queue. - // One more op will be appended for the election. - EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) - .Times(1); - EXPECT_CALL(*queue_, AppendOperationsMock(_, _)) - .Times(1).WillRepeatedly(Return(Status::OK()));; - } - - // Emulate an election, this will make this peer become leader and trigger the - // above set expectations. 
- ASSERT_OK(consensus_->EmulateElection()); - - ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(queue_)); - ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(txn_factory_.get())); - ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(peer_manager_)); - - // Commit the 5 no-ops from the previous term, along with the one pushed to - // assert leadership. - EXPECT_CALL(*consensus_.get(), NonTxRoundReplicationFinished(HasOpId(), _, IsOk())) - .Times(6); - EXPECT_CALL(*peer_manager_, SignalRequest(_)) - .Times(AnyNumber()); - // In the end peer manager and the queue get closed. - EXPECT_CALL(*peer_manager_, Close()) - .Times(AtLeast(1)); - EXPECT_CALL(*queue_, Close()) - .Times(1); - - // Now mark the last operation (the no-op round) as committed. - // This should advance the committed index, since that round in on our current term, - // and we should be able to commit all previous rounds. - int64_t cc_round_index = info.orphaned_replicates.back()->id().index() + 1; - consensus_->NotifyCommitIndex(cc_round_index); -} - -MATCHER_P2(RoundHasOpId, term, index, "") { - LOG(INFO) << "expected: " << MakeOpId(term, index) << ", actual: " << arg->id(); - return arg->id().term() == term && arg->id().index() == index; -} - -MATCHER_P2(EqOpId, term, index, "") { - return arg.term() == term && arg.index() == index; -} - -// Tests the case where a a leader is elected and pushed a sequence of -// operations of which some never get committed. Eventually a new leader in a higher -// term pushes operations that overwrite some of the original indexes. 
-TEST_F(RaftConsensusTest, TestAbortOperations) { - SetUpConsensus(1, 2); - - EXPECT_CALL(*consensus_.get(), AppendNewRoundToQueueUnlocked(_)) - .Times(AnyNumber()); - - EXPECT_CALL(*peer_manager_, SignalRequest(_)) - .Times(AnyNumber()); - EXPECT_CALL(*peer_manager_, Close()) - .Times(AtLeast(1)); - EXPECT_CALL(*queue_, Close()) - .Times(1); - EXPECT_CALL(*queue_, Init(_)) - .Times(1); - EXPECT_CALL(*peer_manager_, UpdateRaftConfig(_)) - .Times(1) - .WillRepeatedly(Return(Status::OK())); - - // We'll append to the queue 12 times, the initial noop txn + 10 initial ops while leader - // and the new leader's update, when we're overwriting operations. - EXPECT_CALL(*queue_, AppendOperationsMock(_, _)) - .Times(12); - - // .. but those will be overwritten later by another - // leader, which will push and commit 5 ops. - // Only these five should start as replica rounds. - EXPECT_CALL(*consensus_.get(), StartConsensusOnlyRoundUnlocked(_)) - .Times(4); - - ConsensusBootstrapInfo info; - ASSERT_OK(consensus_->Start(info)); - ASSERT_OK(consensus_->EmulateElection()); - - // Append 10 rounds: 2.2 - 2.11 - for (int i = 0; i < 10; i++) { - AppendNoOpRound(); - } - - // Expectations for what gets committed and what gets aborted: - // (note: the aborts may be triggered before the commits) - // 5 OK's for the 2.1-2.5 ops. - // 6 Aborts for the 2.6-2.11 ops. - // 1 OK for the 3.6 op. 
- for (int index = 1; index < 6; index++) { - EXPECT_CALL(*consensus_.get(), - NonTxRoundReplicationFinished(RoundHasOpId(2, index), _, IsOk())).Times(1); - } - for (int index = 6; index < 12; index++) { - EXPECT_CALL(*consensus_.get(), - NonTxRoundReplicationFinished(RoundHasOpId(2, index), _, IsAborted())).Times(1); - } - EXPECT_CALL(*consensus_.get(), - NonTxRoundReplicationFinished(RoundHasOpId(3, 6), _, IsOk())).Times(1); - EXPECT_CALL(*queue_, TruncateOpsAfter(EqOpId(2, 5))).Times(1); - - // Nothing's committed so far, so now just send an Update() message - // emulating another guy got elected leader and is overwriting a suffix - // of the previous messages. - // In particular this request has: - // - Op 2.5 from the previous leader's term - // - Ops 3.6-3.9 from the new leader's term - // - A new committed index of 3.6 - ConsensusRequestPB request; - request.set_caller_term(3); - const string PEER_0_UUID = "peer-0"; - request.set_caller_uuid(PEER_0_UUID); - request.set_tablet_id(kTestTablet); - request.set_all_replicated_index(0); - request.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); - - ReplicateMsg* replicate = request.add_ops(); - replicate->mutable_id()->CopyFrom(MakeOpId(2, 5)); - replicate->set_op_type(NO_OP); - - ReplicateMsg* noop_msg = request.add_ops(); - noop_msg->mutable_id()->CopyFrom(MakeOpId(3, 6)); - noop_msg->set_op_type(NO_OP); - noop_msg->set_timestamp(clock_->Now().ToUint64()); - noop_msg->mutable_noop_request(); - - // Overwrite another 3 of the original rounds for a total of 4 overwrites. 
- for (int i = 7; i < 10; i++) { - ReplicateMsg* replicate = request.add_ops(); - replicate->mutable_id()->CopyFrom(MakeOpId(3, i)); - replicate->set_op_type(NO_OP); - replicate->set_timestamp(clock_->Now().ToUint64()); - } - - request.set_committed_index(6); - - ConsensusResponsePB response; - ASSERT_OK(consensus_->Update(&request, &response)); - ASSERT_FALSE(response.has_error()); - - ASSERT_TRUE(Mock::VerifyAndClearExpectations(consensus_.get())); - - // Now we expect to commit ops 3.7 - 3.9. - for (int index = 7; index < 10; index++) { - EXPECT_CALL(*consensus_.get(), - NonTxRoundReplicationFinished(RoundHasOpId(3, index), _, IsOk())).Times(1); - } - - request.mutable_ops()->Clear(); - request.mutable_preceding_id()->CopyFrom(MakeOpId(3, 9)); - request.set_committed_index(9); - - ASSERT_OK(consensus_->Update(&request, &response)); - ASSERT_FALSE(response.has_error()); -} - -TEST_F(RaftConsensusTest, TestReceivedIdIsInittedBeforeStart) { - SetUpConsensus(); - OpId opid; - ASSERT_OK(consensus_->GetLastOpId(RECEIVED_OPID, &opid)); - ASSERT_TRUE(opid.IsInitialized()); - ASSERT_OPID_EQ(opid, MinimumOpId()); -} - -// Ensure that followers reset their "last_received_current_leader" -// ConsensusStatusPB field when a new term is encountered. This is a -// correctness test for the logic on the follower side that allows the -// leader-side queue to determine which op to send next in various scenarios. -TEST_F(RaftConsensusTest, TestResetRcvdFromCurrentLeaderOnNewTerm) { - SetUpConsensus(kMinimumTerm, 3); - SetUpGeneralExpectations(); - ConsensusBootstrapInfo info; - ASSERT_OK(consensus_->Start(info)); - - ConsensusRequestPB request; - ConsensusResponsePB response; - int64_t caller_term = 0; - int64_t log_index = 0; - - caller_term = 1; - string caller_uuid = config_.peers(0).permanent_uuid(); - OpId preceding_opid = MinimumOpId(); - - // Heartbeat. This will cause the term to increment on the follower. 
- request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); - response.Clear(); - ASSERT_OK(consensus_->Update(&request, &response)); - ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); - ASSERT_EQ(caller_term, response.responder_term()); - ASSERT_OPID_EQ(response.status().last_received(), MinimumOpId()); - ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId()); - - // Replicate a no-op. - OpId noop_opid = MakeOpId(caller_term, ++log_index); - AddNoOpToConsensusRequest(&request, noop_opid); - response.Clear(); - ASSERT_OK(consensus_->Update(&request, &response)); - ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); - ASSERT_OPID_EQ(response.status().last_received(), noop_opid); - ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid); - - // New leader heartbeat. Term increase to 2. - // The preceding_opid is the no-op replicated above. This will match on the - // follower side, so it can update its last_received_current_leader to - // the same operation (indicating to the queue that it doesn't need to re-replicate - // this operation). - caller_term = 2; - caller_uuid = config_.peers(1).permanent_uuid(); - preceding_opid = noop_opid; - request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); - response.Clear(); - ASSERT_OK(consensus_->Update(&request, &response)); - ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); - ASSERT_EQ(caller_term, response.responder_term()); - ASSERT_OPID_EQ(response.status().last_received(), preceding_opid); - ASSERT_OPID_EQ(response.status().last_received_current_leader(), preceding_opid); - - // Append a no-op. 
- noop_opid = MakeOpId(caller_term, ++log_index); - AddNoOpToConsensusRequest(&request, noop_opid); - response.Clear(); - ASSERT_OK(consensus_->Update(&request, &response)); - ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); - ASSERT_OPID_EQ(response.status().last_received(), noop_opid); - ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid); - - // New leader heartbeat. The term should rev but we should get an LMP mismatch. - caller_term = 3; - caller_uuid = config_.peers(0).permanent_uuid(); - preceding_opid = MakeOpId(caller_term, log_index + 1); // Not replicated yet. - request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); - response.Clear(); - ASSERT_OK(consensus_->Update(&request, &response)); - ASSERT_EQ(caller_term, response.responder_term()); - ASSERT_OPID_EQ(response.status().last_received(), noop_opid); // Not preceding this time. - ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId()); - ASSERT_TRUE(response.status().has_error()) << response.ShortDebugString(); - ASSERT_EQ(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH, response.status().error().code()); - - // Decrement preceding and append a no-op. - preceding_opid = MakeOpId(2, log_index); - noop_opid = MakeOpId(caller_term, ++log_index); - request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); - AddNoOpToConsensusRequest(&request, noop_opid); - response.Clear(); - ASSERT_OK(consensus_->Update(&request, &response)); - ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); - ASSERT_OPID_EQ(response.status().last_received(), noop_opid) << response.ShortDebugString(); - ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid) - << response.ShortDebugString(); - - // Happy case. New leader with new no-op to append right off the bat. - // Response should be OK with all last_received* fields equal to the new no-op. 
- caller_term = 4; - caller_uuid = config_.peers(1).permanent_uuid(); - preceding_opid = noop_opid; - noop_opid = MakeOpId(caller_term, ++log_index); - request = MakeConsensusRequest(caller_term, caller_uuid, preceding_opid); - AddNoOpToConsensusRequest(&request, noop_opid); - response.Clear(); - ASSERT_OK(consensus_->Update(&request, &response)); - ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString(); - ASSERT_EQ(caller_term, response.responder_term()); - ASSERT_OPID_EQ(response.status().last_received(), noop_opid); - ASSERT_OPID_EQ(response.status().last_received_current_leader(), noop_opid); -} - -} // namespace consensus -} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc index aaaf228..0cb724e 100644 --- a/src/kudu/consensus/raft_consensus.cc +++ b/src/kudu/consensus/raft_consensus.cc @@ -231,6 +231,7 @@ RaftConsensus::RaftConsensus( FLAGS_raft_heartbeat_interval_ms * FLAGS_leader_failure_max_missed_heartbeat_periods))), withhold_votes_until_(MonoTime::Min()), + last_received_cur_leader_(MinimumOpId()), mark_dirty_clbk_(std::move(mark_dirty_clbk)), shutdown_(false), follower_memory_pressure_rejections_(metric_entity->FindOrCreateCounter( @@ -283,7 +284,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) { state_->SetInitialCommittedOpIdUnlocked(info.last_committed_id); - queue_->Init(state_->GetLastReceivedOpIdUnlocked()); + queue_->Init(info.last_id); } { @@ -413,7 +414,7 @@ Status RaftConsensus::StartElection(ElectionMode mode) { request.set_candidate_term(state_->GetCurrentTermUnlocked()); request.set_tablet_id(state_->GetOptions().tablet_id); *request.mutable_candidate_status()->mutable_last_received() = - state_->GetLastReceivedOpIdUnlocked(); + queue_->GetLastOpIdInLog(); election.reset(new 
LeaderElection(active_config, peer_proxy_factory_.get(), @@ -548,7 +549,7 @@ Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRo } Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round) { - state_->NewIdUnlocked(round->replicate_msg()->mutable_id()); + *round->replicate_msg()->mutable_id() = queue_->GetNextOpId(); RETURN_NOT_OK(state_->AddPendingOperation(round)); Status s = queue_->AppendOperation(round->replicate_scoped_refptr()); @@ -556,23 +557,17 @@ Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<Consensu // Handle Status::ServiceUnavailable(), which means the queue is full. if (PREDICT_FALSE(s.IsServiceUnavailable())) { gscoped_ptr<OpId> id(round->replicate_msg()->release_id()); - // Rollback the id gen. so that we reuse this id later, when we can - // actually append to the state machine, i.e. this makes the state - // machine have continuous ids, for the same term, even if the queue - // refused to add any more operations. + // Cancel the operation that we started. state_->CancelPendingOperation(*id); LOG_WITH_PREFIX_UNLOCKED(WARNING) << ": Could not append replicate request " << "to the queue. Queue is Full. " << "Queue metrics: " << queue_->ToString(); - - // TODO Possibly evict a dangling peer from the configuration here. - // TODO count of number of ops failed due to consensus queue overflow. + // TODO(todd) count of number of ops failed due to consensus queue overflow. } else if (PREDICT_FALSE(s.IsIOError())) { // This likely came from the log. LOG(FATAL) << "IO error appending to the queue: " << s.ToString(); } RETURN_NOT_OK_PREPEND(s, "Unable to append operation to consensus queue"); - state_->UpdateLastReceivedOpIdUnlocked(round->id()); return Status::OK(); } @@ -754,7 +749,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req // The leader's preceding id. 
deduplicated_req->preceding_opid = &rpc_req->preceding_id(); - int64_t dedup_up_to_index = state_->GetLastReceivedOpIdUnlocked().index(); + int64_t dedup_up_to_index = queue_->GetLastOpIdInLog().index(); deduplicated_req->first_message_idx = -1; @@ -844,7 +839,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ string error_msg = Substitute( "Log matching property violated." " Preceding OpId in replica: $0. Preceding OpId from leader: $1. ($2 mismatch)", - state_->GetLastReceivedOpIdUnlocked().ShortDebugString(), + queue_->GetLastOpIdInLog().ShortDebugString(), req.preceding_opid->ShortDebugString(), term_mismatch ? "term" : "index"); @@ -875,10 +870,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) { state_->AbortOpsAfterUnlocked(truncate_after_index); - // Above resets the 'last received' to the operation with index 'truncate_after_index'. - OpId new_last_received = state_->GetLastReceivedOpIdUnlocked(); - DCHECK_EQ(truncate_after_index, new_last_received.index()); - queue_->TruncateOpsAfter(new_last_received); + queue_->TruncateOpsAfter(truncate_after_index); } Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* request, @@ -1215,16 +1207,10 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request, CHECK_OK(state_->AdvanceCommittedIndexUnlocked(apply_up_to)); queue_->UpdateFollowerWatermarks(apply_up_to, request->all_replicated_index()); - // We can now update the last received watermark. - // - // We do it here (and before we actually hear back from the wal whether things - // are durable) so that, if we receive another, possible duplicate, message - // that exercises this path we don't handle these messages twice. - // // If any messages failed to be started locally, then we already have removed them - // from 'deduped_req' at this point. 
So, we can simply update our last-received - // watermark to the last message that remains in 'deduped_req'. - state_->UpdateLastReceivedOpIdUnlocked(last_from_leader); + // from 'deduped_req' at this point. So, 'last_from_leader' is the last one that + // we might apply. + last_received_cur_leader_ = last_from_leader; // Fill the response with the current state. We will not mutate anymore state until // we actually reply to the leader, we'll just wait for the messages to be durable. @@ -1269,12 +1255,11 @@ void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* respons TRACE("Filling consensus response to leader."); response->set_responder_term(state_->GetCurrentTermUnlocked()); response->mutable_status()->mutable_last_received()->CopyFrom( - state_->GetLastReceivedOpIdUnlocked()); + queue_->GetLastOpIdInLog()); response->mutable_status()->mutable_last_received_current_leader()->CopyFrom( - state_->GetLastReceivedOpIdCurLeaderUnlocked()); - // TODO: interrogate queue rather than state? 
+ last_received_cur_leader_); response->mutable_status()->set_last_committed_idx( - state_->GetCommittedIndexUnlocked()); + queue_->GetCommittedIndex()); } void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response, @@ -1879,7 +1864,7 @@ Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) { ReplicaState::UniqueLock lock; RETURN_NOT_OK(state_->LockForRead(&lock)); if (type == RECEIVED_OPID) { - *DCHECK_NOTNULL(id) = state_->GetLastReceivedOpIdUnlocked(); + *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog(); } else if (type == COMMITTED_OPID) { id->set_term(state_->GetTermWithLastCommittedOpUnlocked()); id->set_index(state_->GetCommittedIndexUnlocked()); @@ -2038,6 +2023,7 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term, LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term; RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term, flush)); term_metric_->set_value(new_term); + last_received_cur_leader_ = MinimumOpId(); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h index ec48edb..887201b 100644 --- a/src/kudu/consensus/raft_consensus.h +++ b/src/kudu/consensus/raft_consensus.h @@ -459,9 +459,14 @@ class RaftConsensus : public Consensus, // nodes from disturbing the healthy leader. MonoTime withhold_votes_until_; + // The last OpId received from the current leader. This is updated whenever the follower + // accepts operations from a leader, and passed back so that the leader knows from what + // point to continue sending operations. 
+ OpId last_received_cur_leader_; + const Callback<void(const std::string& reason)> mark_dirty_clbk_; - // TODO hack to serialize updates due to repeated/out-of-order messages + // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages // should probably be refactored out. // // Lock ordering note: If both this lock and the ReplicaState lock are to be http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus_quorum-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc index 04e894a..9d63266 100644 --- a/src/kudu/consensus/raft_consensus_quorum-test.cc +++ b/src/kudu/consensus/raft_consensus_quorum-test.cc @@ -285,14 +285,9 @@ class RaftConsensusQuorumTest : public KuduTest { void WaitForReplicateIfNotAlreadyPresent(const OpId& to_wait_for, int peer_idx) { scoped_refptr<RaftConsensus> peer; CHECK_OK(peers_->GetPeerByIdx(peer_idx, &peer)); - ReplicaState* state = peer->GetReplicaStateForTests(); while (true) { - { - ReplicaState::UniqueLock lock; - CHECK_OK(state->LockForRead(&lock)); - if (OpIdCompare(state->GetLastReceivedOpIdUnlocked(), to_wait_for) >= 0) { - return; - } + if (OpIdCompare(peer->queue_->GetLastOpIdInLog(), to_wait_for) >= 0) { + return; } SleepFor(MonoDelta::FromMilliseconds(1)); } http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus_state.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc index a212fad..f99fb28 100644 --- a/src/kudu/consensus/raft_consensus_state.cc +++ b/src/kudu/consensus/raft_consensus_state.cc @@ -48,10 +48,7 @@ ReplicaState::ReplicaState(ConsensusOptions options, string peer_uuid, : options_(std::move(options)), peer_uuid_(std::move(peer_uuid)), cmeta_(std::move(cmeta)), - 
next_index_(0), txn_factory_(txn_factory), - last_received_op_id_(MinimumOpId()), - last_received_op_id_current_leader_(MinimumOpId()), last_committed_op_id_(MinimumOpId()), state_(kInitialized) { CHECK(cmeta_) << "ConsensusMeta passed as NULL"; @@ -71,9 +68,6 @@ Status ReplicaState::StartUnlocked(const OpId& last_id_in_wal) { GetCurrentTermUnlocked())); } - next_index_ = last_id_in_wal.index() + 1; - last_received_op_id_.CopyFrom(last_id_in_wal); - state_ = kRunning; return Status::OK(); } @@ -266,13 +260,11 @@ bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch return true; } - if (op_id.index() > GetLastReceivedOpIdUnlocked().index()) { + scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index()); + if (!round) { return false; } - scoped_refptr<ConsensusRound> round = GetPendingOpByIndexOrNullUnlocked(op_id.index()); - DCHECK(round); - if (round->id().term() != op_id.term()) { *term_mismatch = true; return false; @@ -296,7 +288,6 @@ Status ReplicaState::SetCurrentTermUnlocked(int64_t new_term, CHECK_OK(cmeta_->Flush()); } ClearLeaderUnlocked(); - last_received_op_id_current_leader_ = MinimumOpId(); return Status::OK(); } @@ -405,12 +396,6 @@ void ReplicaState::AbortOpsAfterUnlocked(int64_t index) { new_preceding = last_committed_op_id_; } - // This is the same as UpdateLastReceivedOpIdUnlocked() but we do it - // here to avoid the bounds check, since we're breaking monotonicity. 
- last_received_op_id_ = new_preceding; - last_received_op_id_current_leader_ = last_received_op_id_; - next_index_ = new_preceding.index() + 1; - for (; iter != pending_txns_.end();) { const scoped_refptr<ConsensusRound>& round = (*iter).second; auto op_type = round->replicate_msg()->op_type(); @@ -611,53 +596,19 @@ Status ReplicaState::CheckHasCommittedOpInCurrentTermUnlocked() const { return Status::OK(); } -void ReplicaState::UpdateLastReceivedOpIdUnlocked(const OpId& op_id) { - DCHECK(update_lock_.is_locked()); - if (OpIdCompare(op_id, last_received_op_id_) > 0) { - TRACE("Updating last received op as $0", OpIdToString(op_id)); - last_received_op_id_ = op_id; - next_index_ = op_id.index() + 1; - } - last_received_op_id_current_leader_ = op_id; -} - -const OpId& ReplicaState::GetLastReceivedOpIdUnlocked() const { - DCHECK(update_lock_.is_locked()); - return last_received_op_id_; -} - -const OpId& ReplicaState::GetLastReceivedOpIdCurLeaderUnlocked() const { - DCHECK(update_lock_.is_locked()); - return last_received_op_id_current_leader_; -} - OpId ReplicaState::GetLastPendingTransactionOpIdUnlocked() const { DCHECK(update_lock_.is_locked()); return pending_txns_.empty() ? MinimumOpId() : (--pending_txns_.end())->second->id(); } -void ReplicaState::NewIdUnlocked(OpId* id) { - DCHECK(update_lock_.is_locked()); - id->set_term(GetCurrentTermUnlocked()); - id->set_index(next_index_++); -} void ReplicaState::CancelPendingOperation(const OpId& id) { OpId previous = id; previous.set_index(previous.index() - 1); DCHECK(update_lock_.is_locked()); CHECK_EQ(GetCurrentTermUnlocked(), id.term()); - CHECK_EQ(next_index_, id.index() + 1); - next_index_ = id.index(); - - // We don't use UpdateLastReceivedOpIdUnlocked because we're actually - // updating it back to a lower value and we need to avoid the checks - // that method has. - // This is only ok if we do _not_ release the lock after calling - // NewIdUnlocked() (which we don't in RaftConsensus::Replicate()). 
- last_received_op_id_ = previous; scoped_refptr<ConsensusRound> round = EraseKeyReturnValuePtr(&pending_txns_, id.index()); DCHECK(round); } @@ -696,10 +647,9 @@ string ReplicaState::ToString() const { string ReplicaState::ToStringUnlocked() const { DCHECK(update_lock_.is_locked()); - return Substitute("Replica: $0, State: $1, Role: $2, Watermarks: {Received: $3, Committed: $4}", + return Substitute("Replica: $0, State: $1, Role: $2, Last Committed: $3", peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()), - last_received_op_id_.ShortDebugString(), - last_committed_op_id_.ShortDebugString()); + OpIdToString(last_committed_op_id_)); } Status ReplicaState::CheckOpInSequence(const OpId& previous, const OpId& current) { http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/consensus/raft_consensus_state.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h index e27b7b3..d5d6fe2 100644 --- a/src/kudu/consensus/raft_consensus_state.h +++ b/src/kudu/consensus/raft_consensus_state.h @@ -269,19 +269,6 @@ class ReplicaState { // Returns OK iff an op from the current term has been committed. Status CheckHasCommittedOpInCurrentTermUnlocked() const; - // Updates the last received operation, if 'op_id''s index is higher than - // the previous last received. Also updates 'last_received_from_current_leader_' - // regardless of whether it is higher or lower than the prior value. - // - // This must be called under a lock. - void UpdateLastReceivedOpIdUnlocked(const OpId& op_id); - - // Returns the last received op id. This must be called under the lock. - const OpId& GetLastReceivedOpIdUnlocked() const; - - // Returns the id of the last op received from the current leader. - const OpId& GetLastReceivedOpIdCurLeaderUnlocked() const; - // Returns the id of the latest pending transaction (i.e. the one with the // latest index). 
This must be called under the lock. OpId GetLastPendingTransactionOpIdUnlocked() const; @@ -291,8 +278,6 @@ class ReplicaState { // to complete. This does not cancel transactions being applied. Status CancelPendingTransactions(); - void NewIdUnlocked(OpId* id); - // Used when, for some reason, an operation that failed before it could be considered // a part of the state machine. Basically restores the id gen to the state it was before // generating 'id'. @@ -338,10 +323,6 @@ class ReplicaState { // Consensus metadata persistence object. std::unique_ptr<ConsensusMetadata> cmeta_; - // Used by the LEADER. This is the index of the next operation generated - // by this LEADER. - int64_t next_index_; - // Index=>Round map that manages pending ops, i.e. operations for which we've // received a replicate message from the leader but have yet to be committed. // The key is the index of the replicate operation. @@ -351,22 +332,6 @@ class ReplicaState { // this factory to start it. ReplicaTransactionFactory* txn_factory_; - // The id of the last received operation, which corresponds to the last entry - // written to the local log. Operations whose id is lower than or equal to - // this id do not need to be resent by the leader. This is not guaranteed to - // be monotonically increasing due to the possibility for log truncation and - // aborted operations when a leader change occurs. - OpId last_received_op_id_; - - // Same as last_received_op_id_ but only includes operations sent by the - // current leader. The "term" in this op may not actually match the current - // term, since leaders may replicate ops from prior terms. - // - // As an implementation detail, this field is reset to MinumumOpId() every - // time there is a term advancement on the local node, to simplify the logic - // involved in resetting this every time a new node becomes leader. 
- OpId last_received_op_id_current_leader_; - // The OpId of the Apply that was last triggered when the last message from the leader // was received. Initialized to MinimumOpId(). // http://git-wip-us.apache.org/repos/asf/kudu/blob/4bcbb4a4/src/kudu/integration-tests/raft_consensus-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc index 1bbef3c..5d94a9b 100644 --- a/src/kudu/integration-tests/raft_consensus-itest.cc +++ b/src/kudu/integration-tests/raft_consensus-itest.cc @@ -897,10 +897,9 @@ TEST_F(RaftConsensusITest, TestChurnyElections) { } // The same test, except inject artificial latency when propagating notifications -// from the queue back to consensus. This can reproduce bugs like KUDU-1078 which -// normally only appear under high load. TODO: Re-enable once we get to the -// bottom of KUDU-1078. -TEST_F(RaftConsensusITest, DISABLED_TestChurnyElections_WithNotificationLatency) { +// from the queue back to consensus. This previously reproduced bugs like KUDU-1078 which +// normally only appear under high load. +TEST_F(RaftConsensusITest, TestChurnyElections_WithNotificationLatency) { DoTestChurnyElections(WITH_NOTIFICATION_LATENCY); }
