This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 8c633e2 [consensus] use move semantics for LogEntryBatchPB
8c633e2 is described below
commit 8c633e2eb20fca9f45882b291069000c6451c97a
Author: Alexey Serbin <[email protected]>
AuthorDate: Fri Jun 12 18:23:45 2020 -0700
[consensus] use move semantics for LogEntryBatchPB
As it turned out, in the context of the Log class and related utilities,
it's possible to get rid of the heap allocation and unique_ptr-wrapping
for LogEntryBatchPB. With this patch, instances of LogEntryBatchPB are
allocated on the stack and passed around using the move semantics.
There aren't "low-level unit" tests for the affected code paths and I'm
not sure it makes sense to introduce one. However, there is scenario
named 'MultiThreadedLogTest.TestAppends' in the "synthetic" mt-log-test.
To assess performance impact of this patch, I ran the following for
RELEASE builds with and without this patch:
./bin/mt-log-test --num_writer_threads=1 --num_batches_per_thread=500000
--num_reader_threads=0 --num_ops_per_batch_avg=1 --verify_log=false
--log_segment_size_mb=4096 --never_fsync --unlock_unsafe_flags
--gtest_filter='MultiThreadedLogTest.TestAppends' --gtest_repeat=100 | \
grep 'Time spent inserting 500000 batches' | awk '{print $14}' | \
sed 's/s$//' | awk 'BEGIN { sum = 0 } { sum += $1 } END { print sum }'
With each run producing summary like below:
Time spent inserting 500000 batches(1 threads, 500000 per-thread): real
7.506s user 0.000s sys 0.000s
I got the following results:
with the patch:
711.679
without the patch:
736.141
So, the performance at least didn't degrade.
Change-Id: Ib5d3c384dd6f17e8c4c71eec074af9e98827262b
Reviewed-on: http://gerrit.cloudera.org:8080/16075
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
---
src/kudu/consensus/consensus_queue.cc | 4 +--
src/kudu/consensus/consensus_queue.h | 2 +-
src/kudu/consensus/log.cc | 51 +++++++++++++++++------------------
src/kudu/consensus/log.h | 20 +++++++-------
src/kudu/consensus/log_cache-test.cc | 6 +++--
src/kudu/consensus/log_cache.cc | 5 ++--
src/kudu/consensus/log_cache.h | 2 +-
src/kudu/consensus/log_reader.cc | 16 +++++------
src/kudu/consensus/log_reader.h | 2 +-
src/kudu/consensus/log_util.cc | 29 ++++++++++----------
src/kudu/consensus/log_util.h | 4 +--
11 files changed, 70 insertions(+), 71 deletions(-)
diff --git a/src/kudu/consensus/consensus_queue.cc
b/src/kudu/consensus/consensus_queue.cc
index 676d99d..9d2b6f7 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -384,7 +384,7 @@ Status PeerMessageQueue::AppendOperation(const
ReplicateRefPtr& msg) {
});
}
-Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
+Status PeerMessageQueue::AppendOperations(vector<ReplicateRefPtr> msgs,
const StatusCallback&
log_append_callback) {
DFAKE_SCOPED_LOCK(append_fake_lock_);
@@ -424,7 +424,7 @@ Status PeerMessageQueue::AppendOperations(const
vector<ReplicateRefPtr>& msgs,
// which also needs queue_lock_.
lock.unlock();
RETURN_NOT_OK(log_cache_.AppendOperations(
- msgs, [this, last_id, log_append_callback](const Status& s) {
+ std::move(msgs), [this, last_id, log_append_callback](const Status& s) {
this->LocalPeerAppendFinished(last_id, log_append_callback, s);
}));
lock.lock();
diff --git a/src/kudu/consensus/consensus_queue.h
b/src/kudu/consensus/consensus_queue.h
index 9414ae0..af8dfdb 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -241,7 +241,7 @@ class PeerMessageQueue {
//
// This is thread-safe against all of the read methods, but not thread-safe
// with concurrent Append calls.
- Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs,
+ Status AppendOperations(std::vector<ReplicateRefPtr> msgs,
const StatusCallback& log_append_callback);
// Truncate all operations coming after 'index'. Following this, the
'last_appended'
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 94166d0..d4f5fb2 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -577,7 +577,7 @@ void SegmentAllocator::UpdateFooterForBatch(const
LogEntryBatch& batch) {
// immediately.
if (batch.type_ == REPLICATE) {
// Update the index bounds for the current segment.
- for (const LogEntryPB& entry_pb : batch.entry_batch_pb_->entry()) {
+ for (const LogEntryPB& entry_pb : batch.entry_batch_pb_.entry()) {
UpdateFooterForReplicateEntry(entry_pb, &footer_);
}
}
@@ -827,9 +827,9 @@ Status Log::Init() {
unique_ptr<LogEntryBatch> Log::CreateBatchFromPB(
LogEntryTypePB type,
- unique_ptr<LogEntryBatchPB> entry_batch_pb,
+ LogEntryBatchPB entry_batch_pb,
StatusCallback cb) {
- int num_ops = entry_batch_pb->entry_size();
+ size_t num_ops = entry_batch_pb.entry_size();
unique_ptr<LogEntryBatch> new_entry_batch(new LogEntryBatch(
type, std::move(entry_batch_pb), num_ops, std::move(cb)));
new_entry_batch->Serialize();
@@ -851,18 +851,18 @@ Status Log::AsyncAppend(unique_ptr<LogEntryBatch>
entry_batch) {
return Status::OK();
}
-Status Log::AsyncAppendReplicates(const vector<ReplicateRefPtr>& replicates,
+Status Log::AsyncAppendReplicates(vector<ReplicateRefPtr> replicates,
StatusCallback callback) {
- unique_ptr<LogEntryBatchPB> batch_pb(new LogEntryBatchPB);
- batch_pb->mutable_entry()->Reserve(replicates.size());
+ LogEntryBatchPB batch_pb;
+ batch_pb.mutable_entry()->Reserve(replicates.size());
for (const auto& r : replicates) {
- LogEntryPB* entry_pb = batch_pb->add_entry();
+ LogEntryPB* entry_pb = batch_pb.add_entry();
entry_pb->set_type(REPLICATE);
entry_pb->set_allocated_replicate(r->get());
}
unique_ptr<LogEntryBatch> batch = CreateBatchFromPB(
REPLICATE, std::move(batch_pb), std::move(callback));
- batch->SetReplicates(replicates);
+ batch->SetReplicates(std::move(replicates));
return AsyncAppend(std::move(batch));
}
@@ -870,8 +870,8 @@ Status
Log::AsyncAppendCommit(unique_ptr<consensus::CommitMsg> commit_msg,
StatusCallback callback) {
MAYBE_FAULT(FLAGS_fault_crash_before_append_commit);
- unique_ptr<LogEntryBatchPB> batch_pb(new LogEntryBatchPB);
- LogEntryPB* entry = batch_pb->add_entry();
+ LogEntryBatchPB batch_pb;
+ LogEntryPB* entry = batch_pb.add_entry();
entry->set_type(COMMIT);
entry->set_allocated_commit(commit_msg.release());
@@ -939,9 +939,8 @@ Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
return Status::OK();
}
- for (const LogEntryPB& entry_pb : batch.entry_batch_pb_->entry()) {
+ for (const LogEntryPB& entry_pb : batch.entry_batch_pb_.entry()) {
LogIndexEntry index_entry;
-
index_entry.op_id = entry_pb.replicate().id();
index_entry.segment_sequence_number =
segment_allocator_.active_segment_sequence_number();
index_entry.offset_in_segment = start_offset;
@@ -1003,24 +1002,24 @@ void Log::GetSegmentsToGCUnlocked(RetentionIndexes
retention_indexes,
}
Status Log::Append(LogEntryPB* entry) {
- unique_ptr<LogEntryBatchPB> entry_batch_pb(new LogEntryBatchPB);
- entry_batch_pb->mutable_entry()->AddAllocated(entry);
- LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1,
- &DoNothingStatusCB);
+ LogEntryBatchPB entry_batch_pb;
+ entry_batch_pb.mutable_entry()->AddAllocated(entry);
+ LogEntryBatch entry_batch(
+ entry->type(), std::move(entry_batch_pb), 1, &DoNothingStatusCB);
entry_batch.Serialize();
Status s = WriteBatch(&entry_batch);
if (s.ok()) {
s = Sync();
}
- entry_batch.entry_batch_pb_->mutable_entry()->ExtractSubrange(0, 1, nullptr);
+ entry_batch.entry_batch_pb_.mutable_entry()->ExtractSubrange(0, 1, nullptr);
return s;
}
Status Log::WaitUntilAllFlushed() {
// In order to make sure we empty the queue we need to use
// the async api.
- unique_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
- entry_batch->add_entry()->set_type(log::FLUSH_MARKER);
+ LogEntryBatchPB entry_batch;
+ entry_batch.add_entry()->set_type(log::FLUSH_MARKER);
Synchronizer s;
unique_ptr<LogEntryBatch> reserved_entry_batch = CreateBatchFromPB(
FLUSH_MARKER, std::move(entry_batch), s.AsStatusCallback());
@@ -1234,21 +1233,21 @@ Log::~Log() {
}
LogEntryBatch::LogEntryBatch(LogEntryTypePB type,
- unique_ptr<LogEntryBatchPB> entry_batch_pb,
+ LogEntryBatchPB entry_batch_pb,
size_t count,
StatusCallback cb)
: type_(type),
entry_batch_pb_(std::move(entry_batch_pb)),
total_size_bytes_(
- PREDICT_FALSE(count == 1 && entry_batch_pb_->entry(0).type() ==
FLUSH_MARKER) ?
- 0 : entry_batch_pb_->ByteSize()),
+ PREDICT_FALSE(count == 1 && entry_batch_pb_.entry(0).type() ==
FLUSH_MARKER)
+ ? 0 : entry_batch_pb_.ByteSize()),
count_(count),
callback_(std::move(cb)) {
}
LogEntryBatch::~LogEntryBatch() {
- if (type_ == REPLICATE && entry_batch_pb_) {
- for (LogEntryPB& entry : *entry_batch_pb_->mutable_entry()) {
+ if (type_ == REPLICATE) {
+ for (LogEntryPB& entry : *entry_batch_pb_.mutable_entry()) {
// ReplicateMsg elements are owned by and must be freed by the caller
// (e.g. the LogCache).
entry.release_replicate();
@@ -1259,11 +1258,11 @@ LogEntryBatch::~LogEntryBatch() {
void LogEntryBatch::Serialize() {
DCHECK_EQ(buffer_.size(), 0);
// FLUSH_MARKER LogEntries are markers and are not serialized.
- if (PREDICT_FALSE(count() == 1 && entry_batch_pb_->entry(0).type() ==
FLUSH_MARKER)) {
+ if (PREDICT_FALSE(count() == 1 && entry_batch_pb_.entry(0).type() ==
FLUSH_MARKER)) {
return;
}
buffer_.reserve(total_size_bytes_);
- pb_util::AppendToString(*entry_batch_pb_, &buffer_);
+ pb_util::AppendToString(entry_batch_pb_, &buffer_);
}
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index cf8f6a2..556a99c 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -25,6 +25,7 @@
#include <map>
#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include <glog/logging.h>
@@ -296,7 +297,7 @@ class Log : public RefCountedThreadSafe<Log> {
// Append the given set of replicate messages, asynchronously.
// This requires that the replicates have already been assigned OpIds.
- Status AsyncAppendReplicates(const std::vector<consensus::ReplicateRefPtr>&
replicates,
+ Status AsyncAppendReplicates(std::vector<consensus::ReplicateRefPtr>
replicates,
StatusCallback callback);
// Append the given commit message, asynchronously.
@@ -448,8 +449,7 @@ class Log : public RefCountedThreadSafe<Log> {
// After the batch is appended to the log, 'cb' will be invoked with the
// result status of the append.
static std::unique_ptr<LogEntryBatch> CreateBatchFromPB(
- LogEntryTypePB type, std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
- StatusCallback cb);
+ LogEntryTypePB type, LogEntryBatchPB entry_batch_pb, StatusCallback cb);
// Asynchronously appends 'entry_batch' to the log.
Status AsyncAppend(std::unique_ptr<LogEntryBatch> entry_batch);
@@ -564,7 +564,7 @@ class LogEntryBatch {
friend class SegmentAllocator;
LogEntryBatch(LogEntryTypePB type,
- std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
+ LogEntryBatchPB entry_batch_pb,
size_t count,
StatusCallback cb);
@@ -588,13 +588,13 @@ class LogEntryBatch {
// Requires that this be a REPLICATE batch.
consensus::OpId MaxReplicateOpId() const {
DCHECK_EQ(REPLICATE, type_);
- int idx = entry_batch_pb_->entry_size() - 1;
- DCHECK(entry_batch_pb_->entry(idx).replicate().IsInitialized());
- return entry_batch_pb_->entry(idx).replicate().id();
+ int idx = entry_batch_pb_.entry_size() - 1;
+ DCHECK(entry_batch_pb_.entry(idx).replicate().IsInitialized());
+ return entry_batch_pb_.entry(idx).replicate().id();
}
- void SetReplicates(const std::vector<consensus::ReplicateRefPtr>&
replicates) {
- replicates_ = replicates;
+ void SetReplicates(std::vector<consensus::ReplicateRefPtr> replicates) {
+ replicates_ = std::move(replicates);
}
void SetAppendError(const Status& s) {
@@ -612,7 +612,7 @@ class LogEntryBatch {
const LogEntryTypePB type_;
// Contents of the log entries that will be written to disk.
- std::unique_ptr<LogEntryBatchPB> entry_batch_pb_;
+ LogEntryBatchPB entry_batch_pb_;
// Total size in bytes of all entries
const uint32_t total_size_bytes_;
diff --git a/src/kudu/consensus/log_cache-test.cc
b/src/kudu/consensus/log_cache-test.cc
index 423f02a..e19183d 100644
--- a/src/kudu/consensus/log_cache-test.cc
+++ b/src/kudu/consensus/log_cache-test.cc
@@ -26,6 +26,7 @@
#include <ostream>
#include <string>
#include <thread>
+#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
@@ -128,8 +129,9 @@ class LogCacheTest : public KuduTest {
int64_t index = cur_index;
vector<ReplicateRefPtr> msgs;
msgs.push_back(make_scoped_refptr_replicate(
- CreateDummyReplicate(term, index, clock_->Now(),
payload_size).release()));
- RETURN_NOT_OK(cache_->AppendOperations(msgs, [](const Status& s) {
FatalOnError(s); }));
+ CreateDummyReplicate(term, index, clock_->Now(),
payload_size).release()));
+ RETURN_NOT_OK(cache_->AppendOperations(std::move(msgs),
+ [](const Status& s) {
FatalOnError(s); }));
}
return Status::OK();
}
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 26e8a1c..78220d1 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -148,7 +148,7 @@ void LogCache::TruncateOpsAfterUnlocked(int64_t index) {
next_sequential_op_index_ = index + 1;
}
-Status LogCache::AppendOperations(const vector<ReplicateRefPtr>& msgs,
+Status LogCache::AppendOperations(vector<ReplicateRefPtr> msgs,
const StatusCallback& callback) {
CHECK_GT(msgs.size(), 0);
@@ -212,7 +212,7 @@ Status LogCache::AppendOperations(const
vector<ReplicateRefPtr>& msgs,
metrics_.log_cache_num_ops->IncrementBy(msgs.size());
Status log_status = log_->AsyncAppendReplicates(
- msgs, [this, last_idx_in_batch, borrowed_memory, callback](const Status&
s) {
+ std::move(msgs), [this, last_idx_in_batch, borrowed_memory,
callback](const Status& s) {
this->LogCallback(last_idx_in_batch, borrowed_memory, callback, s);
});
@@ -222,7 +222,6 @@ Status LogCache::AppendOperations(const
vector<ReplicateRefPtr>& msgs,
return log_status;
}
-
return Status::OK();
}
diff --git a/src/kudu/consensus/log_cache.h b/src/kudu/consensus/log_cache.h
index 33eb37c..93b35d1 100644
--- a/src/kudu/consensus/log_cache.h
+++ b/src/kudu/consensus/log_cache.h
@@ -94,7 +94,7 @@ class LogCache {
// when the callback fires.
//
// Returns non-OK if the Log append itself fails.
- Status AppendOperations(const std::vector<ReplicateRefPtr>& msgs,
+ Status AppendOperations(std::vector<ReplicateRefPtr> msgs,
const StatusCallback& callback);
// Truncate any operations with index > 'index'.
diff --git a/src/kudu/consensus/log_reader.cc b/src/kudu/consensus/log_reader.cc
index 0721c70..10ee0d0 100644
--- a/src/kudu/consensus/log_reader.cc
+++ b/src/kudu/consensus/log_reader.cc
@@ -245,7 +245,7 @@ scoped_refptr<ReadableLogSegment>
LogReader::GetSegmentBySequenceNumber(int64_t
Status LogReader::ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
faststring* tmp_buf,
- unique_ptr<LogEntryBatchPB>* batch)
const {
+ LogEntryBatchPB* batch) const {
const int64_t index = index_entry.op_id.index();
scoped_refptr<ReadableLogSegment> segment = GetSegmentBySequenceNumber(
@@ -270,7 +270,7 @@ Status LogReader::ReadBatchUsingIndexEntry(const
LogIndexEntry& index_entry,
if (bytes_read_) {
bytes_read_->IncrementBy(segment->entry_header_size() + tmp_buf->length());
- entries_read_->IncrementBy((**batch).entry_size());
+ entries_read_->IncrementBy(batch->entry_size());
}
return Status::OK();
@@ -291,7 +291,7 @@ Status LogReader::ReadReplicatesInRange(int64_t starting_at,
int64_t total_size = 0;
bool limit_exceeded = false;
faststring tmp_buf;
- unique_ptr<LogEntryBatchPB> batch;
+ LogEntryBatchPB batch;
for (int64_t index = starting_at; index <= up_to && !limit_exceeded;
index++) {
LogIndexEntry index_entry;
RETURN_NOT_OK_PREPEND(log_index_->GetEntry(index, &index_entry),
@@ -308,21 +308,21 @@ Status LogReader::ReadReplicatesInRange(int64_t
starting_at,
// Sanity-check the property that a batch should only have increasing
indexes.
int64_t prev_index = 0;
- for (int i = 0; i < batch->entry_size(); ++i) {
- LogEntryPB* entry = batch->mutable_entry(i);
+ for (int i = 0; i < batch.entry_size(); ++i) {
+ LogEntryPB* entry = batch.mutable_entry(i);
if (!entry->has_replicate()) continue;
int64_t this_index = entry->replicate().id().index();
CHECK_GT(this_index, prev_index)
<< "Expected that an entry batch should only include increasing log
indexes: "
<< index_entry.ToString()
- << "\nBatch: " << SecureDebugString(*batch);
+ << "\nBatch: " << SecureDebugString(batch);
prev_index = this_index;
}
}
bool found = false;
- for (int i = 0; i < batch->entry_size(); ++i) {
- LogEntryPB* entry = batch->mutable_entry(i);
+ for (int i = 0; i < batch.entry_size(); ++i) {
+ LogEntryPB* entry = batch.mutable_entry(i);
if (!entry->has_replicate()) {
continue;
}
diff --git a/src/kudu/consensus/log_reader.h b/src/kudu/consensus/log_reader.h
index 2809c1a..620e90d 100644
--- a/src/kudu/consensus/log_reader.h
+++ b/src/kudu/consensus/log_reader.h
@@ -181,7 +181,7 @@ class LogReader : public enable_make_shared<LogReader> {
// 'tmp_buf' is used as scratch space to avoid extra allocation.
Status ReadBatchUsingIndexEntry(const LogIndexEntry& index_entry,
faststring* tmp_buf,
- std::unique_ptr<LogEntryBatchPB>* batch)
const;
+ LogEntryBatchPB* batch) const;
// Reads the headers of all segments in 'tablet_wal_path'.
Status Init(const std::string& tablet_wal_path);
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 865fb07..810f369 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -147,7 +147,7 @@ Status
LogEntryReader::ReadNextEntry(unique_ptr<LogEntryPB>* entry) {
}
// We still expect to have more entries in the log.
- unique_ptr<LogEntryBatchPB> current_batch;
+ LogEntryBatchPB current_batch;
// Read and validate the entry header first.
Status s;
@@ -163,8 +163,8 @@ Status
LogEntryReader::ReadNextEntry(unique_ptr<LogEntryPB>* entry) {
}
// Add the entries from this batch to our pending queue.
- for (int i = 0; i < current_batch->entry_size(); i++) {
- auto entry = current_batch->mutable_entry(i);
+ for (int i = 0; i < current_batch.entry_size(); i++) {
+ auto entry = current_batch.mutable_entry(i);
pending_entries_.emplace_back(entry);
num_entries_read_++;
@@ -180,8 +180,8 @@ Status
LogEntryReader::ReadNextEntry(unique_ptr<LogEntryPB>* entry) {
}
recent_entries_.push_back({ offset_, entry->type(), op_id });
}
- current_batch->mutable_entry()->ExtractSubrange(
- 0, current_batch->entry_size(), nullptr);
+ current_batch.mutable_entry()->ExtractSubrange(
+ 0, current_batch.entry_size(), nullptr);
}
*entry = std::move(pending_entries_.front());
@@ -628,7 +628,7 @@ Status ReadableLogSegment::ScanForValidEntryHeaders(
}
Status ReadableLogSegment::ReadEntryHeaderAndBatch(int64_t* offset,
faststring* tmp_buf,
-
unique_ptr<LogEntryBatchPB>* batch,
+ LogEntryBatchPB* batch,
EntryHeaderStatus*
status_detail) const {
int64_t cur_offset = *offset;
EntryHeader header;
@@ -701,12 +701,11 @@ EntryHeaderStatus ReadableLogSegment::DecodeEntryHeader(
Status ReadableLogSegment::ReadEntryBatch(int64_t* offset,
const EntryHeader& header,
faststring* tmp_buf,
- unique_ptr<LogEntryBatchPB>*
entry_batch) const {
+ LogEntryBatchPB* entry_batch) const {
TRACE_EVENT2("log", "ReadableLogSegment::ReadEntryBatch",
"path", path_,
"range", Substitute("offset=$0 entry_len=$1",
*offset, header.msg_length));
-
if (header.msg_length == 0) {
return Status::Corruption("Invalid 0 entry length");
}
@@ -728,9 +727,10 @@ Status ReadableLogSegment::ReadEntryBatch(int64_t* offset,
tmp_buf->resize(buf_len);
Slice entry_batch_slice(tmp_buf->data(), header.msg_length_compressed);
Status s = file_->Read(*offset, entry_batch_slice);
-
- if (!s.ok()) return Status::IOError(Substitute("Could not read entry. Cause:
$0",
- s.ToString()));
+ if (PREDICT_FALSE(!s.ok())) {
+ return Status::IOError(Substitute("Could not read entry. Cause: $0",
+ s.ToString()));
+ }
// Verify the CRC.
uint32_t read_crc = crc::Crc32c(entry_batch_slice.data(),
entry_batch_slice.size());
@@ -750,17 +750,16 @@ Status ReadableLogSegment::ReadEntryBatch(int64_t* offset,
entry_batch_slice = Slice(uncompress_buf, header.msg_length);
}
- unique_ptr<LogEntryBatchPB> read_entry_batch(new LogEntryBatchPB);
- s = pb_util::ParseFromArray(read_entry_batch.get(),
+ LogEntryBatchPB read_entry_batch;
+ s = pb_util::ParseFromArray(&read_entry_batch,
entry_batch_slice.data(),
header.msg_length);
-
if (!s.ok()) {
return Status::Corruption(Substitute("Could not parse PB. Cause: $0",
s.ToString()));
}
*offset += header.msg_length_compressed;
- entry_batch->reset(read_entry_batch.release());
+ *entry_batch = std::move(read_entry_batch);
return Status::OK();
}
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 630cb24..6a2655b 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -343,7 +343,7 @@ class ReadableLogSegment : public
RefCountedThreadSafe<ReadableLogSegment> {
// If unsuccessful, '*offset' is not updated, and *status_detail will be
updated
// to indicate the cause of the error.
Status ReadEntryHeaderAndBatch(int64_t* offset, faststring* tmp_buf,
- std::unique_ptr<LogEntryBatchPB>* batch,
+ LogEntryBatchPB* batch,
EntryHeaderStatus* status_detail) const;
// Reads a log entry header from the segment.
@@ -366,7 +366,7 @@ class ReadableLogSegment : public
RefCountedThreadSafe<ReadableLogSegment> {
Status ReadEntryBatch(int64_t* offset,
const EntryHeader& header,
faststring* tmp_buf,
- std::unique_ptr<LogEntryBatchPB>* entry_batch) const;
+ LogEntryBatchPB* entry_batch) const;
void UpdateReadableToOffset(int64_t readable_to_offset);