This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c633e2  [consensus] use move semantics for LogEntryBatchPB
8c633e2 is described below

commit 8c633e2eb20fca9f45882b291069000c6451c97a
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Jun 12 18:23:45 2020 -0700

    [consensus] use move semantics for LogEntryBatchPB
    
    As it turned out, in the context of the Log class and related utilities,
    it's possible to get rid of the heap allocation and unique_ptr-wrapping
    for LogEntryBatchPB.  With this patch, instances of LogEntryBatchPB are
    allocated on the stack and passed around using the move semantics.
    
    There aren't "low-level unit" tests for the affected code paths and I'm
    not sure it makes sense to introduce any.  However, there is a scenario
    named 'MultiThreadedLogTest.TestAppends' in the "synthetic" mt-log-test.
    To assess the performance impact of this patch, I ran the following for
    RELEASE builds with and without this patch:
    
      ./bin/mt-log-test --num_writer_threads=1 --num_batches_per_thread=500000 
--num_reader_threads=0 --num_ops_per_batch_avg=1 --verify_log=false 
--log_segment_size_mb=4096 --never_fsync --unlock_unsafe_flags 
--gtest_filter='MultiThreadedLogTest.TestAppends' --gtest_repeat=100 | \
      grep 'Time spent inserting 500000 batches' | awk '{print $14}' | \
      sed 's/s$//' | awk 'BEGIN { sum = 0 } { sum += $1 } END { print sum }'
    
    With each run producing a summary like the one below:
      Time spent inserting 500000 batches(1 threads, 500000 per-thread): real 
7.506s      user 0.000s     sys 0.000s
    
    I got the following results:
      with the patch:
        711.679
    
      without the patch:
        736.141
    
    So, the performance at least didn't degrade.
    
    Change-Id: Ib5d3c384dd6f17e8c4c71eec074af9e98827262b
    Reviewed-on: http://gerrit.cloudera.org:8080/16075
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <[email protected]>
---
 src/kudu/consensus/consensus_queue.cc |  4 +--
 src/kudu/consensus/consensus_queue.h  |  2 +-
 src/kudu/consensus/log.cc             | 51 +++++++++++++++++------------------
 src/kudu/consensus/log.h              | 20 +++++++-------
 src/kudu/consensus/log_cache-test.cc  |  6 +++--
 src/kudu/consensus/log_cache.cc       |  5 ++--
 src/kudu/consensus/log_cache.h        |  2 +-
 src/kudu/consensus/log_reader.cc      | 16 +++++------
 src/kudu/consensus/log_reader.h       |  2 +-
 src/kudu/consensus/log_util.cc        | 29 ++++++++++----------
 src/kudu/consensus/log_util.h         |  4 +--
 11 files changed, 70 insertions(+), 71 deletions(-)

diff --git a/src/kudu/consensus/consensus_queue.cc 
b/src/kudu/consensus/consensus_queue.cc
index 676d99d..9d2b6f7 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -384,7 +384,7 @@ Status PeerMessageQueue::AppendOperation(const 
ReplicateRefPtr& msg) {
   });
 }
 
-Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
+Status PeerMessageQueue::AppendOperations(vector<ReplicateRefPtr> msgs,
                                           const StatusCallback& 
log_append_callback) {
 
   DFAKE_SCOPED_LOCK(append_fake_lock_);
@@ -424,7 +424,7 @@ Status PeerMessageQueue::AppendOperations(const 
vector<ReplicateRefPtr>& msgs,
   // which also needs queue_lock_.
   lock.unlock();
   RETURN_NOT_OK(log_cache_.AppendOperations(
-      msgs, [this, last_id, log_append_callback](const Status& s) {
+      std::move(msgs), [this, last_id, log_append_callback](const Status& s) {
         this->LocalPeerAppendFinished(last_id, log_append_callback, s);
       }));
   lock.lock();
diff --git a/src/kudu/consensus/consensus_queue.h 
b/src/kudu/consensus/consensus_queue.h
index 9414ae0..af8dfdb 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -241,7 +241,7 @@ class PeerMessageQueue {
   //
   // This is thread-safe against all of the read methods, but not thread-safe
   // with concurrent Append calls.
-  Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs,
+  Status AppendOperations(std::vector<ReplicateRefPtr> msgs,
                           const StatusCallback& log_append_callback);
 
   // Truncate all operations coming after 'index'. Following this, the 
'last_appended'
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 94166d0..d4f5fb2 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -577,7 +577,7 @@ void SegmentAllocator::UpdateFooterForBatch(const 
LogEntryBatch& batch) {
   // immediately.
   if (batch.type_ == REPLICATE) {
     // Update the index bounds for the current segment.
-    for (const LogEntryPB& entry_pb : batch.entry_batch_pb_->entry()) {
+    for (const LogEntryPB& entry_pb : batch.entry_batch_pb_.entry()) {
       UpdateFooterForReplicateEntry(entry_pb, &footer_);
     }
   }
@@ -827,9 +827,9 @@ Status Log::Init() {
 
 unique_ptr<LogEntryBatch> Log::CreateBatchFromPB(
     LogEntryTypePB type,
-    unique_ptr<LogEntryBatchPB> entry_batch_pb,
+    LogEntryBatchPB entry_batch_pb,
     StatusCallback cb) {
-  int num_ops = entry_batch_pb->entry_size();
+  size_t num_ops = entry_batch_pb.entry_size();
   unique_ptr<LogEntryBatch> new_entry_batch(new LogEntryBatch(
       type, std::move(entry_batch_pb), num_ops, std::move(cb)));
   new_entry_batch->Serialize();
@@ -851,18 +851,18 @@ Status Log::AsyncAppend(unique_ptr<LogEntryBatch> 
entry_batch) {
   return Status::OK();
 }
 
-Status Log::AsyncAppendReplicates(const vector<ReplicateRefPtr>& replicates,
+Status Log::AsyncAppendReplicates(vector<ReplicateRefPtr> replicates,
                                   StatusCallback callback) {
-  unique_ptr<LogEntryBatchPB> batch_pb(new LogEntryBatchPB);
-  batch_pb->mutable_entry()->Reserve(replicates.size());
+  LogEntryBatchPB batch_pb;
+  batch_pb.mutable_entry()->Reserve(replicates.size());
   for (const auto& r : replicates) {
-    LogEntryPB* entry_pb = batch_pb->add_entry();
+    LogEntryPB* entry_pb = batch_pb.add_entry();
     entry_pb->set_type(REPLICATE);
     entry_pb->set_allocated_replicate(r->get());
   }
   unique_ptr<LogEntryBatch> batch = CreateBatchFromPB(
       REPLICATE, std::move(batch_pb), std::move(callback));
-  batch->SetReplicates(replicates);
+  batch->SetReplicates(std::move(replicates));
   return AsyncAppend(std::move(batch));
 }
 
@@ -870,8 +870,8 @@ Status 
Log::AsyncAppendCommit(unique_ptr<consensus::CommitMsg> commit_msg,
                               StatusCallback callback) {
   MAYBE_FAULT(FLAGS_fault_crash_before_append_commit);
 
-  unique_ptr<LogEntryBatchPB> batch_pb(new LogEntryBatchPB);
-  LogEntryPB* entry = batch_pb->add_entry();
+  LogEntryBatchPB batch_pb;
+  LogEntryPB* entry = batch_pb.add_entry();
   entry->set_type(COMMIT);
   entry->set_allocated_commit(commit_msg.release());
 
@@ -939,9 +939,8 @@ Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
     return Status::OK();
   }
 
-  for (const LogEntryPB& entry_pb : batch.entry_batch_pb_->entry()) {
+  for (const LogEntryPB& entry_pb : batch.entry_batch_pb_.entry()) {
     LogIndexEntry index_entry;
-
     index_entry.op_id = entry_pb.replicate().id();
     index_entry.segment_sequence_number = 
segment_allocator_.active_segment_sequence_number();
     index_entry.offset_in_segment = start_offset;
@@ -1003,24 +1002,24 @@ void Log::GetSegmentsToGCUnlocked(RetentionIndexes 
retention_indexes,
 }
 
 Status Log::Append(LogEntryPB* entry) {
-  unique_ptr<LogEntryBatchPB> entry_batch_pb(new LogEntryBatchPB);
-  entry_batch_pb->mutable_entry()->AddAllocated(entry);
-  LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1,
-                            &DoNothingStatusCB);
+  LogEntryBatchPB entry_batch_pb;
+  entry_batch_pb.mutable_entry()->AddAllocated(entry);
+  LogEntryBatch entry_batch(
+      entry->type(), std::move(entry_batch_pb), 1, &DoNothingStatusCB);
   entry_batch.Serialize();
   Status s = WriteBatch(&entry_batch);
   if (s.ok()) {
     s = Sync();
   }
-  entry_batch.entry_batch_pb_->mutable_entry()->ExtractSubrange(0, 1, nullptr);
+  entry_batch.entry_batch_pb_.mutable_entry()->ExtractSubrange(0, 1, nullptr);
   return s;
 }
 
 Status Log::WaitUntilAllFlushed() {
   // In order to make sure we empty the queue we need to use
   // the async api.
-  unique_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
-  entry_batch->add_entry()->set_type(log::FLUSH_MARKER);
+  LogEntryBatchPB entry_batch;
+  entry_batch.add_entry()->set_type(log::FLUSH_MARKER);
   Synchronizer s;
   unique_ptr<LogEntryBatch> reserved_entry_batch = CreateBatchFromPB(
       FLUSH_MARKER, std::move(entry_batch), s.AsStatusCallback());
@@ -1234,21 +1233,21 @@ Log::~Log() {
 }
 
 LogEntryBatch::LogEntryBatch(LogEntryTypePB type,
-                             unique_ptr<LogEntryBatchPB> entry_batch_pb,
+                             LogEntryBatchPB entry_batch_pb,
                              size_t count,
                              StatusCallback cb)
     : type_(type),
       entry_batch_pb_(std::move(entry_batch_pb)),
       total_size_bytes_(
-          PREDICT_FALSE(count == 1 && entry_batch_pb_->entry(0).type() == 
FLUSH_MARKER) ?
-          0 : entry_batch_pb_->ByteSize()),
+          PREDICT_FALSE(count == 1 && entry_batch_pb_.entry(0).type() == 
FLUSH_MARKER)
+              ? 0 : entry_batch_pb_.ByteSize()),
       count_(count),
       callback_(std::move(cb)) {
 }
 
 LogEntryBatch::~LogEntryBatch() {
-  if (type_ == REPLICATE && entry_batch_pb_) {
-    for (LogEntryPB& entry : *entry_batch_pb_->mutable_entry()) {
+  if (type_ == REPLICATE) {
+    for (LogEntryPB& entry : *entry_batch_pb_.mutable_entry()) {
       // ReplicateMsg elements are owned by and must be freed by the caller
       // (e.g. the LogCache).
       entry.release_replicate();
@@ -1259,11 +1258,11 @@ LogEntryBatch::~LogEntryBatch() {
 void LogEntryBatch::Serialize() {
   DCHECK_EQ(buffer_.size(), 0);
   // FLUSH_MARKER LogEntries are markers and are not serialized.
-  if (PREDICT_FALSE(count() == 1 && entry_batch_pb_->entry(0).type() == 
FLUSH_MARKER)) {
+  if (PREDICT_FALSE(count() == 1 && entry_batch_pb_.entry(0).type() == 
FLUSH_MARKER)) {
     return;
   }
   buffer_.reserve(total_size_bytes_);
-  pb_util::AppendToString(*entry_batch_pb_, &buffer_);
+  pb_util::AppendToString(entry_batch_pb_, &buffer_);
 }
 
 
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index cf8f6a2..556a99c 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -25,6 +25,7 @@
 #include <map>
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <glog/logging.h>
@@ -296,7 +297,7 @@ class Log : public RefCountedThreadSafe<Log> {
 
   // Append the given set of replicate messages, asynchronously.
   // This requires that the replicates have already been assigned OpIds.
-  Status AsyncAppendReplicates(const std::vector<consensus::ReplicateRefPtr>& 
replicates,
+  Status AsyncAppendReplicates(std::vector<consensus::ReplicateRefPtr> 
replicates,
                                StatusCallback callback);
 
   // Append the given commit message, asynchronously.
@@ -448,8 +449,7 @@ class Log : public RefCountedThreadSafe<Log> {
   // After the batch is appended to the log, 'cb' will be invoked with the
   // result status of the append.
   static std::unique_ptr<LogEntryBatch> CreateBatchFromPB(
-      LogEntryTypePB type, std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
-      StatusCallback cb);
+      LogEntryTypePB type, LogEntryBatchPB entry_batch_pb, StatusCallback cb);
 
   // Asynchronously appends 'entry_batch' to the log.
   Status AsyncAppend(std::unique_ptr<LogEntryBatch> entry_batch);
@@ -564,7 +564,7 @@ class LogEntryBatch {
   friend class SegmentAllocator;
 
   LogEntryBatch(LogEntryTypePB type,
-                std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
+                LogEntryBatchPB entry_batch_pb,
                 size_t count,
                 StatusCallback cb);
 
@@ -588,13 +588,13 @@ class LogEntryBatch {
   // Requires that this be a REPLICATE batch.
   consensus::OpId MaxReplicateOpId() const {
     DCHECK_EQ(REPLICATE, type_);
-    int idx = entry_batch_pb_->entry_size() - 1;
-    DCHECK(entry_batch_pb_->entry(idx).replicate().IsInitialized());
-    return entry_batch_pb_->entry(idx).replicate().id();
+    int idx = entry_batch_pb_.entry_size() - 1;
+    DCHECK(entry_batch_pb_.entry(idx).replicate().IsInitialized());
+    return entry_batch_pb_.entry(idx).replicate().id();
   }
 
-  void SetReplicates(const std::vector<consensus::ReplicateRefPtr>& 
replicates) {
-    replicates_ = replicates;
+  void SetReplicates(std::vector<consensus::ReplicateRefPtr> replicates) {
+    replicates_ = std::move(replicates);
   }
 
   void SetAppendError(const Status& s) {
@@ -612,7 +612,7 @@ class LogEntryBatch {
   const LogEntryTypePB type_;
 
   // Contents of the log entries that will be written to disk.
-  std::unique_ptr<LogEntryBatchPB> entry_batch_pb_;
+  LogEntryBatchPB entry_batch_pb_;
 
    // Total size in bytes of all entries
   const uint32_t total_size_bytes_;
diff --git a/src/kudu/consensus/log_cache-test.cc 
b/src/kudu/consensus/log_cache-test.cc
index 423f02a..e19183d 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -26,6 +26,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <utility>
 #include <vector>
 
 #include <gflags/gflags_declare.h>
@@ -128,8 +129,9 @@ class LogCacheTest : public KuduTest {
       int64_t index = cur_index;
       vector<ReplicateRefPtr> msgs;
       msgs.push_back(make_scoped_refptr_replicate(
-                       CreateDummyReplicate(term, index, clock_->Now(), 
payload_size).release()));
-      RETURN_NOT_OK(cache_->AppendOperations(msgs, [](const Status& s) { 
FatalOnError(s); }));
+          CreateDummyReplicate(term, index, clock_->Now(), 
payload_size).release()));
+      RETURN_NOT_OK(cache_->AppendOperations(std::move(msgs),
+                                             [](const Status& s) { 
FatalOnError(s); }));
     }
     return Status::OK();
   }
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 26e8a1c..78220d1 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -148,7 +148,7 @@ void LogCache::TruncateOpsAfterUnlocked(int64_t index) {
   next_sequential_op_index_ = index + 1;
 }
 
-Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
+Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs,
                                   const StatusCallback& callback) {
   CHECK_GT(msgs.size(), 0);
 
@@ -212,7 +212,7 @@ Status LogCache::AppendOperations(const 
vector<ReplicateRefPtr>& msgs,
   metrics_.log_cache_num_ops->IncrementBy(msgs.size());
 
   Status log_status = log_->AsyncAppendReplicates(
-      msgs, [this, last_idx_in_batch, borrowed_memory, callback](const Status& 
s) {
+      std::move(msgs), [this, last_idx_in_batch, borrowed_memory, 
callback](const Status& s) {
         this->LogCallback(last_idx_in_batch, borrowed_memory, callback, s);
       });
 
@@ -222,7 +222,6 @@ Status LogCache::AppendOperations(const 
vector<ReplicateRefPtr>& msgs,
     return log_status;
   }
 
-
   return Status::OK();
 }
 
diff --git a/src/kudu/consensus/log_cache.h b/src/kudu/consensus/log_cache.h
index 33eb37c..93b35d1 100644
--- a/src/kudu/consensus/log_cache.h
+++ b/src/kudu/consensus/log_cache.h
@@ -94,7 +94,7 @@ class LogCache {
   // when the callback fires.
   //
   // Returns non-OK if the Log append itself fails.
-  Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs,
+  Status AppendOperations(std::vector<ReplicateRefPtr> msgs,
                           const StatusCallback& callback);
 
   // Truncate any operations with index > 'index'.
diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc
index 0721c70..10ee0d0 100644
--- a/src/kudu/consensus/log_reader.cc
+++ b/src/kudu/consensus/log_reader.cc
@@ -245,7 +245,7 @@ scoped_refptr<ReadableLogSegment> 
LogReader::GetSegmentBySequenceNumber(int64_t
 
 Status LogReader::ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                            faststring* tmp_buf,
-                                           unique_ptr<LogEntryBatchPB>* batch) 
const {
+                                           LogEntryBatchPB* batch) const {
   const int64_t index = index_entry.op_id.index();
 
   scoped_refptr<ReadableLogSegment> segment = GetSegmentBySequenceNumber(
@@ -270,7 +270,7 @@ Status LogReader::ReadBatchUsingIndexEntry(const 
LogIndexEntry& index_entry,
 
   if (bytes_read_) {
     bytes_read_->IncrementBy(segment->entry_header_size() + tmp_buf->length());
-    entries_read_->IncrementBy((**batch).entry_size());
+    entries_read_->IncrementBy(batch->entry_size());
   }
 
   return Status::OK();
@@ -291,7 +291,7 @@ Status LogReader::ReadReplicatesInRange(int64_t starting_at,
   int64_t total_size = 0;
   bool limit_exceeded = false;
   faststring tmp_buf;
-  unique_ptr<LogEntryBatchPB> batch;
+  LogEntryBatchPB batch;
   for (int64_t index = starting_at; index <= up_to && !limit_exceeded; 
index++) {
     LogIndexEntry index_entry;
     RETURN_NOT_OK_PREPEND(log_index_->GetEntry(index, &index_entry),
@@ -308,21 +308,21 @@ Status LogReader::ReadReplicatesInRange(int64_t 
starting_at,
 
       // Sanity-check the property that a batch should only have increasing 
indexes.
       int64_t prev_index = 0;
-      for (int i = 0; i < batch->entry_size(); ++i) {
-        LogEntryPB* entry = batch->mutable_entry(i);
+      for (int i = 0; i < batch.entry_size(); ++i) {
+        LogEntryPB* entry = batch.mutable_entry(i);
         if (!entry->has_replicate()) continue;
         int64_t this_index = entry->replicate().id().index();
         CHECK_GT(this_index, prev_index)
           << "Expected that an entry batch should only include increasing log 
indexes: "
           << index_entry.ToString()
-          << "\nBatch: " << SecureDebugString(*batch);
+          << "\nBatch: " << SecureDebugString(batch);
         prev_index = this_index;
       }
     }
 
     bool found = false;
-    for (int i = 0; i < batch->entry_size(); ++i) {
-      LogEntryPB* entry = batch->mutable_entry(i);
+    for (int i = 0; i < batch.entry_size(); ++i) {
+      LogEntryPB* entry = batch.mutable_entry(i);
       if (!entry->has_replicate()) {
         continue;
       }
diff --git a/src/kudu/consensus/log_reader.h b/src/kudu/consensus/log_reader.h
index 2809c1a..620e90d 100644
--- a/src/kudu/consensus/log_reader.h
+++ b/src/kudu/consensus/log_reader.h
@@ -181,7 +181,7 @@ class LogReader : public enable_make_shared<LogReader> {
   // 'tmp_buf' is used as scratch space to avoid extra allocation.
   Status ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
                                   faststring* tmp_buf,
-                                  std::unique_ptr<LogEntryBatchPB>* batch) 
const;
+                                  LogEntryBatchPB* batch) const;
 
   // Reads the headers of all segments in 'tablet_wal_path'.
   Status Init(const std::string& tablet_wal_path);
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 865fb07..810f369 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -147,7 +147,7 @@ Status 
LogEntryReader::ReadNextEntry(unique_ptr<LogEntryPB>* entry) {
     }
 
     // We still expect to have more entries in the log.
-    unique_ptr<LogEntryBatchPB> current_batch;
+    LogEntryBatchPB current_batch;
 
     // Read and validate the entry header first.
     Status s;
@@ -163,8 +163,8 @@ Status 
LogEntryReader::ReadNextEntry(unique_ptr<LogEntryPB>* entry) {
     }
 
     // Add the entries from this batch to our pending queue.
-    for (int i = 0; i < current_batch->entry_size(); i++) {
-      auto entry = current_batch->mutable_entry(i);
+    for (int i = 0; i < current_batch.entry_size(); i++) {
+      auto entry = current_batch.mutable_entry(i);
       pending_entries_.emplace_back(entry);
       num_entries_read_++;
 
@@ -180,8 +180,8 @@ Status 
LogEntryReader::ReadNextEntry(unique_ptr<LogEntryPB>* entry) {
       }
       recent_entries_.push_back({ offset_, entry->type(), op_id });
     }
-    current_batch->mutable_entry()->ExtractSubrange(
-        0, current_batch->entry_size(), nullptr);
+    current_batch.mutable_entry()->ExtractSubrange(
+        0, current_batch.entry_size(), nullptr);
   }
 
   *entry = std::move(pending_entries_.front());
@@ -628,7 +628,7 @@ Status ReadableLogSegment::ScanForValidEntryHeaders(
 }
 
 Status ReadableLogSegment::ReadEntryHeaderAndBatch(int64_t* offset, 
faststring* tmp_buf,
-                                                   
unique_ptr<LogEntryBatchPB>* batch,
+                                                   LogEntryBatchPB* batch,
                                                    EntryHeaderStatus* 
status_detail) const {
   int64_t cur_offset = *offset;
   EntryHeader header;
@@ -701,12 +701,11 @@ EntryHeaderStatus ReadableLogSegment::DecodeEntryHeader(
 Status ReadableLogSegment::ReadEntryBatch(int64_t* offset,
                                           const EntryHeader& header,
                                           faststring* tmp_buf,
-                                          unique_ptr<LogEntryBatchPB>* 
entry_batch) const {
+                                          LogEntryBatchPB* entry_batch) const {
   TRACE_EVENT2("log", "ReadableLogSegment::ReadEntryBatch",
                "path", path_,
                "range", Substitute("offset=$0 entry_len=$1",
                                    *offset, header.msg_length));
-
   if (header.msg_length == 0) {
     return Status::Corruption("Invalid 0 entry length");
   }
@@ -728,9 +727,10 @@ Status ReadableLogSegment::ReadEntryBatch(int64_t* offset,
   tmp_buf->resize(buf_len);
   Slice entry_batch_slice(tmp_buf->data(), header.msg_length_compressed);
   Status s =  file_->Read(*offset, entry_batch_slice);
-
-  if (!s.ok()) return Status::IOError(Substitute("Could not read entry. Cause: 
$0",
-                                                 s.ToString()));
+  if (PREDICT_FALSE(!s.ok())) {
+    return Status::IOError(Substitute("Could not read entry. Cause: $0",
+                                      s.ToString()));
+  }
 
   // Verify the CRC.
   uint32_t read_crc = crc::Crc32c(entry_batch_slice.data(), 
entry_batch_slice.size());
@@ -750,17 +750,16 @@ Status ReadableLogSegment::ReadEntryBatch(int64_t* offset,
     entry_batch_slice = Slice(uncompress_buf, header.msg_length);
   }
 
-  unique_ptr<LogEntryBatchPB> read_entry_batch(new LogEntryBatchPB);
-  s = pb_util::ParseFromArray(read_entry_batch.get(),
+  LogEntryBatchPB read_entry_batch;
+  s = pb_util::ParseFromArray(&read_entry_batch,
                               entry_batch_slice.data(),
                               header.msg_length);
-
   if (!s.ok()) {
     return Status::Corruption(Substitute("Could not parse PB. Cause: $0", 
s.ToString()));
   }
 
   *offset += header.msg_length_compressed;
-  entry_batch->reset(read_entry_batch.release());
+  *entry_batch = std::move(read_entry_batch);
   return Status::OK();
 }
 
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 630cb24..6a2655b 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -343,7 +343,7 @@ class ReadableLogSegment : public 
RefCountedThreadSafe<ReadableLogSegment> {
   // If unsuccessful, '*offset' is not updated, and *status_detail will be 
updated
   // to indicate the cause of the error.
   Status ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf,
-                                 std::unique_ptr<LogEntryBatchPB>* batch,
+                                 LogEntryBatchPB* batch,
                                  EntryHeaderStatus* status_detail) const;
 
   // Reads a log entry header from the segment.
@@ -366,7 +366,7 @@ class ReadableLogSegment : public 
RefCountedThreadSafe<ReadableLogSegment> {
   Status ReadEntryBatch(int64_t* offset,
                         const EntryHeader& header,
                         faststring* tmp_buf,
-                        std::unique_ptr<LogEntryBatchPB>* entry_batch) const;
+                        LogEntryBatchPB* entry_batch) const;
 
   void UpdateReadableToOffset(int64_t readable_to_offset);
 

Reply via email to