This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 802557652 [tablet] create CompactionOrFlushInput wrapped into shared_ptr
802557652 is described below

commit 8025576523ecff7429ddf0778729b0afe738bbc6
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Nov 25 12:39:53 2024 -0800

    [tablet] create CompactionOrFlushInput wrapped into shared_ptr
    
    Prior to this patch, it was necessary to re-wrap pointers to newly
    created CompactionOrFlushInput objects from std::unique_ptr or raw
    pointers into std::shared_ptr since MergeCompactionInput::MergeState
    stores std::shared_ptr<CompactionOrFlushInput> as one of its fields.
    This resulted in one extra memory allocation (a separate allocation
    for the shared_ptr control block) compared with using std::make_shared
    to create a CompactionOrFlushInput object wrapped into std::shared_ptr
    from the very beginning [1].
    
    This patch addresses the issue described above, so now all the related
    factories of CompactionOrFlushInput objects call std::make_shared
    to create CompactionOrFlushInput objects wrapped into std::shared_ptr
    from the very beginning.
    
    This patch doesn't contain any functional modifications.
    
    [1] https://en.cppreference.com/w/cpp/memory/shared_ptr
    
    Change-Id: I442b6ad3780c700bd3b1d357e24dbb2733aff8bf
    Reviewed-on: http://gerrit.cloudera.org:8080/22121
    Tested-by: Alexey Serbin <[email protected]>
    Reviewed-by: Mahesh Reddy <[email protected]>
    Reviewed-by: Abhishek Chennaka <[email protected]>
---
 src/kudu/tablet/compaction-test.cc | 81 +++++++++++++++++---------------------
 src/kudu/tablet/compaction.cc      | 27 +++++++------
 src/kudu/tablet/compaction.h       | 11 +++---
 src/kudu/tablet/diskrowset.cc      |  4 +-
 src/kudu/tablet/diskrowset.h       |  2 +-
 src/kudu/tablet/memrowset.cc       |  4 +-
 src/kudu/tablet/memrowset.h        |  2 +-
 src/kudu/tablet/mock-rowsets.h     |  2 +-
 src/kudu/tablet/rowset.cc          |  2 +-
 src/kudu/tablet/rowset.h           |  4 +-
 src/kudu/tablet/tablet.cc          |  2 +-
 11 files changed, 67 insertions(+), 74 deletions(-)

diff --git a/src/kudu/tablet/compaction-test.cc 
b/src/kudu/tablet/compaction-test.cc
index 6b2151116..83565091d 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -106,8 +106,6 @@ using strings::Substitute;
 namespace kudu {
 namespace tablet {
 
-class RowSetMetadata;
-
 constexpr const char* const kRowKeyFormat = "hello %010" PRId64;
 constexpr const size_t kLargeRollThreshold = 8UL * 1024 * 1024 * 1024;  // 8GB
 constexpr const size_t kSmallRollThreshold = 1024; // 1KB
@@ -351,14 +349,14 @@ class TestCompaction : public KuduRowSetTest {
   static Status BuildCompactionInput(const MvccSnapshot& merge_snap,
                                      const vector<shared_ptr<DiskRowSet>>& 
rowsets,
                                      const Schema& projection,
-                                     unique_ptr<CompactionOrFlushInput>* out) {
+                                     shared_ptr<CompactionOrFlushInput>* out) {
     vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
     for (const auto& rs : rowsets) {
-      unique_ptr<CompactionOrFlushInput> input;
+      shared_ptr<CompactionOrFlushInput> input;
       RETURN_NOT_OK(CompactionOrFlushInput::Create(*rs, &projection, 
merge_snap, nullptr, &input));
-      merge_inputs.emplace_back(input.release());
+      merge_inputs.emplace_back(std::move(input));
     }
-    out->reset(CompactionOrFlushInput::Merge(merge_inputs, &projection));
+    *out = CompactionOrFlushInput::Merge(merge_inputs, &projection);
     return Status::OK();
   }
 
@@ -370,7 +368,7 @@ class TestCompaction : public KuduRowSetTest {
                           size_t roll_threshold,
                           vector<shared_ptr<DiskRowSet>>* result_rowsets) {
     MvccSnapshot merge_snap(mvcc_);
-    unique_ptr<CompactionOrFlushInput> compact_input;
+    shared_ptr<CompactionOrFlushInput> compact_input;
     RETURN_NOT_OK(BuildCompactionInput(merge_snap, rowsets, projection, 
&compact_input));
     return DoFlushAndReopen(
         compact_input.get(), projection, merge_snap, roll_threshold, 
result_rowsets);
@@ -394,10 +392,7 @@ class TestCompaction : public KuduRowSetTest {
                            size_t roll_threshold,
                            vector<shared_ptr<DiskRowSet>>* result_rowsets) {
     MvccSnapshot snap(mvcc_);
-    vector<shared_ptr<RowSetMetadata>> rowset_metas;
-    unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Create(mrs,
-                                                                            
&projection,
-                                                                            
snap));
+    auto input(CompactionOrFlushInput::Create(mrs, &projection, snap));
     return DoFlushAndReopen(input.get(), projection, snap, roll_threshold, 
result_rowsets);
   }
 
@@ -532,7 +527,7 @@ class TestCompaction : public KuduRowSetTest {
     LOG_TIMING(INFO, "compacting " +
                std::string((OVERLAP_INPUTS ? "with overlap" : "without 
overlap"))) {
       MvccSnapshot merge_snap(mvcc_);
-      unique_ptr<CompactionOrFlushInput> compact_input;
+      shared_ptr<CompactionOrFlushInput> compact_input;
       ASSERT_OK(BuildCompactionInput(merge_snap, rowsets, schema_, 
&compact_input));
       // Use a low target row size to increase the number of resulting rowsets.
       RollingDiskRowSetWriter rdrsw(tablet()->metadata(), schema_,
@@ -580,7 +575,7 @@ TEST_F(TestCompaction, TestMemRowSetInput) {
   // and mutations.
   vector<string> out;
   MvccSnapshot snap(mvcc_);
-  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Create(*mrs, &schema_, snap));
+  auto input(CompactionOrFlushInput::Create(*mrs, &schema_, snap));
   IterateInput(input.get(), &out);
   ASSERT_EQ(10, out.size());
   EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64 
val=0, "
@@ -643,7 +638,7 @@ TEST_F(TestCompaction, TestRowSetInput) {
 
   // Check compaction input
   vector<string> out;
-  unique_ptr<CompactionOrFlushInput> input;
+  shared_ptr<CompactionOrFlushInput> input;
   ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), 
nullptr, &input));
   IterateInput(input.get(), &out);
   ASSERT_EQ(10, out.size());
@@ -706,7 +701,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
   shared_ptr<DiskRowSet> result;
   CompactAndReopenNoRoll(all_rss, schema_, &result);
 
-  unique_ptr<CompactionOrFlushInput> input;
+  shared_ptr<CompactionOrFlushInput> input;
   ASSERT_OK(CompactionOrFlushInput::Create(*result,
                                            &schema_,
                                            
MvccSnapshot::CreateSnapshotIncludingAllOps(),
@@ -880,9 +875,9 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
 
   vector<shared_ptr<CompactionOrFlushInput>> inputs;
   for (const auto& row_set : row_sets) {
-    unique_ptr<CompactionOrFlushInput> ci;
+    shared_ptr<CompactionOrFlushInput> ci;
     ASSERT_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
-    inputs.emplace_back(ci.release());
+    inputs.emplace_back(std::move(ci));
   }
 
   // Compact the row sets by picking a few at random until we're left with 
just one.
@@ -903,7 +898,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
   }
 
   vector<string> out;
-  unique_ptr<CompactionOrFlushInput> ci;
+  shared_ptr<CompactionOrFlushInput> ci;
   ASSERT_OK(row_sets[0]->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
   IterateInput(ci.get(), &out);
 
@@ -952,17 +947,17 @@ TEST_F(TestCompaction, 
TestMRSCompactionDoesntOutputUnobservableRows) {
 
   vector<shared_ptr<CompactionOrFlushInput>> to_merge;
   {
-    unique_ptr<CompactionOrFlushInput> rs1_input;
+    shared_ptr<CompactionOrFlushInput> rs1_input;
     ASSERT_OK(CompactionOrFlushInput::Create(*rs1, &schema_, all_snap, 
nullptr, &rs1_input));
 
-    unique_ptr<CompactionOrFlushInput> rs2_input;
+    shared_ptr<CompactionOrFlushInput> rs2_input;
     ASSERT_OK(CompactionOrFlushInput::Create(*rs2, &schema_, all_snap, 
nullptr, &rs2_input));
 
-    to_merge.emplace_back(rs1_input.release());
-    to_merge.emplace_back(rs2_input.release());
+    to_merge.emplace_back(std::move(rs1_input));
+    to_merge.emplace_back(std::move(rs2_input));
   }
 
-  unique_ptr<CompactionOrFlushInput> 
merged(CompactionOrFlushInput::Merge(to_merge, &schema_));
+  auto merged(CompactionOrFlushInput::Merge(to_merge, &schema_));
 
   // Make sure the unobservable version of the row that was inserted and 
deleted in the MRS
   // in the same op doesn't show up in the compaction input.
@@ -997,13 +992,11 @@ TEST_F(TestCompaction, TestOneToOne) {
 
   // Catch the updates that came in after the snapshot flush was made.
   MvccSnapshot snap2(mvcc_);
-  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Create(*mrs, &schema_, snap2));
+  auto input(CompactionOrFlushInput::Create(*mrs, &schema_, snap2));
 
   // Add some more updates which come into the new rowset while the "reupdate" 
is happening.
   UpdateRows(rs.get(), 1000, 0, 3);
 
-  string dummy_name = "";
-
   ASSERT_OK(ReupdateMissedDeltas(nullptr, input.get(), 
HistoryGcOpts::Disabled(), snap, snap2,
                                  { rs }));
 
@@ -1019,7 +1012,7 @@ TEST_F(TestCompaction, TestOneToOne) {
 
   // And compact (1 input to 1 output)
   MvccSnapshot snap3(mvcc_);
-  unique_ptr<CompactionOrFlushInput> compact_input;
+  shared_ptr<CompactionOrFlushInput> compact_input;
   ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, snap3, nullptr, 
&compact_input));
   ASSERT_OK(DoFlushAndReopen(compact_input.get(), schema_, snap3, 
kLargeRollThreshold, nullptr));
 }
@@ -1053,9 +1046,7 @@ TEST_F(TestCompaction, TestKUDU102) {
   vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
   merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs, &schema_, 
snap2));
   merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs_b, &schema_, 
snap2));
-  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
-
-  string dummy_name = "";
+  auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
 
   // This would fail without KUDU-102
   ASSERT_OK(ReupdateMissedDeltas(nullptr, input.get(), 
HistoryGcOpts::Disabled(), snap, snap2,
@@ -1109,10 +1100,10 @@ TEST_F(TestCompaction, TestMergeMRS) {
 
   MvccSnapshot snap(mvcc_);
   vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
-    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, 
&schema_, snap)),
-    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, 
&schema_, snap))
+    CompactionOrFlushInput::Create(*mrs_a, &schema_, snap),
+    CompactionOrFlushInput::Create(*mrs_b, &schema_, snap)
   };
-  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
+  auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs));
   ASSERT_EQ(20, CountRows(result_rs));
@@ -1128,10 +1119,10 @@ TEST_F(TestCompaction, TestMergeMRSWithInvisibleRows) {
   InsertRows(mrs_b.get(), 10, 0);
   MvccSnapshot snap(mvcc_);
   vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
-    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, 
&schema_, snap)),
-    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, 
&schema_, snap))
+    CompactionOrFlushInput::Create(*mrs_a, &schema_, snap),
+    CompactionOrFlushInput::Create(*mrs_b, &schema_, snap)
   };
-  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
+  auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs));
   ASSERT_EQ(1, result_rs.size());
@@ -1208,10 +1199,10 @@ TEST_F(TestCompaction, 
TestRandomizeDuplicatedRowsAcrossTransactions) {
   MvccSnapshot snap(mvcc_);
   vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
   merge_inputs.emplace_back(CompactionOrFlushInput::Create(*main_mrs, 
&schema_, snap));
-  for (auto& mrs : txn_mrss) {
+  for (const auto& mrs : txn_mrss) {
     merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs, &schema_, 
snap));
   }
-  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
+  auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs));
   ASSERT_EQ(1, result_rs.size());
@@ -1253,11 +1244,11 @@ TEST_F(TestCompaction, 
TestRowHistoryJumpsBetweenRowsets) {
   // should go off without a hitch.
   MvccSnapshot snap(mvcc_);
   vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
-    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, 
&schema_, snap)),
-    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, 
&schema_, snap)),
-    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_c, 
&schema_, snap)),
+    CompactionOrFlushInput::Create(*mrs_a, &schema_, snap),
+    CompactionOrFlushInput::Create(*mrs_b, &schema_, snap),
+    CompactionOrFlushInput::Create(*mrs_c, &schema_, snap),
   };
-  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
+  auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs));
   ASSERT_EQ(1, result_rs.size());
@@ -1270,10 +1261,10 @@ TEST_F(TestCompaction, 
TestMergeMRSWithAllInvisibleRows) {
   shared_ptr<MemRowSet> mrs_b = CreateInvisibleMRS();
   MvccSnapshot snap(mvcc_);
   vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
-    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, 
&schema_, snap)),
-    shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, 
&schema_, snap))
+    CompactionOrFlushInput::Create(*mrs_a, &schema_, snap),
+    CompactionOrFlushInput::Create(*mrs_b, &schema_, snap)
   };
-  unique_ptr<CompactionOrFlushInput> 
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
+  auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, 
&result_rs));
   ASSERT_TRUE(result_rs.empty());
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 9ad3a1576..70e4f0aff 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -66,6 +66,7 @@ using kudu::fs::IOContext;
 using kudu::fs::FsErrorManager;
 using kudu::fs::KUDU_2233_CORRUPTION;
 using std::deque;
+using std::make_shared;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -1100,7 +1101,7 @@ Status CompactionOrFlushInput::Create(const DiskRowSet& 
rowset,
                                       const Schema* projection,
                                       const MvccSnapshot& snap,
                                       const IOContext* io_context,
-                                      unique_ptr<CompactionOrFlushInput>* out) 
{
+                                      shared_ptr<CompactionOrFlushInput>* out) 
{
   CHECK(projection->has_column_ids());
 
   unique_ptr<ColumnwiseIterator> 
base_cwise(rowset.base_data_->NewIterator(projection, io_context));
@@ -1124,24 +1125,24 @@ Status CompactionOrFlushInput::Create(const DiskRowSet& 
rowset,
   RETURN_NOT_OK_PREPEND(rowset.delta_tracker_->NewDeltaIterator(
       undo_opts, DeltaTracker::UNDOS_ONLY, &undo_deltas), "Could not open 
UNDOs");
 
-  out->reset(new DiskRowSetCompactionInput(std::move(base_iter),
-                                           std::move(redo_deltas),
-                                           std::move(undo_deltas)));
+  *out = make_shared<DiskRowSetCompactionInput>(
+      std::move(base_iter), std::move(redo_deltas), std::move(undo_deltas));
   return Status::OK();
 }
 
-CompactionOrFlushInput* CompactionOrFlushInput::Create(const MemRowSet& 
memrowset,
-                                                       const Schema* 
projection,
-                                                       const MvccSnapshot& 
snap) {
+shared_ptr<CompactionOrFlushInput> CompactionOrFlushInput::Create(
+    const MemRowSet& memrowset,
+    const Schema* projection,
+    const MvccSnapshot& snap) {
   CHECK(projection->has_column_ids());
-  return new MemRowSetCompactionInput(memrowset, snap, projection);
+  return make_shared<MemRowSetCompactionInput>(memrowset, snap, projection);
 }
 
-CompactionOrFlushInput* CompactionOrFlushInput::Merge(
+shared_ptr<CompactionOrFlushInput> CompactionOrFlushInput::Merge(
     const vector<shared_ptr<CompactionOrFlushInput>>& inputs,
     const Schema* schema) {
   CHECK(schema->has_column_ids());
-  return new MergeCompactionInput(inputs, schema);
+  return make_shared<MergeCompactionInput>(inputs, schema);
 }
 
 
@@ -1154,17 +1155,17 @@ Status 
RowSetsInCompactionOrFlush::CreateCompactionOrFlushInput(
 
   vector<shared_ptr<CompactionOrFlushInput>> inputs;
   for (const auto& rs : rowsets_) {
-    unique_ptr<CompactionOrFlushInput> input;
+    shared_ptr<CompactionOrFlushInput> input;
     RETURN_NOT_OK_PREPEND(rs->NewCompactionInput(schema, snap, io_context, 
&input),
                           Substitute("Could not create compaction input for 
rowset $0",
                                      rs->ToString()));
-    inputs.push_back(shared_ptr<CompactionOrFlushInput>(input.release()));
+    inputs.emplace_back(std::move(input));
   }
 
   if (inputs.size() == 1) {
     *out = std::move(inputs[0]);
   } else {
-    out->reset(CompactionOrFlushInput::Merge(inputs, schema));
+    *out = CompactionOrFlushInput::Merge(inputs, schema);
   }
 
   return Status::OK();
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 67c42587c..b5f862873 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -110,17 +110,18 @@ class CompactionOrFlushInput {
                        const Schema* projection,
                        const MvccSnapshot& snap,
                        const fs::IOContext* io_context,
-                       std::unique_ptr<CompactionOrFlushInput>* out);
+                       std::shared_ptr<CompactionOrFlushInput>* out);
 
   // Create an input which reads from the given memrowset, yielding base rows 
and updates
   // prior to the given snapshot.
-  static CompactionOrFlushInput* Create(const MemRowSet& memrowset,
-                                        const Schema* projection,
-                                        const MvccSnapshot& snap);
+  static std::shared_ptr<CompactionOrFlushInput> Create(
+      const MemRowSet& memrowset,
+      const Schema* projection,
+      const MvccSnapshot& snap);
 
   // Create a collection of merge states containing a state per input. The 
inputs are merged
   // in key-order according to the given schema. All inputs must have matching 
schemas.
-  static CompactionOrFlushInput* Merge(
+  static std::shared_ptr<CompactionOrFlushInput> Merge(
       const std::vector<std::shared_ptr<CompactionOrFlushInput>>& inputs,
       const Schema* schema);
 
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 912cf0ffb..1c01ded1a 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -671,7 +671,7 @@ Status DiskRowSet::NewRowIterator(const RowIteratorOptions& 
opts,
 Status DiskRowSet::NewCompactionInput(const Schema* projection,
                                       const MvccSnapshot& snap,
                                       const IOContext* io_context,
-                                      unique_ptr<CompactionOrFlushInput>* out) 
const {
+                                      shared_ptr<CompactionOrFlushInput>* out) 
const {
   return CompactionOrFlushInput::Create(*this, projection, snap, io_context, 
out);
 }
 
@@ -927,7 +927,7 @@ Status DiskRowSet::DeleteAncientUndoDeltas(Timestamp 
ancient_history_mark,
 Status DiskRowSet::DebugDumpImpl(int64_t* rows_left, vector<string>* lines) {
   // Using CompactionOrFlushInput to dump our data is an easy way of seeing 
all the
   // rows and deltas.
-  unique_ptr<CompactionOrFlushInput> input;
+  shared_ptr<CompactionOrFlushInput> input;
   RETURN_NOT_OK(NewCompactionInput(rowset_metadata_->tablet_schema().get(),
                                    
MvccSnapshot::CreateSnapshotIncludingAllOps(),
                                    nullptr, &input));
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 070260cbc..0454d9e4c 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -383,7 +383,7 @@ class DiskRowSet :
   Status NewCompactionInput(const Schema* projection,
                             const MvccSnapshot& snap,
                             const fs::IOContext* io_context,
-                            std::unique_ptr<CompactionOrFlushInput>* out) 
const override;
+                            std::shared_ptr<CompactionOrFlushInput>* out) 
const override;
 
   // Gets the number of rows in this rowset, checking 'num_rows_' first. If not
   // yet set, consults the base data and stores the result in 'num_rows_'.
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 182365b45..86b7cec3b 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -354,8 +354,8 @@ Status MemRowSet::NewRowIterator(const RowIteratorOptions& 
opts,
 Status MemRowSet::NewCompactionInput(const Schema* projection,
                                      const MvccSnapshot& snap,
                                      const IOContext* /*io_context*/,
-                                     unique_ptr<CompactionOrFlushInput>* out) 
const  {
-  out->reset(CompactionOrFlushInput::Create(*this, projection, snap));
+                                     shared_ptr<CompactionOrFlushInput>* out) 
const  {
+  *out = CompactionOrFlushInput::Create(*this, projection, snap);
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index ae194066c..c5b73c81e 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -331,7 +331,7 @@ class MemRowSet : public RowSet,
   Status NewCompactionInput(const Schema* projection,
                             const MvccSnapshot& snap,
                             const fs::IOContext* io_context,
-                            std::unique_ptr<CompactionOrFlushInput>* out) 
const override;
+                            std::shared_ptr<CompactionOrFlushInput>* out) 
const override;
 
   // Return the Schema for the rows in this memrowset.
    const Schema &schema() const {
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index 1e099fb79..9391174d9 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -61,7 +61,7 @@ class MockRowSet : public RowSet {
   Status NewCompactionInput(const Schema* /*projection*/,
                             const MvccSnapshot& /*snap*/,
                             const fs::IOContext* /*io_context*/,
-                            std::unique_ptr<CompactionOrFlushInput>* /*out*/) 
const override {
+                            std::shared_ptr<CompactionOrFlushInput>* /*out*/) 
const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index bd8084347..f6e3a9c03 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -142,7 +142,7 @@ Status DuplicatingRowSet::NewRowIterator(const 
RowIteratorOptions& opts,
 Status DuplicatingRowSet::NewCompactionInput(const Schema* /*projection*/,
                                              const MvccSnapshot& /*snap*/,
                                              const IOContext* /*io_context*/,
-                                             
unique_ptr<CompactionOrFlushInput>* /*out*/) const  {
+                                             
shared_ptr<CompactionOrFlushInput>* /*out*/) const  {
   LOG(FATAL) << "duplicating rowsets do not act as compaction input";
   return Status::OK();
 }
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index 7ecad1f28..229af3928 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -168,7 +168,7 @@ class RowSet {
   virtual Status NewCompactionInput(const Schema* projection,
                                     const MvccSnapshot &snap,
                                     const fs::IOContext* io_context,
-                                    std::unique_ptr<CompactionOrFlushInput>* 
out) const = 0;
+                                    std::shared_ptr<CompactionOrFlushInput>* 
out) const = 0;
 
   // Count the number of rows in this rowset.
   virtual Status CountRows(const fs::IOContext* io_context, rowid_t *count) 
const = 0;
@@ -423,7 +423,7 @@ class DuplicatingRowSet : public RowSet {
   Status NewCompactionInput(const Schema* projection,
                             const MvccSnapshot &snap,
                             const fs::IOContext* io_context,
-                            std::unique_ptr<CompactionOrFlushInput>* out) 
const override;
+                            std::shared_ptr<CompactionOrFlushInput>* out) 
const override;
 
   Status CountRows(const fs::IOContext* io_context, rowid_t *count) const 
override;
 
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 7145b68f1..389114723 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -2024,7 +2024,6 @@ Status Tablet::DoMergeCompactionOrFlush(const 
RowSetsInCompactionOrFlush &input,
   const auto& tid = tablet_id();
   const IOContext io_context({ tid });
 
-  shared_ptr<CompactionOrFlushInput> merge;
   const SchemaPtr schema_ptr = schema();
   MvccSnapshot flush_snap(mvcc_);
   VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 1 (flushing snapshot). 
"
@@ -2041,6 +2040,7 @@ Status Tablet::DoMergeCompactionOrFlush(const 
RowSetsInCompactionOrFlush &input,
   //   - For compaction ops, create input that contains initialized base,
   //     relevant REDO and UNDO delta iterators to be used read from 
persistent storage.
   //   - For Flush ops, create iterator for in-memory tree holding data 
updates.
+  shared_ptr<CompactionOrFlushInput> merge;
   RETURN_NOT_OK(input.CreateCompactionOrFlushInput(flush_snap,
                                                    schema_ptr.get(),
                                                    &io_context,

Reply via email to