Repository: kudu Updated Branches: refs/heads/master b1f1388e2 -> 46d9ed7aa
http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h index c971b02..451d5f1 100644 --- a/src/kudu/consensus/raft_consensus.h +++ b/src/kudu/consensus/raft_consensus.h @@ -151,19 +151,23 @@ class RaftConsensus : public Consensus, // can cause consensus to deadlock. ReplicaState* GetReplicaStateForTests(); + virtual Status GetLastOpId(OpIdType type, OpId* id) OVERRIDE; + + + //------------------------------------------------------------ + // PeerMessageQueueObserver implementation + //------------------------------------------------------------ + // Updates the committed_index and triggers the Apply()s for whatever // transactions were pending. // This is idempotent. - void UpdateMajorityReplicated(const OpId& majority_replicated, - OpId* committed_index) OVERRIDE; + void NotifyCommitIndex(int64_t commit_index) override; - virtual void NotifyTermChange(int64_t term) OVERRIDE; + void NotifyTermChange(int64_t term) override; - virtual void NotifyFailedFollower(const std::string& uuid, - int64_t term, - const std::string& reason) OVERRIDE; - - virtual Status GetLastOpId(OpIdType type, OpId* id) OVERRIDE; + void NotifyFailedFollower(const std::string& uuid, + int64_t term, + const std::string& reason) override; protected: // Trigger that a non-Transaction ConsensusRound has finished replication. @@ -347,9 +351,6 @@ class RaftConsensus : public Consensus, Status RequestVoteRespondVoteGranted(const VoteRequestPB* request, VoteResponsePB* response); - void UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated, - OpId* committed_index); - // Callback for leader election driver. ElectionCallback is run on the // reactor thread, so it simply defers its work to DoElectionCallback. void ElectionCallback(const ElectionResult& result); http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus_quorum-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc index ee1a568..2e7a746 100644 --- a/src/kudu/consensus/raft_consensus_quorum-test.cc +++ b/src/kudu/consensus/raft_consensus_quorum-test.cc @@ -301,7 +301,7 @@ class RaftConsensusQuorumTest : public KuduTest { // Waits for an operation to be (database) committed in the replica at index // 'peer_idx'. If the operation was already committed this returns immediately. - void WaitForCommitIfNotAlreadyPresent(const OpId& to_wait_for, + void WaitForCommitIfNotAlreadyPresent(int64_t to_wait_for, int peer_idx, int leader_idx) { MonoDelta timeout(MonoDelta::FromSeconds(10)); @@ -313,13 +313,13 @@ class RaftConsensusQuorumTest : public KuduTest { int backoff_exp = 0; const int kMaxBackoffExp = 8; - OpId committed_op_id; + int64_t committed_op_idx; while (true) { { ReplicaState::UniqueLock lock; CHECK_OK(state->LockForRead(&lock)); - committed_op_id = state->GetCommittedOpIdUnlocked(); - if (OpIdCompare(committed_op_id, to_wait_for) >= 0) { + committed_op_idx = state->GetCommittedIndexUnlocked(); + if (committed_op_idx >= to_wait_for) { return; } } @@ -332,7 +332,7 @@ class RaftConsensusQuorumTest : public KuduTest { LOG(ERROR) << "Max timeout reached (" << timeout.ToString() << ") while waiting for commit of " << "op " << to_wait_for << " on replica. Last committed op on replica: " - << committed_op_id << ". Dumping state and quitting."; + << committed_op_idx << ". Dumping state and quitting."; vector<string> lines; scoped_refptr<RaftConsensus> leader; CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader)); @@ -608,8 +608,8 @@ TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitMessage) { // We thus wait for the commit callback to trigger, ensuring durability // on the leader and then for the commits to be present on the replicas. ASSERT_OK(commit_sync->Wait()); - WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx); - WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx); VerifyLogs(2, 0, 1); } @@ -646,8 +646,8 @@ TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitSequence) { // See comment at the end of TestFollowersReplicateAndCommitMessage // for an explanation on this waiting sequence. ASSERT_OK(commit_sync->Wait()); - WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx); - WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx); VerifyLogs(2, 0, 1); } @@ -686,12 +686,12 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) { // this would hang here). We know he must have replicated but make sure // by calling Wait(). WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower1Idx); - WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower1Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_replicate.index(), kFollower1Idx, kLeaderIdx); } // After we let the lock go the remaining follower should get up-to-date WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower0Idx); - WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower0Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_replicate.index(), kFollower0Idx, kLeaderIdx); VerifyLogs(2, 0, 1); } @@ -738,8 +738,8 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) { // Assert that everything was ok WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx); WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx); - WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx); - WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx); VerifyLogs(2, 0, 1); } @@ -772,8 +772,8 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) { // The commit should eventually reach both followers as well. last_op_id = round->id(); - WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx); - WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx); // Append a sequence of messages, and keep injecting errors into the // replica proxies. @@ -804,8 +804,8 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) { // See comment at the end of TestFollowersReplicateAndCommitMessage // for an explanation on this waiting sequence. ASSERT_OK(commit_sync->Wait()); - WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx); - WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, kLeaderIdx); VerifyLogs(2, 0, 1); } @@ -847,8 +847,8 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) { OpId config_round; config_round.set_term(1); config_round.set_index(1); - WaitForCommitIfNotAlreadyPresent(config_round, kFollower0Idx, kLeaderIdx); - WaitForCommitIfNotAlreadyPresent(config_round, kFollower1Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(config_round.index(), kFollower0Idx, kLeaderIdx); + WaitForCommitIfNotAlreadyPresent(config_round.index(), kFollower1Idx, kLeaderIdx); int repl0_init_count = counter_hook_rpl0->num_pre_update_calls(); int repl1_init_count = counter_hook_rpl1->num_pre_update_calls(); @@ -896,7 +896,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) { // Make sure the last operation is committed everywhere ASSERT_OK(last_commit_sync->Wait()); for (int i = 0; i < current_config_size - 1; i++) { - WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 1); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), i, current_config_size - 1); } // Now shutdown the current leader. @@ -929,7 +929,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) { // Make sure the last operation is committed everywhere ASSERT_OK(last_commit_sync->Wait()); for (int i = 0; i < current_config_size - 2; i++) { - WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 2); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), i, current_config_size - 2); } } // We can only verify the logs of the peers that were not killed, due to the @@ -953,8 +953,8 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) { // Make sure the last operation is committed everywhere ASSERT_OK(last_commit_sync->Wait()); - WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2); - WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 0, 2); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 1, 2); // Now replicas should only accept operations with // 'last_id' as the preceding id. @@ -971,7 +971,7 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) { req.set_caller_uuid(leader->peer_uuid()); req.set_caller_term(last_op_id.term()); req.mutable_preceding_id()->CopyFrom(last_op_id); - req.mutable_committed_index()->CopyFrom(last_op_id); + req.set_committed_index(last_op_id.index()); ReplicateMsg* replicate = req.add_ops(); replicate->set_timestamp(clock_->Now().ToUint64()); @@ -1016,8 +1016,8 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) { // Make sure the last operation is committed everywhere ASSERT_OK(last_commit_sync->Wait()); - WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2); - WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 0, 2); + WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 1, 2); // Ensure last-logged OpId is > (0,0). ASSERT_TRUE(OpIdLessThan(MinimumOpId(), last_op_id)); @@ -1109,7 +1109,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) { ConsensusRequestPB req; req.set_caller_term(last_op_id.term()); req.set_caller_uuid("peer-0"); - req.mutable_committed_index()->CopyFrom(last_op_id); + req.set_committed_index(last_op_id.index()); ConsensusResponsePB res; Status s = peer->Update(&req, &res); ASSERT_EQ(last_op_id.term() + 3, res.responder_term()); http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus_state.cc ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc index 1ab803b..7631f26 100644 --- a/src/kudu/consensus/raft_consensus_state.cc +++ b/src/kudu/consensus/raft_consensus_state.cc @@ -52,7 +52,7 @@ ReplicaState::ReplicaState(ConsensusOptions options, string peer_uuid, txn_factory_(txn_factory), last_received_op_id_(MinimumOpId()), last_received_op_id_current_leader_(MinimumOpId()), - last_committed_index_(MinimumOpId()), + last_committed_op_id_(MinimumOpId()), state_(kInitialized) { CHECK(cmeta_) << "ConsensusMeta passed as NULL"; } @@ -118,22 +118,6 @@ Status ReplicaState::LockForCommit(UniqueLock* lock) const { return Status::OK(); } -Status ReplicaState::LockForMajorityReplicatedIndexUpdate(UniqueLock* lock) const { - TRACE_EVENT0("consensus", "ReplicaState::LockForMajorityReplicatedIndexUpdate"); - ThreadRestrictions::AssertWaitAllowed(); - UniqueLock l(update_lock_); - - if (PREDICT_FALSE(state_ != kRunning)) { - return Status::IllegalState("Replica not in running state"); - } - - if (PREDICT_FALSE(GetActiveRoleUnlocked() != RaftPeerPB::LEADER)) { - return Status::IllegalState("Replica not LEADER"); - } - lock->swap(l); - return Status::OK(); -} - Status ReplicaState::CheckActiveLeaderUnlocked() const { RaftPeerPB::Role role = GetActiveRoleUnlocked(); switch (role) { @@ -278,7 +262,7 @@ bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch *term_mismatch = false; - if (op_id.index() <= GetCommittedOpIdUnlocked().index()) { + if (op_id.index() <= GetCommittedIndexUnlocked()) { return true; } @@ -392,7 +376,7 @@ Status ReplicaState::CancelPendingTransactions() { void ReplicaState::GetUncommittedPendingOperationsUnlocked( vector<scoped_refptr<ConsensusRound> >* ops) { for (const IndexToRoundMap::value_type& entry : pending_txns_) { - if (entry.first > last_committed_index_.index()) { + if (entry.first > last_committed_op_id_.index()) { ops->push_back(entry.second); } } @@ -414,8 +398,8 @@ Status ReplicaState::AbortOpsAfterUnlocked(int64_t new_preceding_idx) { new_preceding = (*iter).second->replicate_msg()->id(); ++iter; } else { - CHECK_EQ(new_preceding_idx, last_committed_index_.index()); - new_preceding = last_committed_index_; + CHECK_EQ(new_preceding_idx, last_committed_op_id_.index()); + new_preceding = last_committed_op_id_; } // This is the same as UpdateLastReceivedOpIdUnlocked() but we do it @@ -503,93 +487,45 @@ scoped_refptr<ConsensusRound> ReplicaState::GetPendingOpByIndexOrNullUnlocked(in return FindPtrOrNull(pending_txns_, index); } -Status ReplicaState::UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated, - OpId* committed_index, - bool* committed_index_changed) { - DCHECK(update_lock_.is_locked()); - DCHECK(majority_replicated.IsInitialized()); - DCHECK(last_committed_index_.IsInitialized()); - if (PREDICT_FALSE(state_ == kShuttingDown || state_ == kShutDown)) { - return Status::ServiceUnavailable("Cannot trigger apply. Replica is shutting down."); - } - if (PREDICT_FALSE(state_ != kRunning)) { - return Status::IllegalState("Cannot trigger apply. Replica is not in kRunning state."); - } - - // If the last committed operation was in the current term (the normal case) - // then 'committed_index' is simply equal to majority replicated. - if (last_committed_index_.term() == GetCurrentTermUnlocked()) { - RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(majority_replicated, - committed_index_changed)); - committed_index->CopyFrom(last_committed_index_); - return Status::OK(); - } - - // If the last committed operation is not in the current term (such as when - // we change leaders) but 'majority_replicated' is then we can advance the - // 'committed_index' too. - if (majority_replicated.term() == GetCurrentTermUnlocked()) { - OpId previous = last_committed_index_; - RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(majority_replicated, - committed_index_changed)); - committed_index->CopyFrom(last_committed_index_); - LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advanced the committed_index across terms." - << " Last committed operation was: " << previous.ShortDebugString() - << " New committed index is: " << last_committed_index_.ShortDebugString(); - return Status::OK(); - } - - committed_index->CopyFrom(last_committed_index_); - KLOG_EVERY_N_SECS(WARNING, 1) << LogPrefixUnlocked() - << "Can't advance the committed index across term boundaries" - << " until operations from the current term are replicated." - << " Last committed operation was: " << last_committed_index_.ShortDebugString() << "," - << " New majority replicated is: " << majority_replicated.ShortDebugString() << "," - << " Current term is: " << GetCurrentTermUnlocked(); - - return Status::OK(); -} - -Status ReplicaState::AdvanceCommittedIndexUnlocked(const OpId& committed_index, - bool *committed_index_changed) { - *committed_index_changed = false; +Status ReplicaState::AdvanceCommittedIndexUnlocked(int64_t committed_index) { // If we already committed up to (or past) 'id' return. // This can happen in the case that multiple UpdateConsensus() calls end // up in the RPC queue at the same time, and then might get interleaved out // of order. - if (last_committed_index_.index() >= committed_index.index()) { + if (last_committed_op_id_.index() >= committed_index) { VLOG_WITH_PREFIX_UNLOCKED(1) - << "Already marked ops through " << last_committed_index_ << " as committed. " + << "Already marked ops through " << last_committed_op_id_ << " as committed. " << "Now trying to mark " << committed_index << " which would be a no-op."; return Status::OK(); } if (pending_txns_.empty()) { - last_committed_index_.CopyFrom(committed_index); + LOG(ERROR) << "Advancing commit index to " << committed_index + << " from " << last_committed_op_id_ + << " we have no pending txns" + << GetStackTrace(); VLOG_WITH_PREFIX_UNLOCKED(1) << "No transactions to mark as committed up to: " - << committed_index.ShortDebugString(); + << committed_index; return Status::OK(); } // Start at the operation after the last committed one. - auto iter = pending_txns_.upper_bound(last_committed_index_.index()); + auto iter = pending_txns_.upper_bound(last_committed_op_id_.index()); // Stop at the operation after the last one we must commit. - auto end_iter = pending_txns_.upper_bound(committed_index.index()); + auto end_iter = pending_txns_.upper_bound(committed_index); CHECK(iter != pending_txns_.end()); VLOG_WITH_PREFIX_UNLOCKED(1) << "Last triggered apply was: " - << last_committed_index_.ShortDebugString() + << last_committed_op_id_ << " Starting to apply from log index: " << (*iter).first; - OpId prev_id = last_committed_index_; - while (iter != end_iter) { scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy. DCHECK(round); const OpId& current_id = round->id(); - if (PREDICT_TRUE(!OpIdEquals(prev_id, MinimumOpId()))) { - CHECK_OK(CheckOpInSequence(prev_id, current_id)); + if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) { + CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id)); } pending_txns_.erase(iter++); @@ -622,23 +558,53 @@ Status ReplicaState::AdvanceCommittedIndexUnlocked(const OpId& committed_index, } } - prev_id.CopyFrom(round->id()); + last_committed_op_id_ = round->id(); round->NotifyReplicationFinished(Status::OK()); } - last_committed_index_.CopyFrom(committed_index); - *committed_index_changed = true; return Status::OK(); } -const OpId& ReplicaState::GetCommittedOpIdUnlocked() const { + +Status ReplicaState::SetInitialCommittedOpIdUnlocked(const OpId& committed_op) { + CHECK_EQ(last_committed_op_id_.index(), 0); + if (!pending_txns_.empty()) { + int64_t first_pending_index = pending_txns_.begin()->first; + if (committed_op.index() < first_pending_index) { + if (committed_op.index() != first_pending_index - 1) { + return Status::Corruption(Substitute( + "pending operations should start at first operation " + "after the committed operation (committed=$0, first pending=$1)", + OpIdToString(committed_op), first_pending_index)); + } + last_committed_op_id_ = committed_op; + } + + RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(committed_op.index())); + CHECK_EQ(last_committed_op_id_.ShortDebugString(), + committed_op.ShortDebugString()); + + } else { + last_committed_op_id_ = committed_op; + LOG_WITH_PREFIX_UNLOCKED(WARNING) << "setting committed at start to " << committed_op; + } + return Status::OK(); +} + +int64_t ReplicaState::GetCommittedIndexUnlocked() const { DCHECK(update_lock_.is_locked()); - return last_committed_index_; + return last_committed_op_id_.index(); } +int64_t ReplicaState::GetTermWithLastCommittedOpUnlocked() const { + DCHECK(update_lock_.is_locked()); + return last_committed_op_id_.term(); +} + + Status ReplicaState::CheckHasCommittedOpInCurrentTermUnlocked() const { int64_t term = GetCurrentTermUnlocked(); - const OpId& opid = GetCommittedOpIdUnlocked(); + const OpId& opid = last_committed_op_id_; if (opid.term() != term) { return Status::IllegalState("Latest committed op is not from this term", OpIdToString(opid)); } @@ -737,7 +703,7 @@ string ReplicaState::ToStringUnlocked() const { SubstituteAndAppend(&ret, "Watermarks: {Received: $0 Committed: $1}\n", last_received_op_id_.ShortDebugString(), - last_committed_index_.ShortDebugString()); + last_committed_op_id_.ShortDebugString()); return ret; } http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus_state.h ---------------------------------------------------------------------- diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h index 9ce24ab..aa30ead 100644 --- a/src/kudu/consensus/raft_consensus_state.h +++ b/src/kudu/consensus/raft_consensus_state.h @@ -138,12 +138,6 @@ class ReplicaState { // Obtains the lock for a state read, does not check state. Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT; - // Obtains the lock so that we can advance the majority replicated - // index and possibly the committed index. - // Requires that this peer is leader. - Status LockForMajorityReplicatedIndexUpdate( - UniqueLock* lock) const WARN_UNUSED_RESULT; - // Ensure the local peer is the active leader. // Returns OK if leader, IllegalState otherwise. Status CheckActiveLeaderUnlocked() const; @@ -247,27 +241,21 @@ class ReplicaState { // Add 'round' to the set of rounds waiting to be committed. Status AddPendingOperation(const scoped_refptr<ConsensusRound>& round); - // Marks ReplicaTransactions up to 'id' as majority replicated, meaning the - // transaction may Apply() (immediately if Prepare() has completed or when Prepare() - // completes, if not). - // - // If this advanced the committed index, sets *committed_index_changed to true. - Status UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated, - OpId* committed_index, - bool* committed_index_changed); - // Advances the committed index. // This is a no-op if the committed index has not changed. - // Returns in '*committed_index_changed' whether the operation actually advanced - // the index. - Status AdvanceCommittedIndexUnlocked(const OpId& committed_index, - bool* committed_index_changed); + Status AdvanceCommittedIndexUnlocked(int64_t committed_index); + + // Set the committed op during startup. This should be done after + // appending any of the pending transactions, and will take care + // of triggering any that are now considered committed. + Status SetInitialCommittedOpIdUnlocked(const OpId& committed_op); // Returns the watermark below which all operations are known to // be committed according to consensus. // // This must be called under a lock. - const OpId& GetCommittedOpIdUnlocked() const; + int64_t GetCommittedIndexUnlocked() const; + int64_t GetTermWithLastCommittedOpUnlocked() const; // Returns OK iff an op from the current term has been committed. Status CheckHasCommittedOpInCurrentTermUnlocked() const; @@ -378,9 +366,12 @@ class ReplicaState { // involved in resetting this every time a new node becomes leader. OpId last_received_op_id_current_leader_; - // The id of the Apply that was last triggered when the last message from the leader + // The OpId of the Apply that was last triggered when the last message from the leader // was received. Initialized to MinimumOpId(). - OpId last_committed_index_; + // + // TODO: are there cases where this doesn't track the actual commit index, + // if there are no-ops? + OpId last_committed_op_id_; State state_; }; http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/integration-tests/raft_consensus-itest.cc ---------------------------------------------------------------------- diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc index 387c048..63be00b 100644 --- a/src/kudu/integration-tests/raft_consensus-itest.cc +++ b/src/kudu/integration-tests/raft_consensus-itest.cc @@ -973,6 +973,7 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) { } for (CountDownLatch* latch : latches) { + NO_FATALS(cluster_->AssertNoCrashes()); latch->Wait(); StopOrKillLeaderAndElectNewOne(); } @@ -1175,7 +1176,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) { req.set_dest_uuid(replica_ts->uuid()); req.set_caller_uuid("fake_caller"); req.set_caller_term(2); - req.mutable_committed_index()->CopyFrom(MakeOpId(1, 1)); + req.set_committed_index(1); req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1)); ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); @@ -1234,7 +1235,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) { req.clear_ops(); req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 2)); AddOp(MakeOpId(2, 3), &req); - req.mutable_committed_index()->CopyFrom(MakeOpId(2, 4)); + req.set_committed_index(4); rpc.Reset(); ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); ASSERT_FALSE(resp.has_error()) << resp.DebugString(); @@ -1249,7 +1250,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) { resp.Clear(); req.clear_ops(); // Now send some more ops, and commit the earlier ones. - req.mutable_committed_index()->CopyFrom(MakeOpId(2, 4)); + req.set_committed_index(4); req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4)); AddOp(MakeOpId(2, 5), &req); AddOp(MakeOpId(2, 6), &req); @@ -1314,7 +1315,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) { // the earlier ops. { req.mutable_preceding_id()->CopyFrom(MakeOpId(leader_term, 6)); - req.mutable_committed_index()->CopyFrom(MakeOpId(leader_term, 6)); + req.set_committed_index(6); req.clear_ops(); rpc.Reset(); ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc)); @@ -2034,7 +2035,7 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) { req.set_tablet_id(tablet_id_); req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid()); req.set_caller_term(1); - req.mutable_committed_index()->CopyFrom(MakeOpId(1, 1)); + req.set_committed_index(1); req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1)); for (int i = 0; i < kNumOps; i++) { AddOp(MakeOpId(1, 2 + i), &req); @@ -2058,7 +2059,7 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) { // 1. Replicate just one new operation. // 2. Tell the follower that the previous set of operations were committed. req.mutable_preceding_id()->CopyFrom(last_opid); - req.mutable_committed_index()->CopyFrom(last_opid); + req.set_committed_index(last_opid.index()); req.mutable_ops()->Clear(); AddOp(MakeOpId(1, last_opid.index() + 1), &req); rpc.Reset(); @@ -2540,7 +2541,7 @@ TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) { req.set_tablet_id(tablet_id_); req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid()); req.set_caller_term(0); - req.mutable_committed_index()->CopyFrom(MakeOpId(0, 0)); + req.set_committed_index(0); req.mutable_preceding_id()->CopyFrom(MakeOpId(0, 0)); for (int i = 0; i < kNumOps; i++) { AddOp(MakeOpId(0, 1 + i), &req);
