KUDU-1135 (part 1): avoid flushing cmeta to disk twice when voting

This adds a flag to the term advancement calls which lets the caller skip
flushing the consensus metadata to disk. We use this flag during voting when
we know we are about to flush our vote to disk immediately following the term
advancement.

This is part 1: another patch will do the same optimization for the case
of starting an election.

Change-Id: Iecc55bc9e96dcdc918ede1190b7cbac719f95506
Reviewed-on: http://gerrit.cloudera.org:8080/4333
Reviewed-by: Mike Percy <mpe...@apache.org>
Reviewed-by: David Ribeiro Alves <dral...@apache.org>
Tested-by: David Ribeiro Alves <dral...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d87e3ff6
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d87e3ff6
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d87e3ff6

Branch: refs/heads/master
Commit: d87e3ff6036ba070710ecf383448bdfb8380378f
Parents: 9911c48
Author: Todd Lipcon <t...@apache.org>
Authored: Wed Sep 7 20:37:51 2016 -0700
Committer: Todd Lipcon <t...@apache.org>
Committed: Mon Sep 19 21:24:57 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_meta.cc            |  5 +++-
 src/kudu/consensus/consensus_meta.h             |  7 ++++++
 src/kudu/consensus/raft_consensus.cc            | 26 ++++++++++++++------
 src/kudu/consensus/raft_consensus.h             |  7 ++++--
 .../consensus/raft_consensus_quorum-test.cc     | 26 ++++++++++++++++++++
 src/kudu/consensus/raft_consensus_state.cc      |  7 ++++--
 src/kudu/consensus/raft_consensus_state.h       | 16 +++++++++++-
 7 files changed, 80 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d87e3ff6/src/kudu/consensus/consensus_meta.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta.cc 
b/src/kudu/consensus/consensus_meta.cc
index 9426ceb..cdb4186 100644
--- a/src/kudu/consensus/consensus_meta.cc
+++ b/src/kudu/consensus/consensus_meta.cc
@@ -186,6 +186,8 @@ void 
ConsensusMetadata::MergeCommittedConsensusStatePB(const ConsensusStatePB& c
 
 Status ConsensusMetadata::Flush() {
   SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500, LogPrefix(), "flushing 
consensus metadata");
+
+  flush_count_for_tests_++;
   // Sanity test to ensure we never write out a bad configuration.
   RETURN_NOT_OK_PREPEND(VerifyRaftConfig(pb_.committed_config(), 
COMMITTED_QUORUM),
                         "Invalid config in ConsensusMetadata, cannot flush to 
disk");
@@ -222,7 +224,8 @@ ConsensusMetadata::ConsensusMetadata(FsManager* fs_manager,
     : fs_manager_(CHECK_NOTNULL(fs_manager)),
       tablet_id_(std::move(tablet_id)),
       peer_uuid_(std::move(peer_uuid)),
-      has_pending_config_(false) {}
+      has_pending_config_(false),
+      flush_count_for_tests_(0) {}
 
 std::string ConsensusMetadata::LogPrefix() const {
   return Substitute("T $0 P $1: ", tablet_id_, peer_uuid_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/d87e3ff6/src/kudu/consensus/consensus_meta.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta.h 
b/src/kudu/consensus/consensus_meta.h
index a8d5f93..9b014fa 100644
--- a/src/kudu/consensus/consensus_meta.h
+++ b/src/kudu/consensus/consensus_meta.h
@@ -138,6 +138,10 @@ class ConsensusMetadata {
   // Persist current state of the protobuf to disk.
   Status Flush();
 
+  int flush_count_for_tests() const {
+    return flush_count_for_tests_;
+  }
+
  private:
   ConsensusMetadata(FsManager* fs_manager, std::string tablet_id,
                     std::string peer_uuid);
@@ -162,6 +166,9 @@ class ConsensusMetadata {
   // Cached role of the peer_uuid_ within the active configuration.
   RaftPeerPB::Role active_role_;
 
+  // The number of times the metadata has been flushed to disk.
+  int flush_count_for_tests_;
+
   // Durable fields.
   ConsensusMetadataPB pb_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d87e3ff6/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc 
b/src/kudu/consensus/raft_consensus.cc
index 7a302b5..77125e1 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -378,6 +378,8 @@ Status RaftConsensus::StartElection(ElectionMode mode) {
     }
 
     // Increment the term.
+    // TODO: this causes an extra flush of the consensus metadata which
+    // will be flushed again for our vote below. Consolidate these.
     RETURN_NOT_OK(IncrementTermUnlocked());
 
     // Snooze to avoid the election timer firing again as much as possible.
@@ -1365,17 +1367,24 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* 
request, VoteResponsePB*
     return RequestVoteRespondAlreadyVotedForOther(request, response);
   }
 
-  // The term advanced.
+  // Candidate must have last-logged OpId at least as large as our own to get
+  // our vote.
+  OpId local_last_logged_opid = GetLatestOpIdFromLog();
+  bool vote_yes = !OpIdLessThan(request->candidate_status().last_received(),
+                                local_last_logged_opid);
+
+  // Record the term advancement if necessary.
   if (request->candidate_term() > state_->GetCurrentTermUnlocked()) {
-    RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term()),
+    // If we are going to vote for this peer, then we will flush the consensus 
metadata
+    // to disk below when we record the vote, and we can skip flushing the 
term advancement
+    // to disk here.
+    auto flush = vote_yes ? ReplicaState::SKIP_FLUSH_TO_DISK : 
ReplicaState::FLUSH_TO_DISK;
+    RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term(), 
flush),
         Substitute("Could not step down in RequestVote. Current term: $0, 
candidate term: $1",
                    state_->GetCurrentTermUnlocked(), 
request->candidate_term()));
   }
 
-  // Candidate must have last-logged OpId at least as large as our own to get
-  // our vote.
-  OpId local_last_logged_opid = GetLatestOpIdFromLog();
-  if (OpIdLessThan(request->candidate_status().last_received(), 
local_last_logged_opid)) {
+  if (!vote_yes) {
     return RequestVoteRespondLastOpIdTooOld(local_last_logged_opid, request, 
response);
   }
 
@@ -2017,7 +2026,8 @@ Status RaftConsensus::IncrementTermUnlocked() {
   return HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1);
 }
 
-Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term) {
+Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
+                                                ReplicaState::FlushToDisk 
flush) {
   if (new_term <= state_->GetCurrentTermUnlocked()) {
     return Status::IllegalState(Substitute("Can't advance term to: $0 current 
term: $1 is higher.",
                                            new_term, 
state_->GetCurrentTermUnlocked()));
@@ -2029,7 +2039,7 @@ Status 
RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term) {
   }
 
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term;
-  RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term));
+  RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term, flush));
   term_metric_->set_value(new_term);
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/d87e3ff6/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h 
b/src/kudu/consensus/raft_consensus.h
index d0f9fe0..90b10b6 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -29,6 +29,7 @@
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_queue.h"
+#include "kudu/consensus/raft_consensus_state.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/failure_detector.h"
 
@@ -55,7 +56,6 @@ class ConsensusMetadata;
 class Peer;
 class PeerProxyFactory;
 class PeerManager;
-class ReplicaState;
 struct ElectionResult;
 
 class RaftConsensus : public Consensus,
@@ -405,7 +405,10 @@ class RaftConsensus : public Consensus,
   Status IncrementTermUnlocked();
 
   // Handle when the term has advanced beyond the current term.
-  Status HandleTermAdvanceUnlocked(ConsensusTerm new_term);
+  //
+  // 'flush' may be used to control whether the term change is flushed to disk.
+  Status HandleTermAdvanceUnlocked(ConsensusTerm new_term,
+                                   ReplicaState::FlushToDisk flush = 
ReplicaState::FLUSH_TO_DISK);
 
   // Asynchronously (on thread_pool_) notify the tablet peer that the 
consensus configuration
   // has changed, thus reporting it back to the master.

http://git-wip-us.apache.org/repos/asf/kudu/blob/d87e3ff6/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc 
b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 24428fc..648cb1d 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -1026,6 +1026,10 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   const int kPeerIndex = 1;
   scoped_refptr<RaftConsensus> peer;
   CHECK_OK(peers_->GetPeerByIdx(kPeerIndex, &peer));
+  auto flush_count = [&]() {
+    return peer->GetReplicaStateForTests()
+      ->consensus_metadata_for_tests()->flush_count_for_tests();
+  };
 
   VoteRequestPB request;
   request.set_tablet_id(kTestTablet);
@@ -1033,29 +1037,39 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
 
   // Test that the replica won't vote since it has recently heard from
   // a valid leader.
+  int flush_count_before = flush_count();
   VoteResponsePB response;
   request.set_candidate_uuid("peer-0");
   request.set_candidate_term(last_op_id.term() + 1);
   ASSERT_OK(peer->RequestVote(&request, &response));
   ASSERT_FALSE(response.vote_granted());
   ASSERT_EQ(ConsensusErrorPB::LEADER_IS_ALIVE, 
response.consensus_error().code());
+  ASSERT_EQ(0, flush_count() - flush_count_before)
+      << "A rejected vote should not flush metadata";
 
   // Test that replicas only vote yes for a single peer per term.
 
   // Indicate that replicas should vote even if they think another leader is 
alive.
   // This will allow the rest of the requests in the test to go through.
+  flush_count_before = flush_count();
   request.set_ignore_live_leader(true);
   ASSERT_OK(peer->RequestVote(&request, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
   ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, 
last_op_id.term() + 1, "peer-0"));
+  ASSERT_EQ(1, flush_count() - flush_count_before)
+      << "A granted vote should flush only once";
 
   // Ensure we get same response for same term and same UUID.
   response.Clear();
+  flush_count_before = flush_count();
   ASSERT_OK(peer->RequestVote(&request, &response));
   ASSERT_TRUE(response.vote_granted());
+  ASSERT_EQ(0, flush_count() - flush_count_before)
+      << "Confirming a previous vote should not flush";
 
   // Ensure we get a "no" for a different candidate UUID for that term.
+  flush_count_before = flush_count();
   response.Clear();
   request.set_candidate_uuid("peer-2");
   ASSERT_OK(peer->RequestVote(&request, &response));
@@ -1064,6 +1078,8 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   ASSERT_EQ(ConsensusErrorPB::ALREADY_VOTED, 
response.consensus_error().code());
   ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
   ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, 
last_op_id.term() + 1, "peer-0"));
+  ASSERT_EQ(0, flush_count() - flush_count_before)
+      << "Rejected votes for same term should not flush";
 
   //
   // Test that replicas refuse votes for an old term.
@@ -1071,6 +1087,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
 
   // Increase the term of our candidate, which will cause the voter replica to
   // increase its own term to match.
+  flush_count_before = flush_count();
   request.set_candidate_uuid("peer-0");
   request.set_candidate_term(last_op_id.term() + 2);
   response.Clear();
@@ -1078,10 +1095,13 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   ASSERT_TRUE(response.vote_granted());
   ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
   ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, 
last_op_id.term() + 2, "peer-0"));
+  ASSERT_EQ(1, flush_count() - flush_count_before)
+      << "Accepted votes with increased term should flush once";
 
   // Now try the old term.
   // Note: Use the peer who "won" the election on the previous term (peer-0),
   // although in practice the impl does not store historical vote data.
+  flush_count_before = flush_count();
   request.set_candidate_term(last_op_id.term() + 1);
   response.Clear();
   ASSERT_OK(peer->RequestVote(&request, &response));
@@ -1090,11 +1110,14 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, response.consensus_error().code());
   ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
   ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, 
last_op_id.term() + 2, "peer-0"));
+  ASSERT_EQ(0, flush_count() - flush_count_before)
+      << "Rejected votes for old terms should not flush";
 
   //
   // Ensure replicas vote no for an old op index.
   //
 
+  flush_count_before = flush_count();
   request.set_candidate_uuid("peer-0");
   request.set_candidate_term(last_op_id.term() + 3);
   
request.mutable_candidate_status()->mutable_last_received()->CopyFrom(MinimumOpId());
@@ -1105,6 +1128,9 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   ASSERT_EQ(ConsensusErrorPB::LAST_OPID_TOO_OLD, 
response.consensus_error().code());
   ASSERT_EQ(last_op_id.term() + 3, response.responder_term());
   ASSERT_NO_FATAL_FAILURE(AssertDurableTermWithoutVote(kPeerIndex, 
last_op_id.term() + 3));
+  ASSERT_EQ(1, flush_count() - flush_count_before)
+      << "Rejected votes for old op index but new term should flush once.";
+
 
   // Send a "heartbeat" to the peer. It should be rejected.
   ConsensusRequestPB req;

http://git-wip-us.apache.org/repos/asf/kudu/blob/d87e3ff6/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc 
b/src/kudu/consensus/raft_consensus_state.cc
index 92222bf..7763cb6 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -280,7 +280,8 @@ bool ReplicaState::IsOpCommittedOrPending(const OpId& 
op_id, bool* term_mismatch
   return true;
 }
 
-Status ReplicaState::SetCurrentTermUnlocked(int64_t new_term) {
+Status ReplicaState::SetCurrentTermUnlocked(int64_t new_term,
+                                            FlushToDisk flush) {
   TRACE_EVENT1("consensus", "ReplicaState::SetCurrentTermUnlocked",
                "term", new_term);
   DCHECK(update_lock_.is_locked());
@@ -291,7 +292,9 @@ Status ReplicaState::SetCurrentTermUnlocked(int64_t 
new_term) {
   }
   cmeta_->set_current_term(new_term);
   cmeta_->clear_voted_for();
-  CHECK_OK(cmeta_->Flush());
+  if (flush == FLUSH_TO_DISK) {
+    CHECK_OK(cmeta_->Flush());
+  }
   ClearLeaderUnlocked();
   last_received_op_id_current_leader_ = MinimumOpId();
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/d87e3ff6/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h 
b/src/kudu/consensus/raft_consensus_state.h
index efcaff2..f73cccd 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -194,9 +194,19 @@ class ReplicaState {
   // otherwise return the committed configuration.
   const RaftConfigPB& GetActiveConfigUnlocked() const;
 
+  // Enum for the 'flush' argument to SetCurrentTermUnlocked() below.
+  enum FlushToDisk {
+    SKIP_FLUSH_TO_DISK,
+    FLUSH_TO_DISK
+  };
+
   // Checks if the term change is legal. If so, sets 'current_term'
   // to 'new_term' and sets 'has voted' to no for the current term.
-  Status SetCurrentTermUnlocked(int64_t new_term) WARN_UNUSED_RESULT;
+  //
+  // If the caller knows that it will call another method soon after
+  // to flush the change to disk, it may set 'flush' to 'SKIP_FLUSH_TO_DISK'.
+  Status SetCurrentTermUnlocked(int64_t new_term,
+                                FlushToDisk flush) WARN_UNUSED_RESULT;
 
   // Returns the term set in the last config change round.
   const int64_t GetCurrentTermUnlocked() const;
@@ -326,6 +336,10 @@ class ReplicaState {
   // The update_lock_ must be held.
   ReplicaState::State state() const;
 
+  ConsensusMetadata* consensus_metadata_for_tests() {
+    return cmeta_.get();
+  }
+
  private:
   const ConsensusOptions options_;
 

Reply via email to