Repository: kudu
Updated Branches:
  refs/heads/master 7aab411d3 -> ccdb6b557


[tablet] clean-up on replay of WAL entries

Simplified handling of the ownership of log entries read from disk.

This changelist also contains other changes in tablet_bootstrap.cc
and corresponding WAL utilities:
  * reduced number of memory allocations while reading WAL entries
  * replaced gscoped_ptr with std::unique_ptr
  * other minor updates around WAL-related utilities and tests

Change-Id: Ie39516b09e4c756e8af1d8e1a5604672c96a80cb
Reviewed-on: http://gerrit.cloudera.org:8080/11032
Tested-by: Alexey Serbin <aser...@cloudera.com>
Reviewed-by: Adar Dembo <a...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ccdb6b55
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ccdb6b55
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ccdb6b55

Branch: refs/heads/master
Commit: ccdb6b557d440423843952cace3e7d351a743fa6
Parents: 7aab411
Author: Alexey Serbin <aser...@cloudera.com>
Authored: Thu Jul 19 17:49:17 2018 -0700
Committer: Alexey Serbin <aser...@cloudera.com>
Committed: Tue Jul 24 20:24:57 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log-test-base.h              |   7 +-
 src/kudu/consensus/log-test.cc                  |  41 ++-
 src/kudu/consensus/log_reader.cc                |   6 +-
 src/kudu/consensus/log_reader.h                 |   3 +-
 src/kudu/consensus/log_util.cc                  |  49 +--
 src/kudu/consensus/log_util.h                   |  13 +-
 .../consensus/raft_consensus_quorum-test.cc     |  46 ++-
 src/kudu/integration-tests/log_verifier.cc      |   6 +-
 src/kudu/tablet/tablet_bootstrap.cc             | 304 ++++++++++---------
 src/kudu/tools/tool_action_common.cc            |  14 +-
 src/kudu/tools/tool_action_local_replica.cc     |   6 +-
 11 files changed, 256 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test-base.h 
b/src/kudu/consensus/log-test-base.h
index 987b769..89b39d0 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -22,6 +22,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
@@ -32,6 +33,7 @@
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/log_reader.h"
+#include "kudu/consensus/log_util.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/opid_util.h"
 #include "kudu/fs/fs_manager.h"
@@ -164,7 +166,6 @@ class LogTestBase : public KuduTest {
 
   void TearDown() override {
     KuduTest::TearDown();
-    STLDeleteElements(&entries_);
   }
 
   Status BuildLog() {
@@ -196,7 +197,7 @@ class LogTestBase : public KuduTest {
   }
 
   void EntriesToIdList(std::vector<uint32_t>* ids) {
-    for (const LogEntryPB* entry : entries_) {
+    for (const auto& entry : entries_) {
       VLOG(2) << "Entry contents: " << pb_util::SecureDebugString(*entry);
       if (entry->type() == REPLICATE) {
         ids->push_back(entry->replicate().id().index());
@@ -381,7 +382,7 @@ class LogTestBase : public KuduTest {
   int64_t current_index_;
   LogOptions options_;
   // Reusable entries vector that deletes the entries on destruction.
-  std::vector<LogEntryPB* > entries_;
+  LogEntries entries_;
   scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
   scoped_refptr<clock::Clock> clock_;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index e92f112..ebb4aad 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -193,11 +193,10 @@ TEST_P(LogTestOptionalCompression, 
TestMultipleEntriesInABatch) {
   // RollOver() the batch so that we have a properly formed footer.
   ASSERT_OK(log_->AllocateSegmentAndRollOver());
 
-  vector<LogEntryPB*> entries;
-  ElementDeleter deleter(&entries);
   SegmentSequence segments;
   ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments));
 
+  LogEntries entries;
   ASSERT_OK(segments[0]->ReadEntries(&entries));
 
   ASSERT_EQ(2, entries.size());
@@ -285,14 +284,13 @@ TEST_P(LogTestOptionalCompression, TestLogNotTrimmed) {
 
   AppendNoOp(&opid);
 
-  vector<LogEntryPB*> entries;
-  ElementDeleter deleter(&entries);
   SegmentSequence segments;
   ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments));
 
+  LogEntries entries;
   ASSERT_OK(segments[0]->ReadEntries(&entries));
   // Close after testing to ensure correct shutdown
-  // TODO : put this in TearDown() with a test on log state?
+  // TODO(unknown): put this in TearDown() with a test on log state?
   ASSERT_OK(log_->Close());
 }
 
@@ -306,15 +304,14 @@ TEST_P(LogTestOptionalCompression, TestBlankLogFile) {
   ASSERT_EQ(log_->reader()->num_segments(), 1);
 
   // ...and we're able to read from it.
-  vector<LogEntryPB*> entries;
-  ElementDeleter deleter(&entries);
   SegmentSequence segments;
   ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments));
 
+  LogEntries entries;
   ASSERT_OK(segments[0]->ReadEntries(&entries));
 
   // ...It's just that it's empty.
-  ASSERT_EQ(entries.size(), 0);
+  ASSERT_TRUE(entries.empty());
 }
 
 void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place,
@@ -354,7 +351,7 @@ void LogTest::DoCorruptionTest(CorruptionType type, 
CorruptionPosition place,
   ASSERT_OK(reader->GetSegmentsSnapshot(&segments));
   Status s = segments[0]->ReadEntries(&entries_);
   ASSERT_EQ(s.CodeAsString(), expected_status.CodeAsString())
-    << "Got unexpected status: " << s.ToString();
+      << "Got unexpected status: " << s.ToString();
 
   // Last entry is ignored, but we should still see the previous ones.
   ASSERT_EQ(expected_entries, entries_.size());
@@ -440,12 +437,11 @@ TEST_F(LogTest, 
TestWriteAndReadToAndFromInProgressSegment) {
   ASSERT_GT(header_size, 0);
   readable_segment->UpdateReadableToOffset(header_size);
 
-  vector<LogEntryPB*> entries;
-
   // Reading the readable segment now should return OK but yield no
   // entries.
+  LogEntries entries;
   ASSERT_OK(readable_segment->ReadEntries(&entries));
-  ASSERT_EQ(entries.size(), 0);
+  ASSERT_TRUE(entries.empty());
 
   // Dummy add_entry to help us estimate the size of what
   // gets written to disk.
@@ -469,18 +465,18 @@ TEST_F(LogTest, 
TestWriteAndReadToAndFromInProgressSegment) {
   // Updating the readable segment with the offset of the first entry should
   // make it read a single entry even though there are several in the log.
   readable_segment->UpdateReadableToOffset(header_size + single_entry_size);
+  entries.clear();
   ASSERT_OK(readable_segment->ReadEntries(&entries));
-  ASSERT_EQ(entries.size(), 1);
-  STLDeleteElements(&entries);
+  ASSERT_EQ(1, entries.size());
 
   // Now append another entry so that the Log sets the correct readable offset
   // on the reader.
   ASSERT_OK(AppendNoOps(&op_id, 1, &written_entries_size));
 
   // Now the reader should be able to read all 5 entries.
+  entries.clear();
   ASSERT_OK(readable_segment->ReadEntries(&entries));
-  ASSERT_EQ(entries.size(), 5);
-  STLDeleteElements(&entries);
+  ASSERT_EQ(5, entries.size());
 
   // Offset should get updated for an additional entry.
   ASSERT_EQ(single_entry_size * (kNumEntries + 1) + header_size,
@@ -495,11 +491,11 @@ TEST_F(LogTest, 
TestWriteAndReadToAndFromInProgressSegment) {
   // Now that we closed the original segment. If we get a segment from the 
reader
   // again, we should get one with a footer and we should be able to read all 
entries.
   ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(segments.size(), 2);
+  ASSERT_EQ(2, segments.size());
   readable_segment = segments[0];
+  entries.clear();
   ASSERT_OK(readable_segment->ReadEntries(&entries));
-  ASSERT_EQ(entries.size(), 5);
-  STLDeleteElements(&entries);
+  ASSERT_EQ(5, entries.size());
 
   // Offset should get updated for an additional entry, again.
   ASSERT_OK(AppendNoOp(&op_id, &written_entries_size));
@@ -653,7 +649,7 @@ TEST_P(LogTestOptionalCompression, TestWaitUntilAllFlushed) 
{
   ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments));
 
   ASSERT_OK(segments[0]->ReadEntries(&entries_));
-  ASSERT_EQ(entries_.size(), 4);
+  ASSERT_EQ(4, entries_.size());
   for (int i = 0; i < 4 ; i++) {
     if (i % 2 == 0) {
       ASSERT_TRUE(entries_[i]->has_replicate());
@@ -761,7 +757,7 @@ TEST_P(LogTestOptionalCompression, TestWriteManyBatches) {
     ASSERT_OK(reader->GetSegmentsSnapshot(&segments));
 
     for (const scoped_refptr<ReadableLogSegment>& entry : segments) {
-      STLDeleteElements(&entries_);
+      entries_.clear();
       ASSERT_OK(entry->ReadEntries(&entries_));
       num_entries += entries_.size();
     }
@@ -817,8 +813,7 @@ TEST_P(LogTestOptionalCompression, 
TestLogReaderReturnsLatestSegmentIfIndexEmpty
   ASSERT_OK(log_->reader()->GetSegmentsSnapshot(&segments));
   ASSERT_EQ(segments.size(), 1);
 
-  vector<LogEntryPB*> entries;
-  ElementDeleter deleter(&entries);
+  LogEntries entries;
   ASSERT_OK(segments[0]->ReadEntries(&entries));
   ASSERT_EQ(2, entries.size());
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc
index 581f3ca..9fd09b5 100644
--- a/src/kudu/consensus/log_reader.cc
+++ b/src/kudu/consensus/log_reader.cc
@@ -18,6 +18,7 @@
 #include "kudu/consensus/log_reader.h"
 
 #include <algorithm>
+#include <memory>
 #include <mutex>
 #include <ostream>
 
@@ -57,6 +58,7 @@ using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
 using std::shared_ptr;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -235,7 +237,7 @@ scoped_refptr<ReadableLogSegment> 
LogReader::GetSegmentBySequenceNumber(int64_t
 
 Status LogReader::ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                            faststring* tmp_buf,
-                                           gscoped_ptr<LogEntryBatchPB>* 
batch) const {
+                                           unique_ptr<LogEntryBatchPB>* batch) 
const {
   const int64_t index = index_entry.op_id.index();
 
   scoped_refptr<ReadableLogSegment> segment = GetSegmentBySequenceNumber(
@@ -281,7 +283,7 @@ Status LogReader::ReadReplicatesInRange(int64_t starting_at,
   int64_t total_size = 0;
   bool limit_exceeded = false;
   faststring tmp_buf;
-  gscoped_ptr<LogEntryBatchPB> batch;
+  unique_ptr<LogEntryBatchPB> batch;
   for (int64_t index = starting_at; index <= up_to && !limit_exceeded; 
index++) {
     LogIndexEntry index_entry;
     RETURN_NOT_OK_PREPEND(log_index_->GetEntry(index, &index_entry),

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log_reader.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_reader.h b/src/kudu/consensus/log_reader.h
index e9c5e84..7d6ae2f 100644
--- a/src/kudu/consensus/log_reader.h
+++ b/src/kudu/consensus/log_reader.h
@@ -25,7 +25,6 @@
 #include <gtest/gtest_prod.h>
 
 #include "kudu/consensus/log_util.h"
-#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/locks.h"
@@ -170,7 +169,7 @@ class LogReader : public enable_make_shared<LogReader> {
   // 'tmp_buf' is used as scratch space to avoid extra allocation.
   Status ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                   faststring* tmp_buf,
-                                  gscoped_ptr<LogEntryBatchPB>* batch) const;
+                                  std::unique_ptr<LogEntryBatchPB>* batch) 
const;
 
   // Reads the headers of all segments in 'tablet_wal_path'.
   Status Init(const std::string& tablet_wal_path);

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 0f0fd23..3680fd3 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <cstring>
 #include <iostream>
+#include <memory>
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
@@ -65,10 +66,7 @@ DEFINE_double(fault_crash_before_write_log_segment_header, 
0.0,
               "Fraction of the time we will crash just before writing the log 
segment header");
 TAG_FLAG(fault_crash_before_write_log_segment_header, unsafe);
 
-namespace kudu {
-namespace log {
-
-using consensus::OpId;
+using kudu::consensus::OpId;
 using std::string;
 using std::shared_ptr;
 using std::vector;
@@ -76,6 +74,9 @@ using std::unique_ptr;
 using strings::Substitute;
 using strings::SubstituteAndAppend;
 
+namespace kudu {
+namespace log {
+
 const char kLogSegmentHeaderMagicString[] = "kudulogf";
 
 // A magic that is written as the very last thing when a segment is closed.
@@ -128,7 +129,7 @@ LogEntryReader::LogEntryReader(ReadableLogSegment* seg)
 
 LogEntryReader::~LogEntryReader() {}
 
-Status LogEntryReader::ReadNextEntry(LogEntryPB* entry) {
+Status LogEntryReader::ReadNextEntry(unique_ptr<LogEntryPB>* entry) {
   // Refill pending_entries_ if none are available.
   while (pending_entries_.empty()) {
 
@@ -145,7 +146,7 @@ Status LogEntryReader::ReadNextEntry(LogEntryPB* entry) {
     }
 
     // We still expect to have more entries in the log.
-    gscoped_ptr<LogEntryBatchPB> current_batch;
+    unique_ptr<LogEntryBatchPB> current_batch;
 
     // Read and validate the entry header first.
     Status s;
@@ -182,7 +183,7 @@ Status LogEntryReader::ReadNextEntry(LogEntryPB* entry) {
         0, current_batch->entry_size(), nullptr);
   }
 
-  pending_entries_[0]->Swap(entry);
+  *entry = std::move(pending_entries_.front());
   pending_entries_.pop_front();
   return Status::OK();
 }
@@ -372,14 +373,15 @@ Status ReadableLogSegment::RebuildFooterByScanning() {
 
   LogSegmentFooterPB new_footer;
   int num_entries = 0;
-  LogEntryPB entry;
   while (true) {
+    unique_ptr<LogEntryPB> entry;
     Status s = reader.ReadNextEntry(&entry);
     if (s.IsEndOfFile()) break;
     RETURN_NOT_OK(s);
 
-    if (entry.has_replicate()) {
-      UpdateFooterForReplicateEntry(entry, &new_footer);
+    DCHECK(entry);
+    if (entry->has_replicate()) {
+      UpdateFooterForReplicateEntry(*entry, &new_footer);
     }
     num_entries++;
   }
@@ -545,17 +547,20 @@ Status 
ReadableLogSegment::ParseFooterMagicAndFooterLength(const Slice &data,
   return Status::OK();
 }
 
-Status ReadableLogSegment::ReadEntries(vector<LogEntryPB*>* entries) {
+Status ReadableLogSegment::ReadEntries(LogEntries* entries) {
   TRACE_EVENT1("log", "ReadableLogSegment::ReadEntries",
                "path", path_);
   LogEntryReader reader(this);
 
   while (true) {
-    unique_ptr<LogEntryPB> entry(new LogEntryPB());
-    Status s = reader.ReadNextEntry(entry.get());
-    if (s.IsEndOfFile()) break;
+    unique_ptr<LogEntryPB> entry;
+    Status s = reader.ReadNextEntry(&entry);
+    if (s.IsEndOfFile()) {
+      break;
+    }
     RETURN_NOT_OK(s);
-    entries->push_back(entry.release());
+    DCHECK(entry);
+    entries->emplace_back(std::move(entry));
   }
 
   return Status::OK();
@@ -573,8 +578,8 @@ Status ReadableLogSegment::ScanForValidEntryHeaders(int64_t 
offset, bool* has_va
           << "following offset " << offset << "...";
   *has_valid_entries = false;
 
-  const int kChunkSize = 1024 * 1024;
-  gscoped_ptr<uint8_t[]> buf(new uint8_t[kChunkSize]);
+  constexpr auto kChunkSize = 1024 * 1024;
+  unique_ptr<uint8_t[]> buf(new uint8_t[kChunkSize]);
 
   // We overlap the reads by the size of the header, so that if a header
   // spans chunks, we don't miss it.
@@ -611,7 +616,7 @@ Status ReadableLogSegment::ScanForValidEntryHeaders(int64_t 
offset, bool* has_va
 }
 
 Status ReadableLogSegment::ReadEntryHeaderAndBatch(int64_t* offset, 
faststring* tmp_buf,
-                                                   
gscoped_ptr<LogEntryBatchPB>* batch,
+                                                   
unique_ptr<LogEntryBatchPB>* batch,
                                                    EntryHeaderStatus* 
status_detail) {
   int64_t cur_offset = *offset;
   EntryHeader header;
@@ -681,10 +686,10 @@ EntryHeaderStatus ReadableLogSegment::DecodeEntryHeader(
 }
 
 
-Status ReadableLogSegment::ReadEntryBatch(int64_t *offset,
+Status ReadableLogSegment::ReadEntryBatch(int64_t* offset,
                                           const EntryHeader& header,
-                                          faststring *tmp_buf,
-                                          gscoped_ptr<LogEntryBatchPB> 
*entry_batch) {
+                                          faststring* tmp_buf,
+                                          unique_ptr<LogEntryBatchPB>* 
entry_batch) {
   TRACE_EVENT2("log", "ReadableLogSegment::ReadEntryBatch",
                "path", path_,
                "range", Substitute("offset=$0 entry_len=$1",
@@ -733,7 +738,7 @@ Status ReadableLogSegment::ReadEntryBatch(int64_t *offset,
     entry_batch_slice = Slice(uncompress_buf, header.msg_length);
   }
 
-  gscoped_ptr<LogEntryBatchPB> read_entry_batch(new LogEntryBatchPB());
+  unique_ptr<LogEntryBatchPB> read_entry_batch(new LogEntryBatchPB);
   s = pb_util::ParseFromArray(read_entry_batch.get(),
                               entry_batch_slice.data(),
                               header.msg_length);

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/log_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index af81c82..9d504f3 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -32,7 +32,6 @@
 #include "kudu/consensus/log.pb.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/ref_counted_replicate.h"
-#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/atomic.h"
@@ -56,6 +55,8 @@ extern const size_t kEntryHeaderSizeV2;
 
 class ReadableLogSegment;
 
+typedef std::vector<std::unique_ptr<LogEntryPB>> LogEntries;
+
 // Options for the State Machine/Write Ahead Log
 struct LogOptions {
 
@@ -108,7 +109,7 @@ class LogEntryReader {
   // Read the next entry from the log, replacing the contents of 'entry'.
   //
   // When there are no more entries to read, returns Status::EndOfFile().
-  Status ReadNextEntry(LogEntryPB* entry);
+  Status ReadNextEntry(std::unique_ptr<LogEntryPB>* entry);
 
   // Return the offset of the next entry to be read from the file.
   int64_t offset() const {
@@ -206,7 +207,7 @@ class ReadableLogSegment : public 
RefCountedThreadSafe<ReadableLogSegment> {
   // If the log is corrupted (i.e. the returned 'Status' is 'Corruption') all
   // the log entries read up to the corrupted one are returned in the 'entries'
   // vector.
-  Status ReadEntries(std::vector<LogEntryPB*>* entries);
+  Status ReadEntries(LogEntries* entries);
 
   // Rebuilds this segment's footer by scanning its entries.
   // This is an expensive operation as it reads and parses the whole segment
@@ -337,7 +338,7 @@ class ReadableLogSegment : public 
RefCountedThreadSafe<ReadableLogSegment> {
   // If unsuccessful, '*offset' is not updated, and *status_detail will be 
updated
   // to indicate the cause of the error.
   Status ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf,
-                                 gscoped_ptr<LogEntryBatchPB>* batch,
+                                 std::unique_ptr<LogEntryBatchPB>* batch,
                                  EntryHeaderStatus* status_detail);
 
   // Reads a log entry header from the segment.
@@ -357,10 +358,10 @@ class ReadableLogSegment : public 
RefCountedThreadSafe<ReadableLogSegment> {
 
   // Reads a log entry batch from the provided readable segment, which gets 
decoded
   // into 'entry_batch' and increments 'offset' by the batch's length.
-  Status ReadEntryBatch(int64_t *offset,
+  Status ReadEntryBatch(int64_t* offset,
                         const EntryHeader& header,
                         faststring* tmp_buf,
-                        gscoped_ptr<LogEntryBatchPB>* entry_batch);
+                        std::unique_ptr<LogEntryBatchPB>* entry_batch);
 
   void UpdateReadableToOffset(int64_t readable_to_offset);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc 
b/src/kudu/consensus/raft_consensus_quorum-test.cc
index dc6cdb9..a6dbab0 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -80,13 +80,14 @@ DECLARE_bool(enable_leader_failure_detection);
 
 METRIC_DECLARE_entity(tablet);
 
-using kudu::pb_util::SecureShortDebugString;
 using kudu::log::Log;
 using kudu::log::LogEntryPB;
 using kudu::log::LogOptions;
 using kudu::log::LogReader;
+using kudu::pb_util::SecureShortDebugString;
 using std::shared_ptr;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 using strings::SubstituteAndAppend;
@@ -115,6 +116,8 @@ Status WaitUntilLeaderForTests(RaftConsensus* raft) {
 // without integrating with other components, such as transactions.
 class RaftConsensusQuorumTest : public KuduTest {
  public:
+  typedef vector<unique_ptr<LogEntryPB>> LogEntries;
+
   RaftConsensusQuorumTest()
     : clock_(clock::LogicalClock::CreateStartingAt(Timestamp(1))),
       metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, 
"raft-test")),
@@ -360,11 +363,9 @@ class RaftConsensusQuorumTest : public KuduTest {
     }
 
     // Gather the replica and leader operations for printing
-    vector<LogEntryPB*> replica_ops;
-    ElementDeleter repl0_deleter(&replica_ops);
+    LogEntries replica_ops;
     GatherLogEntries(peer_idx, logs_[peer_idx], &replica_ops);
-    vector<LogEntryPB*> leader_ops;
-    ElementDeleter leader_deleter(&leader_ops);
+    LogEntries leader_ops;
     GatherLogEntries(leader_idx, logs_[leader_idx], &leader_ops);
     SCOPED_TRACE(PrintOnError(replica_ops, Substitute("local peer ($0)", 
peer->peer_uuid())));
     SCOPED_TRACE(PrintOnError(leader_ops, Substitute("leader (peer-$0)", 
leader_idx)));
@@ -423,7 +424,8 @@ class RaftConsensusQuorumTest : public KuduTest {
     }
   }
 
-  void GatherLogEntries(int idx, const scoped_refptr<Log>& log, 
vector<LogEntryPB* >* entries) {
+  void GatherLogEntries(int idx, const scoped_refptr<Log>& log,
+                        LogEntries* entries) {
     ASSERT_OK(log->WaitUntilAllFlushed());
     log->Close();
     shared_ptr<LogReader> log_reader;
@@ -432,16 +434,14 @@ class RaftConsensusQuorumTest : public KuduTest {
                                    kTestTablet,
                                    metric_entity_.get(),
                                    &log_reader));
-    vector<LogEntryPB*> ret;
-    ElementDeleter deleter(&ret);
     log::SegmentSequence segments;
     ASSERT_OK(log_reader->GetSegmentsSnapshot(&segments));
 
+    LogEntries ret;
     for (const log::SegmentSequence::value_type& entry : segments) {
       ASSERT_OK(entry->ReadEntries(&ret));
     }
-
-    entries->swap(ret);
+    *entries = std::move(ret);
   }
 
   // Verifies that the replica's log match the leader's. This deletes the
@@ -460,15 +460,13 @@ class RaftConsensusQuorumTest : public KuduTest {
       entry.second->Shutdown();
     }
 
-    vector<LogEntryPB*> leader_entries;
-    ElementDeleter leader_entry_deleter(&leader_entries);
+    LogEntries leader_entries;
     GatherLogEntries(leader_idx, logs_[leader_idx], &leader_entries);
     shared_ptr<RaftConsensus> leader;
     CHECK_OK(peers_->GetPeerByIdx(leader_idx, &leader));
 
     for (int replica_idx = first_replica_idx; replica_idx < last_replica_idx; 
replica_idx++) {
-      vector<LogEntryPB*> replica_entries;
-      ElementDeleter replica_entry_deleter(&replica_entries);
+      LogEntries replica_entries;
       GatherLogEntries(replica_idx, logs_[replica_idx], &replica_entries);
 
       shared_ptr<RaftConsensus> replica;
@@ -480,18 +478,18 @@ class RaftConsensusQuorumTest : public KuduTest {
     }
   }
 
-  void ExtractReplicateIds(const vector<LogEntryPB*>& entries,
+  void ExtractReplicateIds(const LogEntries& entries,
                            vector<OpId>* ids) {
     ids->reserve(entries.size() / 2);
-    for (const LogEntryPB* entry : entries) {
+    for (const auto& entry : entries) {
       if (entry->has_replicate()) {
         ids->push_back(entry->replicate().id());
       }
     }
   }
 
-  void VerifyReplicateOrderMatches(const vector<LogEntryPB*>& leader_entries,
-                                   const vector<LogEntryPB*>& replica_entries) 
{
+  void VerifyReplicateOrderMatches(const LogEntries& leader_entries,
+                                   const LogEntries& replica_entries) {
     vector<OpId> leader_ids, replica_ids;
     ExtractReplicateIds(leader_entries, &leader_ids);
     ExtractReplicateIds(replica_entries, &replica_ids);
@@ -502,10 +500,10 @@ class RaftConsensusQuorumTest : public KuduTest {
     }
   }
 
-  void VerifyNoCommitsBeforeReplicates(const vector<LogEntryPB*>& entries) {
+  void VerifyNoCommitsBeforeReplicates(const LogEntries& entries) {
     std::unordered_set<OpId, OpIdHashFunctor, OpIdEqualsFunctor> 
replication_ops;
 
-    for (const LogEntryPB* entry : entries) {
+    for (const auto& entry : entries) {
       if (entry->has_replicate()) {
         ASSERT_TRUE(InsertIfNotPresent(&replication_ops, 
entry->replicate().id()))
           << "REPLICATE op id showed up twice: " << 
SecureShortDebugString(*entry);
@@ -516,8 +514,8 @@ class RaftConsensusQuorumTest : public KuduTest {
     }
   }
 
-  void VerifyReplica(const vector<LogEntryPB*>& leader_entries,
-                     const vector<LogEntryPB*>& replica_entries,
+  void VerifyReplica(const LogEntries& leader_entries,
+                     const LogEntries& replica_entries,
                      const string& leader_name,
                      const string& replica_name) {
     SCOPED_TRACE(PrintOnError(leader_entries, Substitute("Leader: $0", 
leader_name)));
@@ -532,12 +530,12 @@ class RaftConsensusQuorumTest : public KuduTest {
     VerifyNoCommitsBeforeReplicates(leader_entries);
   }
 
-  string PrintOnError(const vector<LogEntryPB*>& replica_entries,
+  string PrintOnError(const LogEntries& replica_entries,
                       const string& replica_id) {
     string ret = "";
     SubstituteAndAppend(&ret, "$1 log entries for replica $0:\n",
                         replica_id, replica_entries.size());
-    for (LogEntryPB* replica_entry : replica_entries) {
+    for (const auto& replica_entry : replica_entries) {
       StrAppend(&ret, "Replica log entry: ", 
SecureShortDebugString(*replica_entry), "\n");
     }
     return ret;

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/integration-tests/log_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/log_verifier.cc 
b/src/kudu/integration-tests/log_verifier.cc
index beeb546..e6bf62e 100644
--- a/src/kudu/integration-tests/log_verifier.cc
+++ b/src/kudu/integration-tests/log_verifier.cc
@@ -79,15 +79,15 @@ Status LogVerifier::ScanForCommittedOpIds(int ts_idx, const 
string& tablet_id,
                                 scoped_refptr<MetricEntity>(), &reader));
   log::SegmentSequence segs;
   RETURN_NOT_OK(reader->GetSegmentsSnapshot(&segs));
-  log::LogEntryPB entry;
+  unique_ptr<log::LogEntryPB> entry;
   for (const auto& seg : segs) {
     log::LogEntryReader reader(seg.get());
     while (true) {
       Status s = reader.ReadNextEntry(&entry);
       if (s.IsEndOfFile() || s.IsCorruption()) break;
       RETURN_NOT_OK(s);
-      if (entry.type() != log::COMMIT) continue;
-      const auto& op_id = entry.commit().commited_op_id();
+      if (entry->type() != log::COMMIT) continue;
+      const auto& op_id = entry->commit().commited_op_id();
 
       if (!InsertIfNotPresent(index_to_term, op_id.index(), op_id.term())) {
         return Status::Corruption(Substitute(

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc 
b/src/kudu/tablet/tablet_bootstrap.cc
index fcc472a..670d6ee 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -59,7 +59,6 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/result_tracker.h"
@@ -88,6 +87,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 
 DECLARE_int32(group_commit_queue_size_bytes);
@@ -100,36 +100,34 @@ TAG_FLAG(fault_crash_during_log_replay, unsafe);
 
 DECLARE_int32(max_clock_sync_error_usec);
 
-namespace kudu {
-namespace tablet {
-
-using clock::Clock;
-using consensus::ALTER_SCHEMA_OP;
-using consensus::CHANGE_CONFIG_OP;
-using consensus::CommitMsg;
-using consensus::ConsensusBootstrapInfo;
-using consensus::MinimumOpId;
-using consensus::NO_OP;
-using consensus::OpId;
-using consensus::OpIdEquals;
-using consensus::OpIdEqualsFunctor;
-using consensus::OpIdHashFunctor;
-using consensus::OpIdToString;
-using consensus::OperationType;
-using consensus::OperationType_Name;
-using consensus::RaftConfigPB;
-using consensus::ReplicateMsg;
-using consensus::WRITE_OP;
-using log::Log;
-using log::LogAnchorRegistry;
-using log::LogEntryPB;
-using log::LogIndex;
-using log::LogOptions;
-using log::LogReader;
-using log::ReadableLogSegment;
-using rpc::ResultTracker;
-using pb_util::SecureDebugString;
-using pb_util::SecureShortDebugString;
+using kudu::clock::Clock;
+using kudu::consensus::ALTER_SCHEMA_OP;
+using kudu::consensus::CHANGE_CONFIG_OP;
+using kudu::consensus::CommitMsg;
+using kudu::consensus::ConsensusBootstrapInfo;
+using kudu::consensus::MinimumOpId;
+using kudu::consensus::NO_OP;
+using kudu::consensus::OpId;
+using kudu::consensus::OpIdEquals;
+using kudu::consensus::OpIdToString;
+using kudu::consensus::OperationType;
+using kudu::consensus::OperationType_Name;
+using kudu::consensus::RaftConfigPB;
+using kudu::consensus::ReplicateMsg;
+using kudu::consensus::WRITE_OP;
+using kudu::log::Log;
+using kudu::log::LogAnchorRegistry;
+using kudu::log::LogEntryPB;
+using kudu::log::LogIndex;
+using kudu::log::LogOptions;
+using kudu::log::LogReader;
+using kudu::log::ReadableLogSegment;
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::ResultTracker;
+using kudu::tserver::AlterSchemaRequestPB;
+using kudu::tserver::WriteRequestPB;
+using kudu::tserver::WriteResponsePB;
 using std::map;
 using std::shared_ptr;
 using std::string;
@@ -137,10 +135,9 @@ using std::unique_ptr;
 using std::unordered_map;
 using std::vector;
 using strings::Substitute;
-using tserver::AlterSchemaRequestPB;
-using tserver::WriteRequestPB;
-using tserver::WriteResponsePB;
 
+namespace kudu {
+namespace tablet {
 
 struct ReplayState;
 
@@ -339,11 +336,19 @@ class TabletBootstrap {
 
   void DumpReplayStateToLog(const ReplayState& state);
 
+  Status HandleEntry(ReplayState* state,
+                     unique_ptr<LogEntryPB> entry,
+                     string* entry_debug_info);
+
   // Handlers for each type of message seen in the log during replay.
-  Status HandleEntry(ReplayState* state, LogEntryPB* entry);
-  Status HandleReplicateMessage(ReplayState* state, LogEntryPB* 
replicate_entry);
-  Status HandleCommitMessage(ReplayState* state, LogEntryPB* commit_entry);
-  Status ApplyCommitMessage(ReplayState* state, LogEntryPB* commit_entry);
+  Status HandleReplicateMessage(ReplayState* state,
+                                unique_ptr<LogEntryPB> entry,
+                                string* entry_debug_info);
+  Status HandleCommitMessage(ReplayState* state,
+                             unique_ptr<LogEntryPB> entry,
+                             string* entry_debug_info);
+
+  Status ApplyCommitMessage(ReplayState* state, LogEntryPB* entry);
   Status HandleEntryPair(LogEntryPB* replicate_entry, LogEntryPB* 
commit_entry);
 
   // Checks that an orphaned commit message is actually irrelevant, i.e that 
none
@@ -367,7 +372,7 @@ class TabletBootstrap {
   scoped_refptr<rpc::ResultTracker> result_tracker_;
   MetricRegistry* metric_registry_;
   scoped_refptr<TabletReplica> tablet_replica_;
-  gscoped_ptr<tablet::Tablet> tablet_;
+  unique_ptr<tablet::Tablet> tablet_;
   const scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
   scoped_refptr<log::Log> log_;
   std::shared_ptr<log::LogReader> log_reader_;
@@ -461,17 +466,20 @@ static string DebugInfo(const string& tablet_id,
                         int segment_seqno,
                         int entry_idx,
                         const string& segment_path,
-                        const LogEntryPB& entry) {
+                        const string& entry_debug_info) {
   // Truncate the debug string to a reasonable length for logging.
   // Otherwise, glog will truncate for us and we may miss important
   // information which came after this long string.
-  string debug_str = SecureShortDebugString(entry);
+  string debug_str = entry_debug_info;
   if (debug_str.size() > 500) {
     debug_str.resize(500);
     debug_str.append("...");
   }
+  if (!debug_str.empty()) {
+    debug_str = Substitute(" Entry: $0", debug_str);
+  }
   return Substitute("Debug Info: Error playing entry $0 of segment $1 of 
tablet $2. "
-                    "Segment path: $3. Entry: $4", entry_idx, segment_seqno, 
tablet_id,
+                    "Segment path: $3.$4", entry_idx, segment_seqno, tablet_id,
                     segment_path, debug_str);
 }
 
@@ -613,18 +621,18 @@ Status TabletBootstrap::FinishBootstrap(const string& 
message,
 }
 
 Status TabletBootstrap::OpenTablet(bool* has_blocks) {
-  gscoped_ptr<Tablet> tablet(new Tablet(tablet_meta_,
-                                        clock_,
-                                        mem_tracker_,
-                                        metric_registry_,
-                                        log_anchor_registry_));
+  unique_ptr<Tablet> tablet(new Tablet(tablet_meta_,
+                                       clock_,
+                                       mem_tracker_,
+                                       metric_registry_,
+                                       log_anchor_registry_));
   // doing nothing for now except opening a tablet locally.
   {
     SCOPED_LOG_SLOW_EXECUTION_PREFIX(INFO, 100, LogPrefix(), "opening tablet");
     RETURN_NOT_OK(tablet->Open());
   }
   *has_blocks = tablet->num_rowsets() != 0;
-  tablet_.reset(tablet.release());
+  tablet_ = std::move(tablet);
   return Status::OK();
 }
 
@@ -729,18 +737,13 @@ Status TabletBootstrap::OpenNewLog() {
   return Status::OK();
 }
 
-typedef map<int64_t, LogEntryPB*> OpIndexToEntryMap;
+typedef map<int64_t, unique_ptr<LogEntryPB>> OpIndexToEntryMap;
 
 // State kept during replay.
 struct ReplayState {
   ReplayState()
-    : prev_op_id(MinimumOpId()),
-      committed_op_id(MinimumOpId()) {
-  }
-
-  ~ReplayState() {
-    STLDeleteValues(&pending_replicates);
-    STLDeleteValues(&pending_commits);
+      : prev_op_id(MinimumOpId()),
+        committed_op_id(MinimumOpId()) {
   }
 
   // Return true if 'b' is allowed to immediately follow 'a' in the log.
@@ -784,8 +787,8 @@ struct ReplayState {
   }
 
   void AddEntriesToStrings(const OpIndexToEntryMap& entries, vector<string>* 
strings) const {
-    for (const OpIndexToEntryMap::value_type& map_entry : entries) {
-      LogEntryPB* entry = DCHECK_NOTNULL(map_entry.second);
+    for (const auto& map_entry : entries) {
+      const LogEntryPB* entry = DCHECK_NOTNULL(map_entry.second.get());
       strings->push_back(Substitute("   $0", SecureShortDebugString(*entry)));
     }
   }
@@ -819,23 +822,26 @@ struct ReplayState {
   OpIndexToEntryMap pending_commits;
 };
 
-// Handle the given log entry. If OK is returned, then takes ownership of 
'entry'.
-// Otherwise, caller frees.
-Status TabletBootstrap::HandleEntry(ReplayState* state, LogEntryPB* entry) {
+// Handle the given log entry.
+Status TabletBootstrap::HandleEntry(ReplayState* state,
+                                    unique_ptr<LogEntryPB> entry,
+                                    string* entry_debug_info) {
+  DCHECK(entry);
   if (VLOG_IS_ON(1)) {
     VLOG_WITH_PREFIX(1) << "Handling entry: " << 
SecureShortDebugString(*entry);
   }
 
-  switch (entry->type()) {
+  const auto entry_type = entry->type();
+  switch (entry_type) {
     case log::REPLICATE:
-      RETURN_NOT_OK(HandleReplicateMessage(state, entry));
+      RETURN_NOT_OK(HandleReplicateMessage(state, std::move(entry), 
entry_debug_info));
       break;
     case log::COMMIT:
       // check the unpaired ops for the matching replicate msg, abort if not 
found
-      RETURN_NOT_OK(HandleCommitMessage(state, entry));
+      RETURN_NOT_OK(HandleCommitMessage(state, std::move(entry), 
entry_debug_info));
       break;
     default:
-      return Status::Corruption(Substitute("Unexpected log entry type: $0", 
entry->type()));
+      return Status::Corruption(Substitute("unexpected log entry type: $0", 
entry_type));
   }
   MAYBE_FAULT(FLAGS_fault_crash_during_log_replay);
   return Status::OK();
@@ -860,64 +866,73 @@ void CheckAndRepairOpIdOverflow(OpId* opid) {
   }
 }
 
-// Takes ownership of 'replicate_entry' on OK status.
-Status TabletBootstrap::HandleReplicateMessage(ReplayState* state, LogEntryPB* 
replicate_entry) {
+Status TabletBootstrap::HandleReplicateMessage(ReplayState* state,
+                                               unique_ptr<LogEntryPB> entry,
+                                               string* entry_debug_info) {
+  auto info_collector = MakeScopedCleanup([&]() {
+    if (entry) {
+      *entry_debug_info = SecureShortDebugString(*entry);
+    }
+  });
+  DCHECK(entry->has_replicate())
+      << "not a replicate message: " << SecureDebugString(*entry);
   stats_.ops_read++;
 
-  DCHECK(replicate_entry->has_replicate());
-
   // Fix overflow if necessary (see KUDU-1933).
-  
CheckAndRepairOpIdOverflow(replicate_entry->mutable_replicate()->mutable_id());
+  CheckAndRepairOpIdOverflow(entry->mutable_replicate()->mutable_id());
 
-  const ReplicateMsg& replicate = replicate_entry->replicate();
+  const ReplicateMsg& replicate = entry->replicate();
   RETURN_NOT_OK(state->CheckSequentialReplicateId(replicate));
   DCHECK(replicate.has_timestamp());
   CHECK_OK(UpdateClock(replicate.timestamp()));
 
   // Append the replicate message to the log as is
-  RETURN_NOT_OK(log_->Append(replicate_entry));
-
-  int64_t index = replicate_entry->replicate().id().index();
-
-  LogEntryPB** existing_entry_ptr = InsertOrReturnExisting(
-      &state->pending_replicates, index, replicate_entry);
+  RETURN_NOT_OK(log_->Append(entry.get()));
 
-  // If there was a entry with the same index we're overwriting then we need 
to delete
-  // that entry and all entries with higher indexes.
-  if (existing_entry_ptr) {
-    LogEntryPB* existing_entry = *existing_entry_ptr;
+  const int64_t index = replicate.id().index();
+  const auto existing_entry_iter = state->pending_replicates.find(index);
+  if (existing_entry_iter != state->pending_replicates.end()) {
+    // If there was a entry with the same index we're overwriting then we need
+    // to delete that entry and all entries with higher indexes.
+    const auto& existing_entry = existing_entry_iter->second;
 
     auto iter = state->pending_replicates.lower_bound(index);
-    DCHECK(OpIdEquals((*iter).second->replicate().id(), 
existing_entry->replicate().id()));
-
-    LogEntryPB* last_entry = (*state->pending_replicates.rbegin()).second;
+    DCHECK(OpIdEquals(iter->second->replicate().id(), 
existing_entry->replicate().id()));
 
+    const auto& last_entry = state->pending_replicates.rbegin()->second;
     LOG_WITH_PREFIX(INFO) << "Overwriting operations starting at: "
                           << existing_entry->replicate().id()
                           << " up to: " << last_entry->replicate().id()
-                          << " with operation: " << 
replicate_entry->replicate().id();
+                          << " with operation: " << replicate.id();
 
     while (iter != state->pending_replicates.end()) {
-      delete (*iter).second;
-      state->pending_replicates.erase(iter++);
+      iter = state->pending_replicates.erase(iter);
       stats_.ops_overwritten++;
     }
-
-    InsertOrDie(&state->pending_replicates, index, replicate_entry);
   }
+  EmplaceOrDie(&state->pending_replicates, index, std::move(entry));
+  info_collector.cancel();
+
   return Status::OK();
 }
 
-// Takes ownership of 'commit_entry' on OK status.
-Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* 
commit_entry) {
-  DCHECK(commit_entry->has_commit()) << "Not a commit message: "
-                                     << SecureDebugString(*commit_entry);
+// On returning OK, takes ownership of the pointer from the 'entry_ptr' 
wrapper.
+Status TabletBootstrap::HandleCommitMessage(ReplayState* state,
+                                            unique_ptr<LogEntryPB> entry,
+                                            string* entry_debug_info) {
+  auto info_collector = MakeScopedCleanup([&]() {
+    if (entry) {
+      *entry_debug_info = SecureShortDebugString(*entry);
+    }
+  });
+  DCHECK(entry->has_commit())
+      << "not a commit message: " << SecureDebugString(*entry);
 
   // Fix overflow if necessary (see KUDU-1933).
-  
CheckAndRepairOpIdOverflow(commit_entry->mutable_commit()->mutable_commited_op_id());
+  
CheckAndRepairOpIdOverflow(entry->mutable_commit()->mutable_commited_op_id());
 
   // Match up the COMMIT record with the original entry that it's applied to.
-  const OpId& committed_op_id = commit_entry->commit().commited_op_id();
+  const OpId& committed_op_id = entry->commit().commited_op_id();
   state->UpdateCommittedOpId(committed_op_id);
 
   // If there are no pending replicates, or if this commit's index is lower 
than the
@@ -925,9 +940,9 @@ Status TabletBootstrap::HandleCommitMessage(ReplayState* 
state, LogEntryPB* comm
   if (state->pending_replicates.empty() ||
       (*state->pending_replicates.begin()).first > committed_op_id.index()) {
     VLOG_WITH_PREFIX(2) << "Found orphaned commit for " << committed_op_id;
-    RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(commit_entry->commit()));
+    RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(entry->commit()));
     stats_.orphaned_commits++;
-    delete commit_entry;
+    info_collector.cancel();
     return Status::OK();
   }
 
@@ -936,30 +951,31 @@ Status TabletBootstrap::HandleCommitMessage(ReplayState* 
state, LogEntryPB* comm
   if ((*state->pending_replicates.begin()).first != committed_op_id.index()) {
     if (!ContainsKey(state->pending_replicates, committed_op_id.index())) {
       return Status::Corruption(Substitute("Could not find replicate for 
commit: $0",
-                                           
SecureShortDebugString(*commit_entry)));
+                                           SecureShortDebugString(*entry)));
     }
     VLOG_WITH_PREFIX(2) << "Adding pending commit for " << committed_op_id;
-    InsertOrDie(&state->pending_commits, committed_op_id.index(), 
commit_entry);
+    EmplaceOrDie(&state->pending_commits, committed_op_id.index(), 
std::move(entry));
+    info_collector.cancel();
     return Status::OK();
   }
 
   // ... if it does, we apply it and all the commits that immediately follow 
in the sequence.
-  OpId last_applied = commit_entry->commit().commited_op_id();
-  RETURN_NOT_OK(ApplyCommitMessage(state, commit_entry));
+  OpId last_applied = committed_op_id;
+  RETURN_NOT_OK(ApplyCommitMessage(state, entry.get()));
 
   auto iter = state->pending_commits.begin();
   while (iter != state->pending_commits.end()) {
-    if ((*iter).first == last_applied.index() + 1) {
-      gscoped_ptr<LogEntryPB> buffered_commit_entry((*iter).second);
-      state->pending_commits.erase(iter++);
+    if (iter->first == last_applied.index() + 1) {
+      auto& buffered_commit_entry(iter->second);
       last_applied = buffered_commit_entry->commit().commited_op_id();
       RETURN_NOT_OK(ApplyCommitMessage(state, buffered_commit_entry.get()));
+      iter = state->pending_commits.erase(iter);
       continue;
     }
     break;
   }
 
-  delete commit_entry;
+  info_collector.cancel();
   return Status::OK();
 }
 
@@ -996,35 +1012,33 @@ Status 
TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& com
   return Status::OK();
 }
 
-Status TabletBootstrap::ApplyCommitMessage(ReplayState* state, LogEntryPB* 
commit_entry) {
-
-  const OpId& committed_op_id = commit_entry->commit().commited_op_id();
+Status TabletBootstrap::ApplyCommitMessage(ReplayState* state, LogEntryPB* 
entry) {
+  const OpId& committed_op_id = entry->commit().commited_op_id();
   VLOG_WITH_PREFIX(2) << "Applying commit for " << committed_op_id;
-  gscoped_ptr<LogEntryPB> pending_replicate_entry;
 
   // They should also have an associated replicate index (it may have been in a
   // deleted log segment though).
-  
pending_replicate_entry.reset(EraseKeyReturnValuePtr(&state->pending_replicates,
-                                                       
committed_op_id.index()));
-
-  if (pending_replicate_entry != nullptr) {
+  unique_ptr<LogEntryPB> pending_replicate_entry(EraseKeyReturnValuePtr(
+      &state->pending_replicates, committed_op_id.index()));
+  if (pending_replicate_entry) {
     // We found a replicate with the same index, make sure it also has the same
     // term.
-    if (!OpIdEquals(committed_op_id, 
pending_replicate_entry->replicate().id())) {
+    const auto& replicate = pending_replicate_entry->replicate();
+    if (!OpIdEquals(committed_op_id, replicate.id())) {
       string error_msg = Substitute("Committed operation's OpId: $0 didn't 
match the"
           "commit message's committed OpId: $1. Pending operation: $2, Commit 
message: $3",
-          SecureShortDebugString(pending_replicate_entry->replicate().id()),
+          SecureShortDebugString(replicate.id()),
           SecureShortDebugString(committed_op_id),
-          SecureShortDebugString(pending_replicate_entry->replicate()),
-          SecureShortDebugString(commit_entry->commit()));
+          SecureShortDebugString(replicate),
+          SecureShortDebugString(entry->commit()));
       LOG_WITH_PREFIX(DFATAL) << error_msg;
       return Status::Corruption(error_msg);
     }
-    RETURN_NOT_OK(HandleEntryPair(pending_replicate_entry.get(), 
commit_entry));
+    RETURN_NOT_OK(HandleEntryPair(pending_replicate_entry.get(), entry));
     stats_.ops_committed++;
   } else {
     stats_.orphaned_commits++;
-    RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(commit_entry->commit()));
+    RETURN_NOT_OK(CheckOrphanedCommitDoesntNeedReplay(entry->commit()));
   }
 
   return Status::OK();
@@ -1143,36 +1157,36 @@ Status 
TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) {
 
     int entry_count = 0;
     while (true) {
-      unique_ptr<LogEntryPB> entry(new LogEntryPB);
-
-      Status s = reader.ReadNextEntry(entry.get());
-      if (PREDICT_FALSE(!s.ok())) {
-        if (s.IsEndOfFile()) {
-          break;
+      {
+        unique_ptr<LogEntryPB> entry;
+        Status s = reader.ReadNextEntry(&entry);
+        if (PREDICT_FALSE(!s.ok())) {
+          if (s.IsEndOfFile()) {
+            break;
+          }
+          return Status::Corruption(
+              Substitute("Error reading Log Segment of tablet $0: $1 "
+                         "(Read up to entry $2 of segment $3, in path $4)",
+                         tablet_->tablet_id(),
+                         s.ToString(),
+                         entry_count,
+                         segment->header().sequence_number(),
+                         segment->path()));
         }
-        return Status::Corruption(Substitute("Error reading Log Segment of 
tablet $0: $1 "
-                                             "(Read up to entry $2 of segment 
$3, in path $4)",
-                                             tablet_->tablet_id(),
-                                             s.ToString(),
-                                             entry_count,
-                                             
segment->header().sequence_number(),
-                                             segment->path()));
-      }
-      entry_count++;
+        entry_count++;
 
-      s = HandleEntry(&state, entry.get());
-      if (!s.ok()) {
-        DumpReplayStateToLog(state);
-        RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(),
-                                           segment->header().sequence_number(),
-                                           entry_count, segment->path(),
-                                           *entry));
+        string entry_debug_info;
+        s = HandleEntry(&state, std::move(entry), &entry_debug_info);
+        if (!s.ok()) {
+          DumpReplayStateToLog(state);
+          RETURN_NOT_OK_PREPEND(s, DebugInfo(tablet_->tablet_id(),
+                                             
segment->header().sequence_number(),
+                                             entry_count, segment->path(),
+                                             entry_debug_info));
+        }
       }
 
-      // If HandleEntry returns OK, then it has taken ownership of the entry.
-      entry.release();
-
-      auto now = MonoTime::Now();
+      const auto now = MonoTime::Now();
       if (now - last_status_update > kStatusUpdateInterval) {
         SetStatusMessage(Substitute("Bootstrap replaying log segment $0/$1 "
                                     "($2/$3 this segment, stats: $4)",

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/tools/tool_action_common.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_common.cc 
b/src/kudu/tools/tool_action_common.cc
index 09d0cf1..e3bcc8f 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -339,22 +339,24 @@ Status PrintSegment(const 
scoped_refptr<ReadableLogSegment>& segment) {
     RETURN_NOT_OK(SchemaFromPB(segment->header().schema(), &tablet_schema));
 
     LogEntryReader reader(segment.get());
-    LogEntryPB entry;
     while (true) {
+      unique_ptr<LogEntryPB> entry;
       Status s = reader.ReadNextEntry(&entry);
-      if (s.IsEndOfFile()) break;
+      if (s.IsEndOfFile()) {
+        break;
+      }
       RETURN_NOT_OK(s);
 
       if (print_type == PRINT_PB) {
         if (FLAGS_truncate_data > 0) {
-          pb_util::TruncateFields(&entry, FLAGS_truncate_data);
+          pb_util::TruncateFields(entry.get(), FLAGS_truncate_data);
         }
 
-        cout << "Entry:\n" << SecureDebugString(entry);
+        cout << "Entry:\n" << SecureDebugString(*entry);
       } else if (print_type == PRINT_DECODED) {
-        RETURN_NOT_OK(PrintDecoded(entry, tablet_schema));
+        RETURN_NOT_OK(PrintDecoded(*entry, tablet_schema));
       } else if (print_type == PRINT_ID) {
-        PrintIdOnly(entry);
+        PrintIdOnly(*entry);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/ccdb6b55/src/kudu/tools/tool_action_local_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_local_replica.cc 
b/src/kudu/tools/tool_action_local_replica.cc
index c65e51e..085c259 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -185,12 +185,12 @@ Status FindLastLoggedOpId(FsManager* fs, const string& 
tablet_id,
   for (seg = segs.rbegin(); seg != segs.rend(); ++seg) {
     LogEntryReader reader(seg->get());
     while (true) {
-      LogEntryPB entry;
+      unique_ptr<LogEntryPB> entry;
       Status s = reader.ReadNextEntry(&entry);
       if (s.IsEndOfFile()) break;
       RETURN_NOT_OK_PREPEND(s, "Error in log segment");
-      if (entry.type() != log::REPLICATE) continue;
-      *last_logged_opid = entry.replicate().id();
+      if (entry->type() != log::REPLICATE) continue;
+      *last_logged_opid = entry->replicate().id();
       found = true;
     }
     if (found) return Status::OK();

Reply via email to