This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 9287bc2  KUDU-636. Use protobuf arenas for CommitMsgs
9287bc2 is described below

commit 9287bc2095bfea2713cd743dc3f5bb4cd0f41476
Author: Todd Lipcon <[email protected]>
AuthorDate: Mon Jul 6 14:27:34 2020 -0700

    KUDU-636. Use protobuf arenas for CommitMsgs
    
    This commit optimizes the write path by allowing protobuf arenas to be
    used to construct the operation results protobuf and the CommitMsg that
    contains it. The operation result for a large batch of writes has one or
    more embedded protobufs per inserted row, so using a protobuf arena for
    allocation is much more efficient than calling into the system allocator
    for each object.
    
    In order to accomplish this, I had to simplify the Log interface a bit.
    Previously, the Log code constructed a LogEntryPB and passed that
    through to the log's appender thread, even though it had already
    performed all of the serialization on the submitting thread. Doing that
    required that the log entry retain references to all of the embedded
    protobufs, which complicated their lifetime management considerably.
    
    The new Log interface performs all of the serialization and analysis
    (including extracting the OpIds of the replicate messages in the batch)
    inline in the submission path instead of doing any such work on the
    Append thread. With this, AsyncAppendCommit() now takes a const CommitMsg
    reference instead of a unique_ptr<CommitMsg>. The caller therefore keeps
    ownership and only needs to keep the message alive for the duration of
    the call.
    
    With the above accomplished, it was straightforward to add a protobuf
    Arena to the OpState structure and allocate the CommitMsg and its
    constituent sub-messages from that Arena.
    
    The performance benefits are substantial. I benchmarked on a local
    machine using:
    
      $ kudu perf loadgen localhost -num_rows_per_thread=10000000 -num_threads=8
    
    and ran the tserver under `perf stat` to collect counters:
    
    Without patch:
    
      INSERT report
          rows total: 80000000
          time total: 35860.9 ms
        time per row: 0.000448261 ms
    
       Performance counter stats for './build/latest/bin/kudu tserver run -fs-wal-dir /tmp/ts':
    
               378784.92 msec task-clock                #    8.453 CPUs utilized
                 1429039      context-switches          #    0.004 M/sec
                  132930      cpu-migrations            #    0.351 K/sec
                 3128091      page-faults               #    0.008 M/sec
           1553122880821      cycles                    #    4.100 GHz                      (83.24%)
            313764365792      stalled-cycles-frontend   #   20.20% frontend cycles idle     (83.33%)
            166769392663      stalled-cycles-backend    #   10.74% backend cycles idle      (83.39%)
            943534760864      instructions              #    0.61  insn per cycle
                                                        #    0.33  stalled cycles per insn  (83.34%)
            170465210875      branches                  #  450.032 M/sec                    (83.39%)
               834101556      branch-misses             #    0.49% of all branches          (83.32%)
    
           357.520042000 seconds user
            21.770448000 seconds sys
    
    With patch:
    
      INSERT report
          rows total: 80000000
          time total: 32701 ms
        time per row: 0.000408763 ms
    
       Performance counter stats for './build/latest/bin/kudu tserver run -fs-wal-dir /tmp/ts':
    
               272393.27 msec task-clock                #    4.915 CPUs utilized
                  300768      context-switches          #    0.001 M/sec
                   44879      cpu-migrations            #    0.165 K/sec
                 2861143      page-faults               #    0.011 M/sec
           1126891932279      cycles                    #    4.137 GHz                      (83.28%)
            209167186469      stalled-cycles-frontend   #   18.56% frontend cycles idle     (83.42%)
            144156173079      stalled-cycles-backend    #   12.79% backend cycles idle      (83.34%)
            925439690437      instructions              #    0.82  insn per cycle
                                                        #    0.23  stalled cycles per insn  (83.28%)
            163672508297      branches                  #  600.868 M/sec                    (83.33%)
               655509045      branch-misses             #    0.40% of all branches          (83.34%)
    
           257.521990000 seconds user
            15.112482000 seconds sys
    
    Summary:
    * 9.6% throughput increase
    * ~28% reduction in tserver CPU time (task-clock) and ~27% in cycles;
      equivalently, the unpatched tserver used ~39% more CPU time
    
    Change-Id: I78698d4cb4944bddd8dabd6cbbf1e3a064056199
    Reviewed-on: http://gerrit.cloudera.org:8080/16147
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <[email protected]>
---
 src/kudu/consensus/consensus-test-util.h         |  8 +-
 src/kudu/consensus/log-test-base.h               | 26 +++----
 src/kudu/consensus/log-test.cc                   |  8 +-
 src/kudu/consensus/log.cc                        | 96 +++++++++++-------------
 src/kudu/consensus/log.h                         | 40 +++-------
 src/kudu/consensus/log_util.cc                   |  7 +-
 src/kudu/consensus/log_util.h                    |  8 +-
 src/kudu/consensus/raft_consensus.cc             |  8 +-
 src/kudu/consensus/raft_consensus_quorum-test.cc |  8 +-
 src/kudu/tablet/local_tablet_writer.h            | 14 ++--
 src/kudu/tablet/ops/alter_schema_op.cc           |  5 +-
 src/kudu/tablet/ops/alter_schema_op.h            |  2 +-
 src/kudu/tablet/ops/op.h                         |  9 ++-
 src/kudu/tablet/ops/op_driver.cc                 |  4 +-
 src/kudu/tablet/ops/op_tracker-test.cc           |  2 +-
 src/kudu/tablet/ops/write_op.cc                  | 11 ++-
 src/kudu/tablet/ops/write_op.h                   |  2 +-
 src/kudu/tablet/row_op.cc                        | 21 +++---
 src/kudu/tablet/row_op.h                         | 22 ++++--
 src/kudu/tablet/tablet.cc                        | 17 +++--
 src/kudu/tablet/tablet_bootstrap-test.cc         | 70 ++++++++---------
 src/kudu/tablet/tablet_bootstrap.cc              | 13 ++--
 src/kudu/tablet/tablet_replica-test.cc           |  2 +-
 23 files changed, 206 insertions(+), 197 deletions(-)

diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 262bd7c..7f2fbbd 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -672,11 +672,11 @@ class TestDriver {
   // The commit message has the exact same type of the replicate message, but
   // no content.
   void Apply() {
-    std::unique_ptr<CommitMsg> msg(new CommitMsg);
-    msg->set_op_type(round_->replicate_msg()->op_type());
-    msg->mutable_commited_op_id()->CopyFrom(round_->id());
+    CommitMsg msg;
+    msg.set_op_type(round_->replicate_msg()->op_type());
+    msg.mutable_commited_op_id()->CopyFrom(round_->id());
     CHECK_OK(log_->AsyncAppendCommit(
-        std::move(msg), [this](const Status& s) { this->CommitCallback(s); }));
+        msg, [this](const Status& s) { this->CommitCallback(s); }));
   }
 
   void CommitCallback(const Status& s) {
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index 0b412f7..6692102 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -278,12 +278,12 @@ class LogTestBase : public KuduTest {
                       int rs_id,
                       int dms_id,
                       bool sync = APPEND_SYNC) {
-    std::unique_ptr<consensus::CommitMsg> commit(new consensus::CommitMsg);
-    commit->set_op_type(consensus::WRITE_OP);
+    consensus::CommitMsg commit;
+    commit.set_op_type(consensus::WRITE_OP);
 
-    commit->mutable_commited_op_id()->CopyFrom(original_opid);
+    commit.mutable_commited_op_id()->CopyFrom(original_opid);
 
-    tablet::TxResultPB* result = commit->mutable_result();
+    tablet::TxResultPB* result = commit.mutable_result();
 
     tablet::OperationResultPB* insert = result->add_ops();
     insert->add_mutated_stores()->set_mrs_id(mrs_id);
@@ -292,35 +292,35 @@ class LogTestBase : public KuduTest {
     tablet::MemStoreTargetPB* target = mutate->add_mutated_stores();
     target->set_dms_id(dms_id);
     target->set_rs_id(rs_id);
-    return AppendCommit(std::move(commit), sync);
+    return AppendCommit(commit, sync);
   }
 
   // Append a COMMIT message for 'original_opid', but with results
   // indicating that the associated writes failed due to
   // "NotFound" errors.
   Status AppendCommitWithNotFoundOpResults(const consensus::OpId& original_opid) {
-    std::unique_ptr<consensus::CommitMsg> commit(new consensus::CommitMsg);
-    commit->set_op_type(consensus::WRITE_OP);
-    commit->mutable_commited_op_id()->CopyFrom(original_opid);
+    consensus::CommitMsg commit;
+    commit.set_op_type(consensus::WRITE_OP);
+    commit.mutable_commited_op_id()->CopyFrom(original_opid);
 
-    tablet::TxResultPB* result = commit->mutable_result();
+    tablet::TxResultPB* result = commit.mutable_result();
 
     tablet::OperationResultPB* insert = result->add_ops();
     StatusToPB(Status::NotFound("fake failed write"), insert->mutable_failed_status());
     tablet::OperationResultPB* mutate = result->add_ops();
     StatusToPB(Status::NotFound("fake failed write"), mutate->mutable_failed_status());
 
-    return AppendCommit(std::move(commit));
+    return AppendCommit(commit);
   }
 
-  Status AppendCommit(std::unique_ptr<consensus::CommitMsg> commit,
+  Status AppendCommit(const consensus::CommitMsg& commit,
                       bool sync = APPEND_SYNC) {
     if (sync) {
       Synchronizer s;
-      RETURN_NOT_OK(log_->AsyncAppendCommit(std::move(commit), s.AsStatusCallback()));
+      RETURN_NOT_OK(log_->AsyncAppendCommit(commit, s.AsStatusCallback()));
       return s.Wait();
     }
-    return log_->AsyncAppendCommit(std::move(commit),
+    return log_->AsyncAppendCommit(commit,
                                    [](const Status& s) { CheckCommitResult(s); });
   }
 
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index 3750a35..096bf05 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -928,11 +928,11 @@ void LogTest::AppendTestSequence(const vector<TestLogSequenceElem>& seq) {
       }
       case TestLogSequenceElem::COMMIT:
       {
-        unique_ptr<CommitMsg> commit(new CommitMsg);
-        commit->set_op_type(NO_OP);
-        commit->mutable_commited_op_id()->CopyFrom(e.id);
+        CommitMsg commit;
+        commit.set_op_type(NO_OP);
+        commit.mutable_commited_op_id()->CopyFrom(e.id);
         Synchronizer s;
-        ASSERT_OK(log_->AsyncAppendCommit(std::move(commit), s.AsStatusCallback()));
+        ASSERT_OK(log_->AsyncAppendCommit(commit, s.AsStatusCallback()));
         ASSERT_OK(s.Wait());
       }
       case TestLogSequenceElem::ROLL:
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 67c26d2..e94b36c 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -29,6 +29,7 @@
 #include <gflags/gflags.h>
 
 #include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/log_index.h"
 #include "kudu/consensus/log_metrics.h"
 #include "kudu/consensus/log_reader.h"
@@ -175,10 +176,11 @@ static bool ValidateLogsToRetain(const char* flagname, int value) {
 DEFINE_validator(log_min_segments_to_retain, &ValidateLogsToRetain);
 
 using kudu::consensus::CommitMsg;
+using kudu::consensus::OpId;
 using kudu::consensus::ReplicateRefPtr;
 using std::string;
-using std::vector;
 using std::unique_ptr;
+using std::vector;
 using strings::Substitute;
 
 namespace kudu {
@@ -576,8 +578,8 @@ void SegmentAllocator::UpdateFooterForBatch(const LogEntryBatch& batch) {
   // immediately.
   if (batch.type_ == REPLICATE) {
     // Update the index bounds for the current segment.
-    for (const LogEntryPB& entry_pb : batch.entry_batch_pb_.entry()) {
-      UpdateFooterForReplicateEntry(entry_pb, &footer_);
+    for (const OpId& op_id : batch.replicate_op_ids_) {
+      UpdateFooterForReplicateEntry(op_id, &footer_);
     }
   }
 }
@@ -825,13 +827,10 @@ Status Log::Init() {
 }
 
 unique_ptr<LogEntryBatch> Log::CreateBatchFromPB(
-    LogEntryTypePB type,
-    LogEntryBatchPB entry_batch_pb,
+    LogEntryTypePB type, const LogEntryBatchPB& entry_batch_pb,
     StatusCallback cb) {
-  size_t num_ops = entry_batch_pb.entry_size();
-  unique_ptr<LogEntryBatch> new_entry_batch(new LogEntryBatch(
-      type, std::move(entry_batch_pb), num_ops, std::move(cb)));
-  new_entry_batch->Serialize();
+  unique_ptr<LogEntryBatch> new_entry_batch(
+      new LogEntryBatch(type, entry_batch_pb, std::move(cb)));
   TRACE("Serialized $0 byte log entry", new_entry_batch->total_size_bytes());
   return new_entry_batch;
 }
@@ -859,28 +858,38 @@ Status Log::AsyncAppendReplicates(vector<ReplicateRefPtr> replicates,
     entry_pb->set_type(REPLICATE);
     entry_pb->set_allocated_replicate(r->get());
   }
-  unique_ptr<LogEntryBatch> batch = CreateBatchFromPB(
-      REPLICATE, std::move(batch_pb), std::move(callback));
-  batch->SetReplicates(std::move(replicates));
+  unique_ptr<LogEntryBatch> batch =
+      CreateBatchFromPB(REPLICATE, batch_pb, std::move(callback));
+
+  for (LogEntryPB& entry : *batch_pb.mutable_entry()) {
+    entry.release_replicate();
+  }
+
   return AsyncAppend(std::move(batch));
 }
 
-Status Log::AsyncAppendCommit(unique_ptr<consensus::CommitMsg> commit_msg,
+Status Log::AsyncAppendCommit(const consensus::CommitMsg& commit_msg,
                               StatusCallback callback) {
   MAYBE_FAULT(FLAGS_fault_crash_before_append_commit);
 
   LogEntryBatchPB batch_pb;
   LogEntryPB* entry = batch_pb.add_entry();
   entry->set_type(COMMIT);
-  entry->set_allocated_commit(commit_msg.release());
+  entry->unsafe_arena_set_allocated_commit(const_cast<consensus::CommitMsg*>(&commit_msg));
 
   unique_ptr<LogEntryBatch> entry_batch = CreateBatchFromPB(
-      COMMIT, std::move(batch_pb), std::move(callback));
+      COMMIT, batch_pb, std::move(callback));
+  entry->unsafe_arena_release_commit();
   AsyncAppend(std::move(entry_batch));
   return Status::OK();
 }
 
 Status Log::WriteBatch(LogEntryBatch* entry_batch) {
+  // If there is no data to write return OK.
+  if (PREDICT_FALSE(entry_batch->type_ == FLUSH_MARKER)) {
+    return Status::OK();
+  }
+
   size_t num_entries = entry_batch->count();
   DCHECK_GT(num_entries, 0) << "Cannot call WriteBatch() with zero entries reserved";
 
@@ -889,10 +898,6 @@ Status Log::WriteBatch(LogEntryBatch* entry_batch) {
 
   Slice entry_batch_data = entry_batch->data();
   uint32_t entry_batch_bytes = entry_batch->total_size_bytes();
-  // If there is no data to write return OK.
-  if (PREDICT_FALSE(entry_batch_bytes == 0)) {
-    return Status::OK();
-  }
 
   scoped_refptr<ReadableLogSegment> finished_segment;
   scoped_refptr<ReadableLogSegment> new_readable_segment;
@@ -938,9 +943,9 @@ Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
     return Status::OK();
   }
 
-  for (const LogEntryPB& entry_pb : batch.entry_batch_pb_.entry()) {
+  for (const OpId& op_id : batch.replicate_op_ids_) {
     LogIndexEntry index_entry;
-    index_entry.op_id = entry_pb.replicate().id();
+    index_entry.op_id = op_id;
     index_entry.segment_sequence_number = segment_allocator_.active_segment_sequence_number();
     index_entry.offset_in_segment = start_offset;
     RETURN_NOT_OK(log_index_->AddEntry(index_entry));
@@ -1002,15 +1007,13 @@ void Log::GetSegmentsToGCUnlocked(RetentionIndexes retention_indexes,
 
 Status Log::Append(LogEntryPB* entry) {
   LogEntryBatchPB entry_batch_pb;
-  entry_batch_pb.mutable_entry()->AddAllocated(entry);
-  LogEntryBatch entry_batch(
-      entry->type(), std::move(entry_batch_pb), 1, &DoNothingStatusCB);
-  entry_batch.Serialize();
+  entry_batch_pb.mutable_entry()->UnsafeArenaAddAllocated(entry);
+  LogEntryBatch entry_batch(entry->type(), entry_batch_pb, &DoNothingStatusCB);
+  entry_batch_pb.mutable_entry()->ExtractSubrange(0, 1, nullptr);
   Status s = WriteBatch(&entry_batch);
   if (s.ok()) {
     s = Sync();
   }
-  entry_batch.entry_batch_pb_.mutable_entry()->ExtractSubrange(0, 1, nullptr);
   return s;
 }
 
@@ -1020,8 +1023,8 @@ Status Log::WaitUntilAllFlushed() {
   LogEntryBatchPB entry_batch;
   entry_batch.add_entry()->set_type(log::FLUSH_MARKER);
   Synchronizer s;
-  unique_ptr<LogEntryBatch> reserved_entry_batch = CreateBatchFromPB(
-      FLUSH_MARKER, std::move(entry_batch), s.AsStatusCallback());
+  unique_ptr<LogEntryBatch> reserved_entry_batch =
+      CreateBatchFromPB(FLUSH_MARKER, entry_batch, s.AsStatusCallback());
   AsyncAppend(std::move(reserved_entry_batch));
   return s.Wait();
 }
@@ -1232,38 +1235,27 @@ Log::~Log() {
 }
 
 LogEntryBatch::LogEntryBatch(LogEntryTypePB type,
-                             LogEntryBatchPB entry_batch_pb,
-                             size_t count,
+                             const LogEntryBatchPB& entry_batch_pb,
                              StatusCallback cb)
     : type_(type),
-      entry_batch_pb_(std::move(entry_batch_pb)),
-      total_size_bytes_(
-          PREDICT_FALSE(count == 1 && entry_batch_pb_.entry(0).type() == FLUSH_MARKER)
-              ? 0 : entry_batch_pb_.ByteSizeLong()),
-      count_(count),
+      total_size_bytes_(entry_batch_pb.ByteSizeLong()),
+      count_(entry_batch_pb.entry().size()),
       callback_(std::move(cb)) {
-}
-
-LogEntryBatch::~LogEntryBatch() {
-  if (type_ == REPLICATE) {
-    for (LogEntryPB& entry : *entry_batch_pb_.mutable_entry()) {
-      // ReplicateMsg elements are owned by and must be freed by the caller
-      // (e.g. the LogCache).
-      entry.release_replicate();
-    }
+  if (total_size_bytes_) {
+    buffer_.reserve(total_size_bytes_);
+    pb_util::AppendToString(entry_batch_pb, &buffer_);
   }
-}
 
-void LogEntryBatch::Serialize() {
-  DCHECK_EQ(buffer_.size(), 0);
-  // FLUSH_MARKER LogEntries are markers and are not serialized.
-  if (PREDICT_FALSE(count() == 1 && entry_batch_pb_.entry(0).type() == FLUSH_MARKER)) {
-    return;
+  if (type == REPLICATE) {
+    replicate_op_ids_.reserve(entry_batch_pb.entry().size());
+    for (const auto& e : entry_batch_pb.entry()) {
+      DCHECK(e.has_replicate());
+      replicate_op_ids_.emplace_back(e.replicate().id());
+    }
   }
-  buffer_.reserve(total_size_bytes_);
-  pb_util::AppendToString(entry_batch_pb_, &buffer_);
 }
 
+LogEntryBatch::~LogEntryBatch() {}
 
 }  // namespace log
 }  // namespace kudu
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 9c6484d..0eaec78 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -25,14 +25,12 @@
 #include <map>
 #include <memory>
 #include <string>
-#include <utility>
 #include <vector>
 
 #include <glog/logging.h>
 #include <gtest/gtest_prod.h>
 
 #include "kudu/common/schema.h"
-#include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/log.pb.h"
 #include "kudu/consensus/log_metrics.h"
 #include "kudu/consensus/log_util.h"
@@ -57,6 +55,9 @@ class CompressionCodec;
 class FileCache;
 class FsManager;
 class RWFile;
+namespace consensus {
+class CommitMsg;
+}  // namespace consensus
 
 namespace log {
 
@@ -303,7 +304,7 @@ class Log : public RefCountedThreadSafe<Log> {
   // Append the given commit message, asynchronously.
   //
   // Returns a bad status if the log is already shut down.
-  Status AsyncAppendCommit(std::unique_ptr<consensus::CommitMsg> commit_msg,
+  Status AsyncAppendCommit(const consensus::CommitMsg& commit_msg,
                            StatusCallback callback);
 
   // Blocks the current thread until all the entries in the log queue
@@ -449,7 +450,8 @@ class Log : public RefCountedThreadSafe<Log> {
   // After the batch is appended to the log, 'cb' will be invoked with the
   // result status of the append.
   static std::unique_ptr<LogEntryBatch> CreateBatchFromPB(
-      LogEntryTypePB type, LogEntryBatchPB entry_batch_pb, StatusCallback cb);
+      LogEntryTypePB type, const LogEntryBatchPB& entry_batch_pb,
+      StatusCallback cb);
 
   // Asynchronously appends 'entry_batch' to the log.
   Status AsyncAppend(std::unique_ptr<LogEntryBatch> entry_batch);
@@ -563,9 +565,7 @@ class LogEntryBatch {
   friend class MultiThreadedLogTest;
   friend class SegmentAllocator;
 
-  LogEntryBatch(LogEntryTypePB type,
-                LogEntryBatchPB entry_batch_pb,
-                size_t count,
+  LogEntryBatch(LogEntryTypePB type, const LogEntryBatchPB& entry_batch_pb,
                 StatusCallback cb);
 
   // Serializes contents of the entry to an internal buffer.
@@ -584,19 +584,6 @@ class LogEntryBatch {
     return total_size_bytes_;
   }
 
-  // The highest OpId of a REPLICATE message in this batch.
-  // Requires that this be a REPLICATE batch.
-  consensus::OpId MaxReplicateOpId() const {
-    DCHECK_EQ(REPLICATE, type_);
-    int idx = entry_batch_pb_.entry_size() - 1;
-    DCHECK(entry_batch_pb_.entry(idx).replicate().IsInitialized());
-    return entry_batch_pb_.entry(idx).replicate().id();
-  }
-
-  void SetReplicates(std::vector<consensus::ReplicateRefPtr> replicates) {
-    replicates_ = std::move(replicates);
-  }
-
   void SetAppendError(const Status& s) {
     DCHECK(!s.ok());
     if (append_status_.ok()) {
@@ -611,20 +598,15 @@ class LogEntryBatch {
   // The type of entries in this batch.
   const LogEntryTypePB type_;
 
-  // Contents of the log entries that will be written to disk.
-  LogEntryBatchPB entry_batch_pb_;
-
    // Total size in bytes of all entries
   const size_t total_size_bytes_;
 
-  // Number of entries in 'entry_batch_pb_'
+  // Number of entries in the serialized batch.
   const size_t count_;
 
-  // The vector of refcounted replicates.
-  // Used only when type is REPLICATE, this makes sure there's at
-  // least a reference to each replicate message until we're finished
-  // appending.
-  std::vector<consensus::ReplicateRefPtr> replicates_;
+  // Used only when type is REPLICATE, the opids of the serialized
+  // ReplicateMsgs.
+  std::vector<consensus::OpId> replicate_op_ids_;
 
   // Callback to be invoked upon the entries being written and
   // synced to disk.
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 08bab3a..0dd655c 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -392,7 +392,7 @@ Status ReadableLogSegment::RebuildFooterByScanning() {
 
     DCHECK(entry);
     if (entry->has_replicate()) {
-      UpdateFooterForReplicateEntry(*entry, &new_footer);
+      UpdateFooterForReplicateEntry(entry->replicate().id(), &new_footer);
     }
     num_entries++;
   }
@@ -873,10 +873,9 @@ bool IsLogFileName(const string& fname) {
   return true;
 }
 
-void UpdateFooterForReplicateEntry(const LogEntryPB& entry_pb,
+void UpdateFooterForReplicateEntry(const OpId& op_id,
                                    LogSegmentFooterPB* footer) {
-  DCHECK(entry_pb.has_replicate());
-  int64_t index = entry_pb.replicate().id().index();
+  int64_t index = op_id.index();
   if (!footer->has_min_replicate_index() ||
       index < footer->min_replicate_index()) {
     footer->set_min_replicate_index(index);
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 6a2655b..2964126 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -505,10 +505,10 @@ class WritableLogSegment {
 // Checks if 'fname' is a correctly formatted name of log segment file.
 bool IsLogFileName(const std::string& fname);
 
-// Update 'footer' to reflect the given REPLICATE message 'entry_pb'.
-// In particular, updates the min/max seen replicate OpID.
-void UpdateFooterForReplicateEntry(
-    const LogEntryPB& entry_pb, LogSegmentFooterPB* footer);
+// Update 'footer' to reflect a REPLICATE message with the given
+// op_id. In particular, updates the min/max seen replicate OpID.
+void UpdateFooterForReplicateEntry(const consensus::OpId& op_id,
+                                   LogSegmentFooterPB* footer);
 
 }  // namespace log
 }  // namespace kudu
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 2bf40e9..c16cc80 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -2848,12 +2848,12 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
   VLOG_WITH_PREFIX_UNLOCKED(1) << "Committing " << op_type_str << " with op id "
                                << round->id();
   round_handler_->FinishConsensusOnlyRound(round);
-  unique_ptr<CommitMsg> commit_msg(new CommitMsg);
-  commit_msg->set_op_type(round->replicate_msg()->op_type());
-  *commit_msg->mutable_commited_op_id() = round->id();
+  CommitMsg commit_msg;
+  commit_msg.set_op_type(round->replicate_msg()->op_type());
+  *commit_msg.mutable_commited_op_id() = round->id();
 
   CHECK_OK(log_->AsyncAppendCommit(
-      std::move(commit_msg), [](const Status& s) {
+      commit_msg, [](const Status& s) {
         CrashIfNotOkStatusCB("Enqueued commit operation failed to write to WAL", s);
       }));
 
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 307651a..1a1994a 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -300,10 +300,10 @@ class RaftConsensusQuorumTest : public KuduTest {
       commit_callback = &DoNothingStatusCB;
     }
 
-    unique_ptr<CommitMsg> msg(new CommitMsg());
-    msg->set_op_type(NO_OP);
-    msg->mutable_commited_op_id()->CopyFrom(round->id());
-    CHECK_OK(logs_[peer_idx]->AsyncAppendCommit(std::move(msg), commit_callback));
+    CommitMsg msg;
+    msg.set_op_type(NO_OP);
+    msg.mutable_commited_op_id()->CopyFrom(round->id());
+    CHECK_OK(logs_[peer_idx]->AsyncAppendCommit(msg, commit_callback));
     return Status::OK();
   }
 
diff --git a/src/kudu/tablet/local_tablet_writer.h b/src/kudu/tablet/local_tablet_writer.h
index 38c2223..b59e681 100644
--- a/src/kudu/tablet/local_tablet_writer.h
+++ b/src/kudu/tablet/local_tablet_writer.h
@@ -19,6 +19,8 @@
 #include <memory>
 #include <vector>
 
+#include <google/protobuf/arena.h>
+
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row_operations.h"
 #include "kudu/consensus/log_anchor_registry.h"
@@ -108,13 +110,15 @@ class LocalTabletWriter {
     op_state_->mutable_op_id()->CopyFrom(consensus::MaximumOpId());
     RETURN_NOT_OK(tablet_->ApplyRowOperations(op_state_.get()));
 
-    op_state_->ReleaseTxResultPB(&result_);
+    result_ = google::protobuf::Arena::CreateMessage<TxResultPB>(
+        op_state_->pb_arena());
+    op_state_->ReleaseTxResultPB(result_);
     tablet_->mvcc_manager()->AdjustNewOpLowerBound(op_state_->timestamp());
     op_state_->CommitOrAbort(Op::COMMITTED);
 
     // Return the status of first failed op.
     int op_idx = 0;
-    for (const OperationResultPB& result : result_.ops()) {
+    for (const OperationResultPB& result : result_->ops()) {
       if (result.has_failed_status()) {
         return StatusFromPB(result.failed_status())
           .CloneAndPrepend(ops[op_idx].row->ToString());
@@ -134,15 +138,15 @@ class LocalTabletWriter {
 
   // Return the result of the last row operation run against the tablet.
   const OperationResultPB& last_op_result() {
-    CHECK_GE(result_.ops_size(), 1);
-    return result_.ops(result_.ops_size() - 1);
+    CHECK_GE(result_->ops_size(), 1);
+    return result_->ops(result_->ops_size() - 1);
   }
 
  private:
   Tablet* const tablet_;
   const Schema* client_schema_;
 
-  TxResultPB result_;
+  TxResultPB* result_ = nullptr;
   tserver::WriteRequestPB req_;
   std::unique_ptr<WriteOpState> op_state_;
 
diff --git a/src/kudu/tablet/ops/alter_schema_op.cc b/src/kudu/tablet/ops/alter_schema_op.cc
index e1de230..72b5833 100644
--- a/src/kudu/tablet/ops/alter_schema_op.cc
+++ b/src/kudu/tablet/ops/alter_schema_op.cc
@@ -22,6 +22,7 @@
 #include <utility>
 
 #include <glog/logging.h>
+#include <google/protobuf/arena.h>
 
 #include "kudu/clock/hybrid_clock.h"
 #include "kudu/common/schema.h"
@@ -118,13 +119,13 @@ Status AlterSchemaOp::Start() {
   return Status::OK();
 }
 
-Status AlterSchemaOp::Apply(unique_ptr<CommitMsg>* commit_msg) {
+Status AlterSchemaOp::Apply(CommitMsg** commit_msg) {
   TRACE("APPLY ALTER-SCHEMA: Starting");
 
   Tablet* tablet = state_->tablet_replica()->tablet();
   RETURN_NOT_OK(tablet->AlterSchema(state()));
 
-  commit_msg->reset(new CommitMsg());
+  *commit_msg = google::protobuf::Arena::CreateMessage<CommitMsg>(state_->pb_arena());
   (*commit_msg)->set_op_type(consensus::OperationType::ALTER_SCHEMA_OP);
 
   // If there was a logical error (e.g. bad schema version) with the alter,
diff --git a/src/kudu/tablet/ops/alter_schema_op.h b/src/kudu/tablet/ops/alter_schema_op.h
index 778e06f..d95c80c 100644
--- a/src/kudu/tablet/ops/alter_schema_op.h
+++ b/src/kudu/tablet/ops/alter_schema_op.h
@@ -141,7 +141,7 @@ class AlterSchemaOp : public Op {
   Status Start() override;
 
   // Executes an Apply for the alter schema op
-  Status Apply(std::unique_ptr<consensus::CommitMsg>* commit_msg) override;
+  Status Apply(consensus::CommitMsg** commit_msg) override;
 
   // Actually commits the op.
   void Finish(OpResult result) override;
diff --git a/src/kudu/tablet/ops/op.h b/src/kudu/tablet/ops/op.h
index d7f0a70..73527cb 100644
--- a/src/kudu/tablet/ops/op.h
+++ b/src/kudu/tablet/ops/op.h
@@ -24,6 +24,7 @@
 #include <utility>
 
 #include <glog/logging.h>
+#include <google/protobuf/arena.h>
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/timestamp.h"
@@ -124,7 +125,7 @@ class Op {
   // Executes the Apply() phase of the op, the actual actions of
   // this phase depend on the op type, but usually this is the
   // method where data-structures are changed.
-  virtual Status Apply(std::unique_ptr<consensus::CommitMsg>* commit_msg) = 0;
+  virtual Status Apply(consensus::CommitMsg** commit_msg) = 0;
 
   // Executed after the op has been applied and the commit message has
   // been appended to the log (though it might not be durable yet), or if the
@@ -221,6 +222,10 @@ class OpState {
     return &arena_;
   }
 
+  google::protobuf::Arena* pb_arena() {
+    return &pb_arena_;
+  }
+
   // Each implementation should have its own ToString() method.
   virtual std::string ToString() const = 0;
 
@@ -293,6 +298,8 @@ class OpState {
 
   Arena arena_;
 
+  google::protobuf::Arena pb_arena_;
+
   // This OpId stores the canonical "anchor" OpId for this op.
   consensus::OpId op_id_;
 
diff --git a/src/kudu/tablet/ops/op_driver.cc b/src/kudu/tablet/ops/op_driver.cc
index 27261e6..fe93195 100644
--- a/src/kudu/tablet/ops/op_driver.cc
+++ b/src/kudu/tablet/ops/op_driver.cc
@@ -510,7 +510,7 @@ void OpDriver::ApplyTask() {
   scoped_refptr<OpDriver> ref(this);
 
   {
-    unique_ptr<CommitMsg> commit_msg;
+    CommitMsg* commit_msg;
     Status s = op_->Apply(&commit_msg);
     if (PREDICT_FALSE(!s.ok())) {
       LOG(WARNING) << Substitute("Did not Apply op $0: $1",
@@ -524,7 +524,7 @@ void OpDriver::ApplyTask() {
     {
       TRACE_EVENT1("op", "AsyncAppendCommit", "op", this);
       CHECK_OK(log_->AsyncAppendCommit(
-          std::move(commit_msg), [](const Status& s) {
+          *commit_msg, [](const Status& s) {
             CrashIfNotOkStatusCB("Enqueued commit operation failed to write to WAL", s);
           }));
     }
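The commit message describes the new Log contract: serialization happens inline on the submitting thread, so AsyncAppendCommit can take a const reference and the appender thread never touches the caller's message. The following is a toy model of that contract only. ToyLog and FakeCommitMsg are hypothetical types, not Kudu's log::Log API, and a std::vector stands in for the WAL file.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical message; Serialize() stands in for protobuf serialization.
struct FakeCommitMsg {
  int op_type = 0;
  std::string Serialize() const { return "commit:" + std::to_string(op_type); }
};

// Toy log: the submitting thread serializes, the appender thread only sees bytes.
class ToyLog {
 public:
  // 'wal' receives the appended bytes; it must outlive the log.
  explicit ToyLog(std::vector<std::string>* wal)
      : wal_(wal), appender_([this] { AppendLoop(); }) {}

  ~ToyLog() {
    {
      std::lock_guard<std::mutex> l(mu_);
      shutdown_ = true;
    }
    cv_.notify_one();
    appender_.join();  // the loop drains every queued entry before exiting
  }

  // The message is fully serialized before this returns, so the caller may
  // destroy it (or the arena that owns it) immediately afterwards.
  void AsyncAppendCommit(const FakeCommitMsg& msg, std::function<void(bool)> cb) {
    std::string bytes = msg.Serialize();
    {
      std::lock_guard<std::mutex> l(mu_);
      queue_.emplace_back(std::move(bytes), std::move(cb));
    }
    cv_.notify_one();
  }

 private:
  void AppendLoop() {
    std::unique_lock<std::mutex> l(mu_);
    while (true) {
      cv_.wait(l, [this] { return shutdown_ || !queue_.empty(); });
      if (queue_.empty()) return;  // shutting down and nothing left to write
      auto entry = std::move(queue_.front());
      queue_.pop_front();
      l.unlock();
      wal_->push_back(std::move(entry.first));  // only this thread touches *wal_
      entry.second(true);                       // completion callback
      l.lock();
    }
  }

  std::vector<std::string>* wal_;
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::pair<std::string, std::function<void(bool)>>> queue_;
  bool shutdown_ = false;
  std::thread appender_;  // declared last so it starts after the members above
};
```

Because only bytes cross the thread boundary, the queue holds no pointers into caller-owned memory, which is what lets ApplyTask pass *commit_msg even though the message lives in the op's arena.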
diff --git a/src/kudu/tablet/ops/op_tracker-test.cc b/src/kudu/tablet/ops/op_tracker-test.cc
index 885c5bf..e7035f0 100644
--- a/src/kudu/tablet/ops/op_tracker-test.cc
+++ b/src/kudu/tablet/ops/op_tracker-test.cc
@@ -89,7 +89,7 @@ class OpTrackerTest : public KuduTest,
 
     Status Prepare() override { return Status::OK(); }
     Status Start() override { return Status::OK(); }
-    Status Apply(unique_ptr<consensus::CommitMsg>* /* commit_msg */) override {
+    Status Apply(consensus::CommitMsg** /* commit_msg */) override {
       return Status::OK();
     }
     std::string ToString() const override {
diff --git a/src/kudu/tablet/ops/write_op.cc b/src/kudu/tablet/ops/write_op.cc
index 07a48e6..aa27c97 100644
--- a/src/kudu/tablet/ops/write_op.cc
+++ b/src/kudu/tablet/ops/write_op.cc
@@ -18,6 +18,7 @@
 #include "kudu/tablet/ops/write_op.h"
 
 #include <algorithm>
+#include <atomic>
 #include <cstdint>
 #include <ctime>
 #include <new>
@@ -27,6 +28,7 @@
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <google/protobuf/arena.h>
 
 #include "kudu/clock/clock.h"
 #include "kudu/common/common.pb.h"
@@ -220,7 +222,7 @@ void WriteOp::UpdatePerRowErrors() {
 
 // FIXME: Since this is called as a void in a thread-pool callback,
 // it seems pointless to return a Status!
-Status WriteOp::Apply(unique_ptr<CommitMsg>* commit_msg) {
+Status WriteOp::Apply(CommitMsg** commit_msg) {
   TRACE_EVENT0("op", "WriteOp::Apply");
   TRACE("APPLY: Starting.");
 
@@ -238,7 +240,7 @@ Status WriteOp::Apply(unique_ptr<CommitMsg>* commit_msg) {
   UpdatePerRowErrors();
 
   // Create the Commit message
-  commit_msg->reset(new CommitMsg());
+  *commit_msg = google::protobuf::Arena::CreateMessage<CommitMsg>(state_->pb_arena());
   state()->ReleaseTxResultPB((*commit_msg)->mutable_result());
   (*commit_msg)->set_op_type(consensus::OperationType::WRITE_OP);
 
@@ -354,7 +356,7 @@ void WriteOpState::SetRowOps(vector<DecodedRowOperation> decoded_ops) {
     if (authz_context_) {
       InsertIfNotPresent(&authz_context_->requested_op_types, op.type);
     }
-    row_ops_.emplace_back(arena->NewObject<RowOp>(std::move(op)));
+    row_ops_.emplace_back(arena->NewObject<RowOp>(pb_arena(), std::move(op)));
   }
 
   // Allocate the ProbeStats objects from the op's arena, so
@@ -405,7 +407,8 @@ void WriteOpState::ReleaseTxResultPB(TxResultPB* result) const {
   result->Clear();
   result->mutable_ops()->Reserve(row_ops_.size());
   for (RowOp* op : row_ops_) {
-    result->mutable_ops()->AddAllocated(CHECK_NOTNULL(op->result.release()));
+    DCHECK_EQ(op->result->GetArena(), result->GetArena());
+    result->mutable_ops()->AddAllocated(DCHECK_NOTNULL(op->result));
   }
 }
 
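The DCHECK_EQ added to ReleaseTxResultPB asserts that each row result and the TxResultPB share an arena, so AddAllocated can adopt the pointer rather than copy the object. Here is a toy model of that invariant, assuming a hypothetical ToyArena in place of google::protobuf::Arena and hypothetical FakeRowResult/FakeTxResult types; none of these are protobuf or Kudu APIs.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for google::protobuf::Arena (ownership only).
class ToyArena {
 public:
  ToyArena() = default;
  ToyArena(const ToyArena&) = delete;
  ToyArena& operator=(const ToyArena&) = delete;
  ~ToyArena() {
    for (auto it = deleters_.rbegin(); it != deleters_.rend(); ++it) (*it)();
  }
  template <typename T, typename... Args>
  T* Create(Args&&... args) {
    std::unique_ptr<T> obj(new T(std::forward<Args>(args)...));
    T* raw = obj.get();
    deleters_.emplace_back([raw] { delete raw; });
    return obj.release();
  }

 private:
  std::vector<std::function<void()>> deleters_;
};

// Hypothetical per-row result that remembers its arena, like GetArena().
struct FakeRowResult {
  explicit FakeRowResult(ToyArena* a) : arena(a) {}
  ToyArena* const arena;
  int mrs_id = -1;
};

// Hypothetical stand-in for TxResultPB's repeated 'ops' field. The elements
// are not owned by the container; they belong to the shared arena.
struct FakeTxResult {
  explicit FakeTxResult(ToyArena* a) : arena(a) {}
  ToyArena* const arena;
  std::vector<FakeRowResult*> ops;

  // A plain pointer hand-off is only safe because both objects live and die
  // with the same arena; this mirrors the DCHECK_EQ in the diff.
  void AddAllocated(FakeRowResult* r) {
    assert(r != nullptr && r->arena == arena);
    ops.push_back(r);
  }
};

// Analogue of ReleaseTxResultPB: collects row results without copying them.
void ReleaseResults(const std::vector<FakeRowResult*>& rows, FakeTxResult* out) {
  out->ops.clear();
  out->ops.reserve(rows.size());
  for (FakeRowResult* r : rows) out->AddAllocated(r);
}
```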
diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h
index dbabd69..3db4cd6 100644
--- a/src/kudu/tablet/ops/write_op.h
+++ b/src/kudu/tablet/ops/write_op.h
@@ -312,7 +312,7 @@ class WriteOp : public Op {
   // are placed in the queue (but not necessarily in the same order of the
   // original requests) which is already a requirement of the consensus
   // algorithm.
-  Status Apply(std::unique_ptr<consensus::CommitMsg>* commit_msg) override;
+  Status Apply(consensus::CommitMsg** commit_msg) override;
 
   // If result == COMMITTED, commits the mvcc op and updates the metrics, if
   // result == ABORTED aborts the mvcc op.
diff --git a/src/kudu/tablet/row_op.cc b/src/kudu/tablet/row_op.cc
index e9aa02d..e7b955e 100644
--- a/src/kudu/tablet/row_op.cc
+++ b/src/kudu/tablet/row_op.cc
@@ -22,6 +22,7 @@
 #include <utility>
 
 #include <glog/logging.h>
+#include <google/protobuf/arena.h>
 
 #include "kudu/common/wire_protocol.h"
 #include "kudu/tablet/tablet.pb.h"
@@ -32,11 +33,11 @@ using kudu::pb_util::SecureDebugString;
 using std::unique_ptr;
 
 namespace kudu {
-
 namespace tablet {
 
-RowOp::RowOp(DecodedRowOperation op)
-    : decoded_op(std::move(op)) {
+RowOp::RowOp(google::protobuf::Arena* pb_arena, DecodedRowOperation op)
+    : pb_arena_(pb_arena),
+      decoded_op(std::move(op)) {
   if (!decoded_op.result.ok()) {
     SetFailed(decoded_op.result);
   }
@@ -44,25 +45,26 @@ RowOp::RowOp(DecodedRowOperation op)
 
 void RowOp::SetFailed(const Status& s) {
   DCHECK(!result) << SecureDebugString(*result);
-  result.reset(new OperationResultPB());
+  result = google::protobuf::Arena::CreateMessage<OperationResultPB>(pb_arena_);
   StatusToPB(s, result->mutable_failed_status());
 }
 
 void RowOp::SetInsertSucceeded(int mrs_id) {
   DCHECK(!result) << SecureDebugString(*result);
-  result.reset(new OperationResultPB());
+  result = google::protobuf::Arena::CreateMessage<OperationResultPB>(pb_arena_);
   result->add_mutated_stores()->set_mrs_id(mrs_id);
 }
 
 void RowOp::SetErrorIgnored() {
   DCHECK(!result) << SecureDebugString(*result);
-  result.reset(new OperationResultPB());
+  result = google::protobuf::Arena::CreateMessage<OperationResultPB>(pb_arena_);
   error_ignored = true;
 }
 
-void RowOp::SetMutateSucceeded(unique_ptr<OperationResultPB> result) {
+void RowOp::SetMutateSucceeded(OperationResultPB* result) {
   DCHECK(!this->result) << SecureDebugString(*result);
-  this->result = std::move(result);
+  DCHECK_EQ(result->GetArena(), pb_arena_);
+  this->result = result;
 }
 
 std::string RowOp::ToString(const Schema& schema) const {
@@ -71,7 +73,8 @@ std::string RowOp::ToString(const Schema& schema) const {
 
 void RowOp::SetSkippedResult(const OperationResultPB& result) {
   DCHECK(result.skip_on_replay());
-  this->result.reset(new OperationResultPB(result));
+  this->result = google::protobuf::Arena::CreateMessage<OperationResultPB>(pb_arena_);
+  this->result->CopyFrom(result);
 }
 
 } // namespace tablet
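A sketch of the RowOp result-setter pattern after this change, assuming a hypothetical ToyArena (standing in for google::protobuf::Arena) and hypothetical ToyRowOp/FakeResult types. The row op borrows the op's arena, keeps a raw non-owning result pointer, and, as SetSkippedResult does in the diff, copies a caller-supplied result into a fresh arena object instead of adopting it.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for google::protobuf::Arena (ownership only).
class ToyArena {
 public:
  ToyArena() = default;
  ToyArena(const ToyArena&) = delete;
  ToyArena& operator=(const ToyArena&) = delete;
  ~ToyArena() {
    for (auto it = deleters_.rbegin(); it != deleters_.rend(); ++it) (*it)();
  }
  template <typename T, typename... Args>
  T* Create(Args&&... args) {
    std::unique_ptr<T> obj(new T(std::forward<Args>(args)...));
    T* raw = obj.get();
    deleters_.emplace_back([raw] { delete raw; });
    return obj.release();
  }

 private:
  std::vector<std::function<void()>> deleters_;
};

// Hypothetical, much-simplified OperationResultPB.
struct FakeResult {
  bool failed = false;
  bool skip_on_replay = false;
  int mrs_id = -1;
};

// Hypothetical analogue of RowOp: no destructor work is needed because the
// arena, not the row op, owns the result.
struct ToyRowOp {
  explicit ToyRowOp(ToyArena* pb_arena) : pb_arena_(pb_arena) {}

  void SetFailed() {
    assert(result == nullptr);
    result = pb_arena_->Create<FakeResult>();
    result->failed = true;
  }

  void SetInsertSucceeded(int mrs_id) {
    assert(result == nullptr);
    result = pb_arena_->Create<FakeResult>();
    result->mrs_id = mrs_id;
  }

  // The argument may live anywhere, so it is copied into the arena.
  void SetSkippedResult(const FakeResult& skipped) {
    assert(skipped.skip_on_replay);
    result = pb_arena_->Create<FakeResult>(skipped);
  }

  ToyArena* const pb_arena_;
  FakeResult* result = nullptr;  // owned by *pb_arena_
};
```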
diff --git a/src/kudu/tablet/row_op.h b/src/kudu/tablet/row_op.h
index c2bbf16..6474bc3 100644
--- a/src/kudu/tablet/row_op.h
+++ b/src/kudu/tablet/row_op.h
@@ -22,7 +22,12 @@
 #include "kudu/common/row_operations.h"
 #include "kudu/tablet/lock_manager.h"
 #include "kudu/tablet/rowset.h"
-#include "kudu/tablet/tablet.pb.h"
+
+namespace google {
+namespace protobuf {
+class Arena;
+}
+}
 
 namespace kudu {
 
@@ -30,11 +35,12 @@ class Schema;
 class Status;
 
 namespace tablet {
+class OperationResultPB;
 
 // Structure tracking the progress of a single row operation within a WriteTransaction.
 struct RowOp {
  public:
-  explicit RowOp(DecodedRowOperation op);
+  RowOp(google::protobuf::Arena* pb_arena, DecodedRowOperation op);
   ~RowOp() = default;
 
   // Functions to set the result of the mutation.
@@ -42,7 +48,11 @@ struct RowOp {
   void SetFailed(const Status& s);
   void SetInsertSucceeded(int mrs_id);
   void SetErrorIgnored();
-  void SetMutateSucceeded(std::unique_ptr<OperationResultPB> result);
+
+  // REQUIRES: result must be allocated from the same protobuf::Arena associated
+  // with this RowOp.
+  void SetMutateSucceeded(OperationResultPB* result);
+
   // Sets the result of a skipped operation on bootstrap.
   // TODO(dralves) Currently this performs a copy. Might be avoided with some refactoring.
   // see TODO(dralves) in TabletBoostrap::ApplyOperations().
@@ -67,6 +77,8 @@ struct RowOp {
 
   std::string ToString(const Schema& schema) const;
 
+  google::protobuf::Arena* const pb_arena_;
+
   // The original operation as decoded from the client request.
   DecodedRowOperation decoded_op;
 
@@ -100,8 +112,8 @@ struct RowOp {
   // checked and found not to be alive in any RowSet.
   RowSet* present_in_rowset = nullptr;
 
-  // The result of the operation.
-  std::unique_ptr<OperationResultPB> result;
+  // The result of the operation, allocated from pb_arena_
+  OperationResultPB* result = nullptr;
 };
 
 
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index bc474cc..72212c2 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -31,6 +31,7 @@
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <google/protobuf/arena.h>
 
 #include "kudu/clock/clock.h"
 #include "kudu/clock/hybrid_clock.h"
@@ -702,9 +703,10 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext* io_context,
   // were unset (eg because the table only _has_ primary keys, or because
   // the rest are intended to be set to their defaults), we need to
   // avoid doing anything.
-  unique_ptr<OperationResultPB> result(new OperationResultPB());
+  auto* result = google::protobuf::Arena::CreateMessage<OperationResultPB>(
+      op_state->pb_arena());
   if (enc.is_empty()) {
-    upsert->SetMutateSucceeded(std::move(result));
+    upsert->SetMutateSucceeded(result);
     return Status::OK();
   }
 
@@ -716,13 +718,13 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext* io_context,
                                op_state->op_id(),
                                io_context,
                                stats,
-                               result.get());
+                               result);
   CHECK(!s.IsNotFound());
   if (s.ok()) {
     if (metrics_) {
       metrics_->upserts_as_updates->Increment();
     }
-    upsert->SetMutateSucceeded(std::move(result));
+    upsert->SetMutateSucceeded(result);
   } else {
     upsert->SetFailed(s);
   }
@@ -776,7 +778,8 @@ Status Tablet::MutateRowUnlocked(const IOContext* io_context,
   DCHECK(mutate->checked_present);
   DCHECK(mutate->valid);
 
-  unique_ptr<OperationResultPB> result(new OperationResultPB());
+  auto* result = google::protobuf::Arena::CreateMessage<OperationResultPB>(
+      op_state->pb_arena());
   const TabletComponents* comps = DCHECK_NOTNULL(op_state->tablet_components());
   Timestamp ts = op_state->timestamp();
 
@@ -790,9 +793,9 @@ Status Tablet::MutateRowUnlocked(const IOContext* io_context,
                                       op_state->op_id(),
                                       io_context,
                                       stats,
-                                      result.get());
+                                      result);
   if (PREDICT_TRUE(s.ok())) {
-    mutate->SetMutateSucceeded(std::move(result));
+    mutate->SetMutateSucceeded(result);
   } else {
     if (s.IsNotFound()) {
       // Replace internal error messages with one more suitable for users.
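Reading the tablet.cc hunks, one behavioral detail changes: the OperationResultPB is now allocated from the op's arena before the mutation is attempted, and on the failure path nothing frees it explicitly. Under the old unique_ptr it was freed on return; now it is reclaimed together with the arena when the op state goes away. A toy model of that lifetime, assuming a hypothetical ToyArena and hypothetical ToyMutate/ToyMutateRow helpers (not Kudu functions):

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for google::protobuf::Arena (ownership only).
class ToyArena {
 public:
  ToyArena() = default;
  ToyArena(const ToyArena&) = delete;
  ToyArena& operator=(const ToyArena&) = delete;
  ~ToyArena() {
    for (auto it = deleters_.rbegin(); it != deleters_.rend(); ++it) (*it)();
  }
  template <typename T, typename... Args>
  T* Create(Args&&... args) {
    std::unique_ptr<T> obj(new T(std::forward<Args>(args)...));
    T* raw = obj.get();
    deleters_.emplace_back([raw] { delete raw; });
    return obj.release();
  }

 private:
  std::vector<std::function<void()>> deleters_;
};

// Hypothetical result type that counts live instances.
struct FakeResult {
  static int live;
  int mutated_stores = 0;
  FakeResult() { ++live; }
  ~FakeResult() { --live; }
};
int FakeResult::live = 0;

// Hypothetical mutation routine: fills in 'result' and reports success.
bool ToyMutate(bool row_exists, FakeResult* result) {
  if (!row_exists) return false;
  result->mutated_stores = 1;
  return true;
}

// Same shape as MutateRowUnlocked in the diff: allocate from the op's arena,
// pass a raw pointer down, and hand it on only on success.
FakeResult* ToyMutateRow(ToyArena* op_arena, bool row_exists) {
  FakeResult* result = op_arena->Create<FakeResult>();
  if (ToyMutate(row_exists, result)) {
    return result;  // in Kudu: mutate->SetMutateSucceeded(result)
  }
  return nullptr;   // in Kudu: the row op is marked failed; result stays in the arena
}
```

The cost is that a failed row keeps a small unused message alive for the rest of the op, which is the usual trade-off of arena allocation.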
diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc
index c3fd891..257dfd2 100644
--- a/src/kudu/tablet/tablet_bootstrap-test.cc
+++ b/src/kudu/tablet/tablet_bootstrap-test.cc
@@ -526,25 +526,25 @@ TEST_F(BootstrapTest, TestOutOfOrderCommits) {
   ASSERT_OK(AppendReplicateBatch(replicate));
 
   // Now commit the mutate before the insert (in the log).
-  unique_ptr<consensus::CommitMsg> mutate_commit(new consensus::CommitMsg);
-  mutate_commit->set_op_type(consensus::WRITE_OP);
-  mutate_commit->mutable_commited_op_id()->CopyFrom(mutate_opid);
-  TxResultPB* result = mutate_commit->mutable_result();
+  consensus::CommitMsg mutate_commit;
+  mutate_commit.set_op_type(consensus::WRITE_OP);
+  mutate_commit.mutable_commited_op_id()->CopyFrom(mutate_opid);
+  TxResultPB* result = mutate_commit.mutable_result();
   OperationResultPB* mutate = result->add_ops();
   MemStoreTargetPB* target = mutate->add_mutated_stores();
   target->set_mrs_id(1);
 
-  ASSERT_OK(AppendCommit(std::move(mutate_commit)));
+  ASSERT_OK(AppendCommit(mutate_commit));
 
-  unique_ptr<consensus::CommitMsg> insert_commit(new consensus::CommitMsg);
-  insert_commit->set_op_type(consensus::WRITE_OP);
-  insert_commit->mutable_commited_op_id()->CopyFrom(insert_opid);
-  result = insert_commit->mutable_result();
+  consensus::CommitMsg insert_commit;
+  insert_commit.set_op_type(consensus::WRITE_OP);
+  insert_commit.mutable_commited_op_id()->CopyFrom(insert_opid);
+  result = insert_commit.mutable_result();
   OperationResultPB* insert = result->add_ops();
   target = insert->add_mutated_stores();
   target->set_mrs_id(1);
 
-  ASSERT_OK(AppendCommit(std::move(insert_commit)));
+  ASSERT_OK(AppendCommit(insert_commit));
 
   ConsensusBootstrapInfo boot_info;
   shared_ptr<Tablet> tablet;
@@ -590,15 +590,15 @@ TEST_F(BootstrapTest, TestMissingCommitMessage) {
   ASSERT_OK(AppendReplicateBatch(replicate));
 
   // Now commit the mutate before the insert (in the log).
-  unique_ptr<consensus::CommitMsg> mutate_commit(new consensus::CommitMsg);
-  mutate_commit->set_op_type(consensus::WRITE_OP);
-  mutate_commit->mutable_commited_op_id()->CopyFrom(mutate_opid);
-  TxResultPB* result = mutate_commit->mutable_result();
+  consensus::CommitMsg mutate_commit;
+  mutate_commit.set_op_type(consensus::WRITE_OP);
+  mutate_commit.mutable_commited_op_id()->CopyFrom(mutate_opid);
+  TxResultPB* result = mutate_commit.mutable_result();
   OperationResultPB* mutate = result->add_ops();
   MemStoreTargetPB* target = mutate->add_mutated_stores();
   target->set_mrs_id(1);
 
-  ASSERT_OK(AppendCommit(std::move(mutate_commit)));
+  ASSERT_OK(AppendCommit(mutate_commit));
 
   ConsensusBootstrapInfo boot_info;
   shared_ptr<Tablet> tablet;
@@ -642,22 +642,22 @@ TEST_F(BootstrapTest, TestConsensusOnlyOperationOutOfOrderTimestamp) {
 
   // Now commit in OpId order.
   // NO_OP...
-  unique_ptr<consensus::CommitMsg> mutate_commit(new consensus::CommitMsg);
-  mutate_commit->set_op_type(consensus::NO_OP);
-  *mutate_commit->mutable_commited_op_id() = noop_replicate->get()->id();
+  consensus::CommitMsg mutate_commit;
+  mutate_commit.set_op_type(consensus::NO_OP);
+  *mutate_commit.mutable_commited_op_id() = noop_replicate->get()->id();
 
-  ASSERT_OK(AppendCommit(std::move(mutate_commit)));
+  ASSERT_OK(AppendCommit(mutate_commit));
 
   // ...and WRITE_OP...
-  mutate_commit = unique_ptr<consensus::CommitMsg>(new consensus::CommitMsg);
-  mutate_commit->set_op_type(consensus::WRITE_OP);
-  *mutate_commit->mutable_commited_op_id() = write_replicate->get()->id();
-  TxResultPB* result = mutate_commit->mutable_result();
+  mutate_commit.Clear();
+  mutate_commit.set_op_type(consensus::WRITE_OP);
+  *mutate_commit.mutable_commited_op_id() = write_replicate->get()->id();
+  TxResultPB* result = mutate_commit.mutable_result();
   OperationResultPB* mutate = result->add_ops();
   MemStoreTargetPB* target = mutate->add_mutated_stores();
   target->set_mrs_id(1);
 
-  ASSERT_OK(AppendCommit(std::move(mutate_commit)));
+  ASSERT_OK(AppendCommit(mutate_commit));
 
   ConsensusBootstrapInfo boot_info;
   shared_ptr<Tablet> tablet;
@@ -717,17 +717,17 @@ TEST_F(BootstrapTest, TestKudu2509) {
   ASSERT_OK(AppendReplicateBatch(replicate));
 
   // Now commit the mutate before the insert (in the log).
-  unique_ptr<consensus::CommitMsg> mutate_commit(new consensus::CommitMsg);
-  mutate_commit->set_op_type(consensus::WRITE_OP);
-  mutate_commit->mutable_commited_op_id()->CopyFrom(mutate_opid);
-  mutate_commit->mutable_result()->add_ops()->add_mutated_stores()->set_mrs_id(1);
-  ASSERT_OK(AppendCommit(std::move(mutate_commit)));
-
-  unique_ptr<consensus::CommitMsg> insert_commit(new consensus::CommitMsg);
-  insert_commit->set_op_type(consensus::WRITE_OP);
-  insert_commit->mutable_commited_op_id()->CopyFrom(insert_opid);
-  insert_commit->mutable_result()->add_ops()->add_mutated_stores()->set_mrs_id(1);
-  ASSERT_OK(AppendCommit(std::move(insert_commit)));
+  consensus::CommitMsg mutate_commit;
+  mutate_commit.set_op_type(consensus::WRITE_OP);
+  mutate_commit.mutable_commited_op_id()->CopyFrom(mutate_opid);
+  mutate_commit.mutable_result()->add_ops()->add_mutated_stores()->set_mrs_id(1);
+  ASSERT_OK(AppendCommit(mutate_commit));
+
+  consensus::CommitMsg insert_commit;
+  insert_commit.set_op_type(consensus::WRITE_OP);
+  insert_commit.mutable_commited_op_id()->CopyFrom(insert_opid);
+  insert_commit.mutable_result()->add_ops()->add_mutated_stores()->set_mrs_id(1);
+  ASSERT_OK(AppendCommit(insert_commit));
 
   ConsensusBootstrapInfo boot_info;
   shared_ptr<Tablet> tablet;
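The updated bootstrap tests build each CommitMsg on the stack, and TestConsensusOnlyOperationOutOfOrderTimestamp reuses one message via Clear(). Reusing the object is safe as long as AppendCommit, like the new Log interface, is finished with the message before it returns. A toy illustration with hypothetical FakeCommitMsg and ToyWal types (not the real test fixture):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical message with protobuf-like Clear() semantics.
struct FakeCommitMsg {
  std::string op_type;
  long long committed_index = -1;
  void Clear() { *this = FakeCommitMsg(); }
  std::string Serialize() const {
    return op_type + "@" + std::to_string(committed_index);
  }
};

// Hypothetical stand-in for the fixture's AppendCommit(): the message is
// serialized before returning, so a const reference is enough.
class ToyWal {
 public:
  void AppendCommit(const FakeCommitMsg& msg) { entries_.push_back(msg.Serialize()); }
  const std::vector<std::string>& entries() const { return entries_; }

 private:
  std::vector<std::string> entries_;
};
```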
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index cd8eafa..17b0c28 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -31,6 +31,7 @@
 #include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
+#include <google/protobuf/arena.h>
 
 #include "kudu/clock/clock.h"
 #include "kudu/clock/hybrid_clock.h"
@@ -1359,11 +1360,6 @@ Status TabletBootstrap::DetermineSkippedOpsAndBuildResponse(const TxResultPB& or
 Status TabletBootstrap::PlayWriteRequest(const IOContext* io_context,
                                          ReplicateMsg* replicate_msg,
                                          const CommitMsg& commit_msg) {
-  // Prepare the commit entry for the rewritten log.
-  LogEntryPB commit_entry;
-  commit_entry.set_type(log::COMMIT);
-  CommitMsg* new_commit = commit_entry.mutable_commit();
-  new_commit->CopyFrom(commit_msg);
 
   // Set up the new op.
   // Even if we're going to ignore the op, it's important to do this so that
@@ -1375,6 +1371,13 @@ Status TabletBootstrap::PlayWriteRequest(const IOContext* io_context,
   op_state.mutable_op_id()->CopyFrom(replicate_msg->id());
   op_state.set_timestamp(Timestamp(replicate_msg->timestamp()));
 
+  // Prepare the commit entry for the rewritten log.
+  LogEntryPB& commit_entry = *google::protobuf::Arena::CreateMessage<LogEntryPB>(
+      op_state.pb_arena());
+  commit_entry.set_type(log::COMMIT);
+  CommitMsg* new_commit = commit_entry.mutable_commit();
+  new_commit->CopyFrom(commit_msg);
+
   tablet_->StartOp(&op_state);
   tablet_->StartApplying(&op_state);
 
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index a29e174..ecd4b1a 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -257,7 +257,7 @@ class DelayedApplyOp : public WriteOp {
         apply_continue_(DCHECK_NOTNULL(apply_continue)) {
   }
 
-  virtual Status Apply(unique_ptr<CommitMsg>* commit_msg) override {
+  virtual Status Apply(CommitMsg** commit_msg) override {
     apply_started_->CountDown();
     LOG(INFO) << "Delaying apply...";
     apply_continue_->Wait();