Repository: kudu
Updated Branches:
  refs/heads/master b1f1388e2 -> 46d9ed7aa


http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index c971b02..451d5f1 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -151,19 +151,23 @@ class RaftConsensus : public Consensus,
   // can cause consensus to deadlock.
   ReplicaState* GetReplicaStateForTests();
 
+  virtual Status GetLastOpId(OpIdType type, OpId* id) OVERRIDE;
+
+
+  //------------------------------------------------------------
+  // PeerMessageQueueObserver implementation
+  //------------------------------------------------------------
+
   // Updates the committed_index and triggers the Apply()s for whatever
   // transactions were pending.
   // This is idempotent.
-  void UpdateMajorityReplicated(const OpId& majority_replicated,
-                                OpId* committed_index) OVERRIDE;
+  void NotifyCommitIndex(int64_t commit_index) override;
 
-  virtual void NotifyTermChange(int64_t term) OVERRIDE;
+  void NotifyTermChange(int64_t term) override;
 
-  virtual void NotifyFailedFollower(const std::string& uuid,
-                                    int64_t term,
-                                    const std::string& reason) OVERRIDE;
-
-  virtual Status GetLastOpId(OpIdType type, OpId* id) OVERRIDE;
+  void NotifyFailedFollower(const std::string& uuid,
+                            int64_t term,
+                            const std::string& reason) override;
 
  protected:
   // Trigger that a non-Transaction ConsensusRound has finished replication.
@@ -347,9 +351,6 @@ class RaftConsensus : public Consensus,
   Status RequestVoteRespondVoteGranted(const VoteRequestPB* request,
                                        VoteResponsePB* response);
 
-  void UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated,
-                                        OpId* committed_index);
-
   // Callback for leader election driver. ElectionCallback is run on the
   // reactor thread, so it simply defers its work to DoElectionCallback.
   void ElectionCallback(const ElectionResult& result);

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index ee1a568..2e7a746 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -301,7 +301,7 @@ class RaftConsensusQuorumTest : public KuduTest {
 
   // Waits for an operation to be (database) committed in the replica at index
   // 'peer_idx'. If the operation was already committed this returns 
immediately.
-  void WaitForCommitIfNotAlreadyPresent(const OpId& to_wait_for,
+  void WaitForCommitIfNotAlreadyPresent(int64_t to_wait_for,
                                         int peer_idx,
                                         int leader_idx) {
     MonoDelta timeout(MonoDelta::FromSeconds(10));
@@ -313,13 +313,13 @@ class RaftConsensusQuorumTest : public KuduTest {
 
     int backoff_exp = 0;
     const int kMaxBackoffExp = 8;
-    OpId committed_op_id;
+    int64_t committed_op_idx;
     while (true) {
       {
         ReplicaState::UniqueLock lock;
         CHECK_OK(state->LockForRead(&lock));
-        committed_op_id = state->GetCommittedOpIdUnlocked();
-        if (OpIdCompare(committed_op_id, to_wait_for) >= 0) {
+        committed_op_idx = state->GetCommittedIndexUnlocked();
+        if (committed_op_idx >= to_wait_for) {
           return;
         }
       }
@@ -332,7 +332,7 @@ class RaftConsensusQuorumTest : public KuduTest {
 
     LOG(ERROR) << "Max timeout reached (" << timeout.ToString() << ") while 
waiting for commit of "
                << "op " << to_wait_for << " on replica. Last committed op on 
replica: "
-               << committed_op_id << ". Dumping state and quitting.";
+               << committed_op_idx << ". Dumping state and quitting.";
     vector<string> lines;
     scoped_refptr<RaftConsensus> leader;
     CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
@@ -608,8 +608,8 @@ TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitMessage) {
   // We thus wait for the commit callback to trigger, ensuring durability
   // on the leader and then for the commits to be present on the replicas.
   ASSERT_OK(commit_sync->Wait());
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, 
kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, 
kLeaderIdx);
   VerifyLogs(2, 0, 1);
 }
 
@@ -646,8 +646,8 @@ TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitSequence) {
   // See comment at the end of TestFollowersReplicateAndCommitMessage
   // for an explanation on this waiting sequence.
   ASSERT_OK(commit_sync->Wait());
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, 
kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, 
kLeaderIdx);
   VerifyLogs(2, 0, 1);
 }
 
@@ -686,12 +686,12 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
     // this would hang here). We know he must have replicated but make sure
     // by calling Wait().
     WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower1Idx);
-    WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower1Idx, 
kLeaderIdx);
+    WaitForCommitIfNotAlreadyPresent(last_replicate.index(), kFollower1Idx, 
kLeaderIdx);
   }
 
   // After we let the lock go the remaining follower should get up-to-date
   WaitForReplicateIfNotAlreadyPresent(last_replicate, kFollower0Idx);
-  WaitForCommitIfNotAlreadyPresent(last_replicate, kFollower0Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_replicate.index(), kFollower0Idx, 
kLeaderIdx);
   VerifyLogs(2, 0, 1);
 }
 
@@ -738,8 +738,8 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
   // Assert that everything was ok
   WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower0Idx);
   WaitForReplicateIfNotAlreadyPresent(last_op_id, kFollower1Idx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, 
kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, 
kLeaderIdx);
   VerifyLogs(2, 0, 1);
 }
 
@@ -772,8 +772,8 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) {
 
   // The commit should eventually reach both followers as well.
   last_op_id = round->id();
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, 
kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, 
kLeaderIdx);
 
   // Append a sequence of messages, and keep injecting errors into the
   // replica proxies.
@@ -804,8 +804,8 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) {
   // See comment at the end of TestFollowersReplicateAndCommitMessage
   // for an explanation on this waiting sequence.
   ASSERT_OK(commit_sync->Wait());
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower0Idx, 
kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), kFollower1Idx, 
kLeaderIdx);
   VerifyLogs(2, 0, 1);
 }
 
@@ -847,8 +847,8 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderHeartbeats) {
   OpId config_round;
   config_round.set_term(1);
   config_round.set_index(1);
-  WaitForCommitIfNotAlreadyPresent(config_round, kFollower0Idx, kLeaderIdx);
-  WaitForCommitIfNotAlreadyPresent(config_round, kFollower1Idx, kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(config_round.index(), kFollower0Idx, 
kLeaderIdx);
+  WaitForCommitIfNotAlreadyPresent(config_round.index(), kFollower1Idx, 
kLeaderIdx);
 
   int repl0_init_count = counter_hook_rpl0->num_pre_update_calls();
   int repl1_init_count = counter_hook_rpl1->num_pre_update_calls();
@@ -896,7 +896,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
     // Make sure the last operation is committed everywhere
     ASSERT_OK(last_commit_sync->Wait());
     for (int i = 0; i < current_config_size - 1; i++) {
-      WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 1);
+      WaitForCommitIfNotAlreadyPresent(last_op_id.index(), i, 
current_config_size - 1);
     }
 
     // Now shutdown the current leader.
@@ -929,7 +929,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
     // Make sure the last operation is committed everywhere
     ASSERT_OK(last_commit_sync->Wait());
     for (int i = 0; i < current_config_size - 2; i++) {
-      WaitForCommitIfNotAlreadyPresent(last_op_id, i, current_config_size - 2);
+      WaitForCommitIfNotAlreadyPresent(last_op_id.index(), i, 
current_config_size - 2);
     }
   }
   // We can only verify the logs of the peers that were not killed, due to the
@@ -953,8 +953,8 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
 
   // Make sure the last operation is committed everywhere
   ASSERT_OK(last_commit_sync->Wait());
-  WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 0, 2);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 1, 2);
 
   // Now replicas should only accept operations with
   // 'last_id' as the preceding id.
@@ -971,7 +971,7 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
   req.set_caller_uuid(leader->peer_uuid());
   req.set_caller_term(last_op_id.term());
   req.mutable_preceding_id()->CopyFrom(last_op_id);
-  req.mutable_committed_index()->CopyFrom(last_op_id);
+  req.set_committed_index(last_op_id.index());
 
   ReplicateMsg* replicate = req.add_ops();
   replicate->set_timestamp(clock_->Now().ToUint64());
@@ -1016,8 +1016,8 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
 
   // Make sure the last operation is committed everywhere
   ASSERT_OK(last_commit_sync->Wait());
-  WaitForCommitIfNotAlreadyPresent(last_op_id, 0, 2);
-  WaitForCommitIfNotAlreadyPresent(last_op_id, 1, 2);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 0, 2);
+  WaitForCommitIfNotAlreadyPresent(last_op_id.index(), 1, 2);
 
   // Ensure last-logged OpId is > (0,0).
   ASSERT_TRUE(OpIdLessThan(MinimumOpId(), last_op_id));
@@ -1109,7 +1109,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   ConsensusRequestPB req;
   req.set_caller_term(last_op_id.term());
   req.set_caller_uuid("peer-0");
-  req.mutable_committed_index()->CopyFrom(last_op_id);
+  req.set_committed_index(last_op_id.index());
   ConsensusResponsePB res;
   Status s = peer->Update(&req, &res);
   ASSERT_EQ(last_op_id.term() + 3, res.responder_term());

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index 1ab803b..7631f26 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -52,7 +52,7 @@ ReplicaState::ReplicaState(ConsensusOptions options, string peer_uuid,
       txn_factory_(txn_factory),
       last_received_op_id_(MinimumOpId()),
       last_received_op_id_current_leader_(MinimumOpId()),
-      last_committed_index_(MinimumOpId()),
+      last_committed_op_id_(MinimumOpId()),
       state_(kInitialized) {
   CHECK(cmeta_) << "ConsensusMeta passed as NULL";
 }
@@ -118,22 +118,6 @@ Status ReplicaState::LockForCommit(UniqueLock* lock) const {
   return Status::OK();
 }
 
-Status ReplicaState::LockForMajorityReplicatedIndexUpdate(UniqueLock* lock) 
const {
-  TRACE_EVENT0("consensus", 
"ReplicaState::LockForMajorityReplicatedIndexUpdate");
-  ThreadRestrictions::AssertWaitAllowed();
-  UniqueLock l(update_lock_);
-
-  if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Replica not in running state");
-  }
-
-  if (PREDICT_FALSE(GetActiveRoleUnlocked() != RaftPeerPB::LEADER)) {
-    return Status::IllegalState("Replica not LEADER");
-  }
-  lock->swap(l);
-  return Status::OK();
-}
-
 Status ReplicaState::CheckActiveLeaderUnlocked() const {
   RaftPeerPB::Role role = GetActiveRoleUnlocked();
   switch (role) {
@@ -278,7 +262,7 @@ bool ReplicaState::IsOpCommittedOrPending(const OpId& op_id, bool* term_mismatch
 
   *term_mismatch = false;
 
-  if (op_id.index() <= GetCommittedOpIdUnlocked().index()) {
+  if (op_id.index() <= GetCommittedIndexUnlocked()) {
     return true;
   }
 
@@ -392,7 +376,7 @@ Status ReplicaState::CancelPendingTransactions() {
 void ReplicaState::GetUncommittedPendingOperationsUnlocked(
     vector<scoped_refptr<ConsensusRound> >* ops) {
   for (const IndexToRoundMap::value_type& entry : pending_txns_) {
-    if (entry.first > last_committed_index_.index()) {
+    if (entry.first > last_committed_op_id_.index()) {
       ops->push_back(entry.second);
     }
   }
@@ -414,8 +398,8 @@ Status ReplicaState::AbortOpsAfterUnlocked(int64_t 
new_preceding_idx) {
     new_preceding = (*iter).second->replicate_msg()->id();
     ++iter;
   } else {
-    CHECK_EQ(new_preceding_idx, last_committed_index_.index());
-    new_preceding = last_committed_index_;
+    CHECK_EQ(new_preceding_idx, last_committed_op_id_.index());
+    new_preceding = last_committed_op_id_;
   }
 
   // This is the same as UpdateLastReceivedOpIdUnlocked() but we do it
@@ -503,93 +487,45 @@ scoped_refptr<ConsensusRound> ReplicaState::GetPendingOpByIndexOrNullUnlocked(in
   return FindPtrOrNull(pending_txns_, index);
 }
 
-Status ReplicaState::UpdateMajorityReplicatedUnlocked(const OpId& 
majority_replicated,
-                                                      OpId* committed_index,
-                                                      bool* 
committed_index_changed) {
-  DCHECK(update_lock_.is_locked());
-  DCHECK(majority_replicated.IsInitialized());
-  DCHECK(last_committed_index_.IsInitialized());
-  if (PREDICT_FALSE(state_ == kShuttingDown || state_ == kShutDown)) {
-    return Status::ServiceUnavailable("Cannot trigger apply. Replica is 
shutting down.");
-  }
-  if (PREDICT_FALSE(state_ != kRunning)) {
-    return Status::IllegalState("Cannot trigger apply. Replica is not in 
kRunning state.");
-  }
-
-  // If the last committed operation was in the current term (the normal case)
-  // then 'committed_index' is simply equal to majority replicated.
-  if (last_committed_index_.term() == GetCurrentTermUnlocked()) {
-    RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(majority_replicated,
-                                                committed_index_changed));
-    committed_index->CopyFrom(last_committed_index_);
-    return Status::OK();
-  }
-
-  // If the last committed operation is not in the current term (such as when
-  // we change leaders) but 'majority_replicated' is then we can advance the
-  // 'committed_index' too.
-  if (majority_replicated.term() == GetCurrentTermUnlocked()) {
-    OpId previous = last_committed_index_;
-    RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(majority_replicated,
-                                                committed_index_changed));
-    committed_index->CopyFrom(last_committed_index_);
-    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advanced the committed_index across 
terms."
-        << " Last committed operation was: " << previous.ShortDebugString()
-        << " New committed index is: " << 
last_committed_index_.ShortDebugString();
-    return Status::OK();
-  }
-
-  committed_index->CopyFrom(last_committed_index_);
-  KLOG_EVERY_N_SECS(WARNING, 1) << LogPrefixUnlocked()
-          << "Can't advance the committed index across term boundaries"
-          << " until operations from the current term are replicated."
-          << " Last committed operation was: " << 
last_committed_index_.ShortDebugString() << ","
-          << " New majority replicated is: " << 
majority_replicated.ShortDebugString() << ","
-          << " Current term is: " << GetCurrentTermUnlocked();
-
-  return Status::OK();
-}
-
-Status ReplicaState::AdvanceCommittedIndexUnlocked(const OpId& committed_index,
-                                                   bool 
*committed_index_changed) {
-  *committed_index_changed = false;
+Status ReplicaState::AdvanceCommittedIndexUnlocked(int64_t committed_index) {
   // If we already committed up to (or past) 'id' return.
   // This can happen in the case that multiple UpdateConsensus() calls end
   // up in the RPC queue at the same time, and then might get interleaved out
   // of order.
-  if (last_committed_index_.index() >= committed_index.index()) {
+  if (last_committed_op_id_.index() >= committed_index) {
     VLOG_WITH_PREFIX_UNLOCKED(1)
-      << "Already marked ops through " << last_committed_index_ << " as 
committed. "
+      << "Already marked ops through " << last_committed_op_id_ << " as 
committed. "
       << "Now trying to mark " << committed_index << " which would be a 
no-op.";
     return Status::OK();
   }
 
   if (pending_txns_.empty()) {
-    last_committed_index_.CopyFrom(committed_index);
+    LOG(ERROR) << "Advancing commit index to " << committed_index
+               << " from " << last_committed_op_id_
+               << " we have no pending txns"
+               << GetStackTrace();
     VLOG_WITH_PREFIX_UNLOCKED(1) << "No transactions to mark as committed up 
to: "
-        << committed_index.ShortDebugString();
+                                 << committed_index;
     return Status::OK();
   }
 
   // Start at the operation after the last committed one.
-  auto iter = pending_txns_.upper_bound(last_committed_index_.index());
+  auto iter = pending_txns_.upper_bound(last_committed_op_id_.index());
   // Stop at the operation after the last one we must commit.
-  auto end_iter = pending_txns_.upper_bound(committed_index.index());
+  auto end_iter = pending_txns_.upper_bound(committed_index);
   CHECK(iter != pending_txns_.end());
 
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Last triggered apply was: "
-      <<  last_committed_index_.ShortDebugString()
+      <<  last_committed_op_id_
       << " Starting to apply from log index: " << (*iter).first;
 
-  OpId prev_id = last_committed_index_;
-
   while (iter != end_iter) {
     scoped_refptr<ConsensusRound> round = (*iter).second; // Make a copy.
     DCHECK(round);
     const OpId& current_id = round->id();
 
-    if (PREDICT_TRUE(!OpIdEquals(prev_id, MinimumOpId()))) {
-      CHECK_OK(CheckOpInSequence(prev_id, current_id));
+    if (PREDICT_TRUE(!OpIdEquals(last_committed_op_id_, MinimumOpId()))) {
+      CHECK_OK(CheckOpInSequence(last_committed_op_id_, current_id));
     }
 
     pending_txns_.erase(iter++);
@@ -622,23 +558,53 @@ Status ReplicaState::AdvanceCommittedIndexUnlocked(const OpId& committed_index,
       }
     }
 
-    prev_id.CopyFrom(round->id());
+    last_committed_op_id_ = round->id();
     round->NotifyReplicationFinished(Status::OK());
   }
 
-  last_committed_index_.CopyFrom(committed_index);
-  *committed_index_changed = true;
   return Status::OK();
 }
 
-const OpId& ReplicaState::GetCommittedOpIdUnlocked() const {
+
+Status ReplicaState::SetInitialCommittedOpIdUnlocked(const OpId& committed_op) 
{
+  CHECK_EQ(last_committed_op_id_.index(), 0);
+  if (!pending_txns_.empty()) {
+    int64_t first_pending_index = pending_txns_.begin()->first;
+    if (committed_op.index() < first_pending_index) {
+      if (committed_op.index() != first_pending_index - 1) {
+        return Status::Corruption(Substitute(
+            "pending operations should start at first operation "
+            "after the committed operation (committed=$0, first pending=$1)",
+            OpIdToString(committed_op), first_pending_index));
+      }
+      last_committed_op_id_ = committed_op;
+    }
+
+    RETURN_NOT_OK(AdvanceCommittedIndexUnlocked(committed_op.index()));
+    CHECK_EQ(last_committed_op_id_.ShortDebugString(),
+             committed_op.ShortDebugString());
+
+  } else {
+    last_committed_op_id_ = committed_op;
+    LOG_WITH_PREFIX_UNLOCKED(WARNING) << "setting committed at start to " << 
committed_op;
+  }
+  return Status::OK();
+}
+
+int64_t ReplicaState::GetCommittedIndexUnlocked() const {
   DCHECK(update_lock_.is_locked());
-  return last_committed_index_;
+  return last_committed_op_id_.index();
 }
 
+int64_t ReplicaState::GetTermWithLastCommittedOpUnlocked() const {
+  DCHECK(update_lock_.is_locked());
+  return last_committed_op_id_.term();
+}
+
+
 Status ReplicaState::CheckHasCommittedOpInCurrentTermUnlocked() const {
   int64_t term = GetCurrentTermUnlocked();
-  const OpId& opid = GetCommittedOpIdUnlocked();
+  const OpId& opid = last_committed_op_id_;
   if (opid.term() != term) {
     return Status::IllegalState("Latest committed op is not from this term", 
OpIdToString(opid));
   }
@@ -737,7 +703,7 @@ string ReplicaState::ToStringUnlocked() const {
 
   SubstituteAndAppend(&ret, "Watermarks: {Received: $0 Committed: $1}\n",
                       last_received_op_id_.ShortDebugString(),
-                      last_committed_index_.ShortDebugString());
+                      last_committed_op_id_.ShortDebugString());
   return ret;
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index 9ce24ab..aa30ead 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -138,12 +138,6 @@ class ReplicaState {
   // Obtains the lock for a state read, does not check state.
   Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT;
 
-  // Obtains the lock so that we can advance the majority replicated
-  // index and possibly the committed index.
-  // Requires that this peer is leader.
-  Status LockForMajorityReplicatedIndexUpdate(
-      UniqueLock* lock) const WARN_UNUSED_RESULT;
-
   // Ensure the local peer is the active leader.
   // Returns OK if leader, IllegalState otherwise.
   Status CheckActiveLeaderUnlocked() const;
@@ -247,27 +241,21 @@ class ReplicaState {
   // Add 'round' to the set of rounds waiting to be committed.
   Status AddPendingOperation(const scoped_refptr<ConsensusRound>& round);
 
-  // Marks ReplicaTransactions up to 'id' as majority replicated, meaning the
-  // transaction may Apply() (immediately if Prepare() has completed or when 
Prepare()
-  // completes, if not).
-  //
-  // If this advanced the committed index, sets *committed_index_changed to 
true.
-  Status UpdateMajorityReplicatedUnlocked(const OpId& majority_replicated,
-                                          OpId* committed_index,
-                                          bool* committed_index_changed);
-
   // Advances the committed index.
   // This is a no-op if the committed index has not changed.
-  // Returns in '*committed_index_changed' whether the operation actually 
advanced
-  // the index.
-  Status AdvanceCommittedIndexUnlocked(const OpId& committed_index,
-                                       bool* committed_index_changed);
+  Status AdvanceCommittedIndexUnlocked(int64_t committed_index);
+
+  // Set the committed op during startup. This should be done after
+  // appending any of the pending transactions, and will take care
+  // of triggering any that are now considered committed.
+  Status SetInitialCommittedOpIdUnlocked(const OpId& committed_op);
 
   // Returns the watermark below which all operations are known to
   // be committed according to consensus.
   //
   // This must be called under a lock.
-  const OpId& GetCommittedOpIdUnlocked() const;
+  int64_t GetCommittedIndexUnlocked() const;
+  int64_t GetTermWithLastCommittedOpUnlocked() const;
 
   // Returns OK iff an op from the current term has been committed.
   Status CheckHasCommittedOpInCurrentTermUnlocked() const;
@@ -378,9 +366,12 @@ class ReplicaState {
   // involved in resetting this every time a new node becomes leader.
   OpId last_received_op_id_current_leader_;
 
-  // The id of the Apply that was last triggered when the last message from 
the leader
+  // The OpId of the Apply that was last triggered when the last message from 
the leader
   // was received. Initialized to MinimumOpId().
-  OpId last_committed_index_;
+  //
+  // TODO: are there cases where this doesn't track the actual commit index,
+  // if there are no-ops?
+  OpId last_committed_op_id_;
 
   State state_;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/ce0fcd4d/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 387c048..63be00b 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -973,6 +973,7 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
   }
 
   for (CountDownLatch* latch : latches) {
+    NO_FATALS(cluster_->AssertNoCrashes());
     latch->Wait();
     StopOrKillLeaderAndElectNewOne();
   }
@@ -1175,7 +1176,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   req.set_dest_uuid(replica_ts->uuid());
   req.set_caller_uuid("fake_caller");
   req.set_caller_term(2);
-  req.mutable_committed_index()->CopyFrom(MakeOpId(1, 1));
+  req.set_committed_index(1);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
 
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -1234,7 +1235,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   req.clear_ops();
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 2));
   AddOp(MakeOpId(2, 3), &req);
-  req.mutable_committed_index()->CopyFrom(MakeOpId(2, 4));
+  req.set_committed_index(4);
   rpc.Reset();
   ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
   ASSERT_FALSE(resp.has_error()) << resp.DebugString();
@@ -1249,7 +1250,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   resp.Clear();
   req.clear_ops();
   // Now send some more ops, and commit the earlier ones.
-  req.mutable_committed_index()->CopyFrom(MakeOpId(2, 4));
+  req.set_committed_index(4);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 4));
   AddOp(MakeOpId(2, 5), &req);
   AddOp(MakeOpId(2, 6), &req);
@@ -1314,7 +1315,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   // the earlier ops.
   {
     req.mutable_preceding_id()->CopyFrom(MakeOpId(leader_term, 6));
-    req.mutable_committed_index()->CopyFrom(MakeOpId(leader_term, 6));
+    req.set_committed_index(6);
     req.clear_ops();
     rpc.Reset();
     ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
@@ -2034,7 +2035,7 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
   req.set_tablet_id(tablet_id_);
   req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid());
   req.set_caller_term(1);
-  req.mutable_committed_index()->CopyFrom(MakeOpId(1, 1));
+  req.set_committed_index(1);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
   for (int i = 0; i < kNumOps; i++) {
     AddOp(MakeOpId(1, 2 + i), &req);
@@ -2058,7 +2059,7 @@ TEST_F(RaftConsensusITest, TestEarlyCommitDespiteMemoryPressure) {
   // 1. Replicate just one new operation.
   // 2. Tell the follower that the previous set of operations were committed.
   req.mutable_preceding_id()->CopyFrom(last_opid);
-  req.mutable_committed_index()->CopyFrom(last_opid);
+  req.set_committed_index(last_opid.index());
   req.mutable_ops()->Clear();
   AddOp(MakeOpId(1, last_opid.index() + 1), &req);
   rpc.Reset();
@@ -2540,7 +2541,7 @@ TEST_F(RaftConsensusITest, TestUpdateConsensusErrorNonePrepared) {
   req.set_tablet_id(tablet_id_);
   req.set_caller_uuid(tservers[2]->instance_id.permanent_uuid());
   req.set_caller_term(0);
-  req.mutable_committed_index()->CopyFrom(MakeOpId(0, 0));
+  req.set_committed_index(0);
   req.mutable_preceding_id()->CopyFrom(MakeOpId(0, 0));
   for (int i = 0; i < kNumOps; i++) {
     AddOp(MakeOpId(0, 1 + i), &req);

Reply via email to