This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch branch-1.18.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 82213c1fbc0884156fa3823c11728f41c3488754 Author: Alexey Serbin <[email protected]> AuthorDate: Mon Nov 25 12:39:53 2024 -0800 [tablet] create CompactionOrFlushInput wrapped into shared_ptr Prior to this patch, it was necessary to re-wrap pointers to newly created CompactionOrFlushInput objects from std::unique_ptr or raw pointers into std::shared_ptr since MergeCompactionInput::MergeState stores std::shared_ptr<CompactionOrFlushInput> as one of its fields. It resulted in one extra memory allocation compared with a case when using std::make_shared to create CompactionOrFlushInput object wrapped into std::shared_ptr from the very beginning [1]. This patch addresses the issue described above, so now all the related factories of CompactionOrFlushInput objects call std::make_shared to create CompactionOrFlushInput objects wrapped into std::shared_ptr from the very beginning. This patch doesn't contain any functional modifications. [1] https://en.cppreference.com/w/cpp/memory/shared_ptr Change-Id: I442b6ad3780c700bd3b1d357e24dbb2733aff8bf Reviewed-on: http://gerrit.cloudera.org:8080/22121 Tested-by: Alexey Serbin <[email protected]> Reviewed-by: Mahesh Reddy <[email protected]> Reviewed-by: Abhishek Chennaka <[email protected]> (cherry picked from commit 8025576523ecff7429ddf0778729b0afe738bbc6) Reviewed-on: http://gerrit.cloudera.org:8080/22238 --- src/kudu/tablet/compaction-test.cc | 81 +++++++++++++++++--------------------- src/kudu/tablet/compaction.cc | 27 +++++++------ src/kudu/tablet/compaction.h | 11 +++--- src/kudu/tablet/diskrowset.cc | 4 +- src/kudu/tablet/diskrowset.h | 2 +- src/kudu/tablet/memrowset.cc | 4 +- src/kudu/tablet/memrowset.h | 2 +- src/kudu/tablet/mock-rowsets.h | 2 +- src/kudu/tablet/rowset.cc | 2 +- src/kudu/tablet/rowset.h | 4 +- src/kudu/tablet/tablet.cc | 2 +- 11 files changed, 67 insertions(+), 74 deletions(-) diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc index 0fd17cb46..1a7a92519 100644 --- a/src/kudu/tablet/compaction-test.cc +++ b/src/kudu/tablet/compaction-test.cc @@ -106,8 +106,6 @@ using strings::Substitute; namespace kudu { namespace tablet { -class RowSetMetadata; - constexpr const char* const kRowKeyFormat = "hello %010" PRId64; constexpr const size_t kLargeRollThreshold = 8UL * 1024 * 1024 * 1024; // 8GB constexpr const size_t kSmallRollThreshold = 1024; // 1KB @@ -352,14 +350,14 @@ class TestCompaction : public KuduRowSetTest { static Status BuildCompactionInput(const MvccSnapshot& merge_snap, const vector<shared_ptr<DiskRowSet>>& rowsets, const Schema& projection, - unique_ptr<CompactionOrFlushInput>* out) { + shared_ptr<CompactionOrFlushInput>* out) { vector<shared_ptr<CompactionOrFlushInput>> merge_inputs; for (const auto& rs : rowsets) { - unique_ptr<CompactionOrFlushInput> input; + shared_ptr<CompactionOrFlushInput> input; RETURN_NOT_OK(CompactionOrFlushInput::Create(*rs, &projection, merge_snap, nullptr, &input)); - merge_inputs.emplace_back(input.release()); + merge_inputs.emplace_back(std::move(input)); } - out->reset(CompactionOrFlushInput::Merge(merge_inputs, &projection)); + *out = CompactionOrFlushInput::Merge(merge_inputs, &projection); return Status::OK(); } @@ -371,7 +369,7 @@ class TestCompaction : public KuduRowSetTest { size_t roll_threshold, vector<shared_ptr<DiskRowSet>>* result_rowsets) { MvccSnapshot merge_snap(mvcc_); - unique_ptr<CompactionOrFlushInput> compact_input; + shared_ptr<CompactionOrFlushInput> compact_input; RETURN_NOT_OK(BuildCompactionInput(merge_snap, rowsets, projection, &compact_input)); return DoFlushAndReopen( compact_input.get(), projection, merge_snap, roll_threshold, result_rowsets); @@ -395,10 +393,7 @@ class TestCompaction : public KuduRowSetTest { size_t roll_threshold, vector<shared_ptr<DiskRowSet>>* result_rowsets) { MvccSnapshot snap(mvcc_); - vector<shared_ptr<RowSetMetadata>> rowset_metas; - unique_ptr<CompactionOrFlushInput> input(CompactionOrFlushInput::Create(mrs, - &projection, - snap)); + auto input(CompactionOrFlushInput::Create(mrs, &projection, snap)); return DoFlushAndReopen(input.get(), projection, snap, roll_threshold, result_rowsets); } @@ -533,7 +528,7 @@ class TestCompaction : public KuduRowSetTest { LOG_TIMING(INFO, "compacting " + std::string((OVERLAP_INPUTS ? "with overlap" : "without overlap"))) { MvccSnapshot merge_snap(mvcc_); - unique_ptr<CompactionOrFlushInput> compact_input; + shared_ptr<CompactionOrFlushInput> compact_input; ASSERT_OK(BuildCompactionInput(merge_snap, rowsets, schema_, &compact_input)); // Use a low target row size to increase the number of resulting rowsets. RollingDiskRowSetWriter rdrsw(tablet()->metadata(), schema_, @@ -581,7 +576,7 @@ TEST_F(TestCompaction, TestMemRowSetInput) { // and mutations. vector<string> out; MvccSnapshot snap(mvcc_); - unique_ptr<CompactionOrFlushInput> input(CompactionOrFlushInput::Create(*mrs, &schema_, snap)); + auto input(CompactionOrFlushInput::Create(*mrs, &schema_, snap)); IterateInput(input.get(), &out); ASSERT_EQ(10, out.size()); EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64 val=0, " @@ -644,7 +639,7 @@ TEST_F(TestCompaction, TestRowSetInput) { // Check compaction input vector<string> out; - unique_ptr<CompactionOrFlushInput> input; + shared_ptr<CompactionOrFlushInput> input; ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, MvccSnapshot(mvcc_), nullptr, &input)); IterateInput(input.get(), &out); ASSERT_EQ(10, out.size()); @@ -707,7 +702,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) { shared_ptr<DiskRowSet> result; CompactAndReopenNoRoll(all_rss, schema_, &result); - unique_ptr<CompactionOrFlushInput> input; + shared_ptr<CompactionOrFlushInput> input; ASSERT_OK(CompactionOrFlushInput::Create(*result, &schema_, MvccSnapshot::CreateSnapshotIncludingAllOps(), @@ -881,9 +876,9 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) { vector<shared_ptr<CompactionOrFlushInput>> inputs; for (const auto& row_set : row_sets) { - unique_ptr<CompactionOrFlushInput> ci; + shared_ptr<CompactionOrFlushInput> ci; ASSERT_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci)); - inputs.emplace_back(ci.release()); + inputs.emplace_back(std::move(ci)); } // Compact the row sets by picking a few at random until we're left with just one. @@ -904,7 +899,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) { } vector<string> out; - unique_ptr<CompactionOrFlushInput> ci; + shared_ptr<CompactionOrFlushInput> ci; ASSERT_OK(row_sets[0]->NewCompactionInput(&schema_, all_snap, nullptr, &ci)); IterateInput(ci.get(), &out); @@ -953,17 +948,17 @@ TEST_F(TestCompaction, TestMRSCompactionDoesntOutputUnobservableRows) { vector<shared_ptr<CompactionOrFlushInput>> to_merge; { - unique_ptr<CompactionOrFlushInput> rs1_input; + shared_ptr<CompactionOrFlushInput> rs1_input; ASSERT_OK(CompactionOrFlushInput::Create(*rs1, &schema_, all_snap, nullptr, &rs1_input)); - unique_ptr<CompactionOrFlushInput> rs2_input; + shared_ptr<CompactionOrFlushInput> rs2_input; ASSERT_OK(CompactionOrFlushInput::Create(*rs2, &schema_, all_snap, nullptr, &rs2_input)); - to_merge.emplace_back(rs1_input.release()); - to_merge.emplace_back(rs2_input.release()); + to_merge.emplace_back(std::move(rs1_input)); + to_merge.emplace_back(std::move(rs2_input)); } - unique_ptr<CompactionOrFlushInput> merged(CompactionOrFlushInput::Merge(to_merge, &schema_)); + auto merged(CompactionOrFlushInput::Merge(to_merge, &schema_)); // Make sure the unobservable version of the row that was inserted and deleted in the MRS // in the same op doesn't show up in the compaction input. @@ -998,13 +993,11 @@ TEST_F(TestCompaction, TestOneToOne) { // Catch the updates that came in after the snapshot flush was made. MvccSnapshot snap2(mvcc_); - unique_ptr<CompactionOrFlushInput> input(CompactionOrFlushInput::Create(*mrs, &schema_, snap2)); + auto input(CompactionOrFlushInput::Create(*mrs, &schema_, snap2)); // Add some more updates which come into the new rowset while the "reupdate" is happening. UpdateRows(rs.get(), 1000, 0, 3); - string dummy_name = ""; - ASSERT_OK(ReupdateMissedDeltas(nullptr, input.get(), HistoryGcOpts::Disabled(), snap, snap2, { rs })); @@ -1020,7 +1013,7 @@ TEST_F(TestCompaction, TestOneToOne) { // And compact (1 input to 1 output) MvccSnapshot snap3(mvcc_); - unique_ptr<CompactionOrFlushInput> compact_input; + shared_ptr<CompactionOrFlushInput> compact_input; ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, snap3, nullptr, &compact_input)); ASSERT_OK(DoFlushAndReopen(compact_input.get(), schema_, snap3, kLargeRollThreshold, nullptr)); } @@ -1054,9 +1047,7 @@ TEST_F(TestCompaction, TestKUDU102) { vector<shared_ptr<CompactionOrFlushInput>> merge_inputs; merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs, &schema_, snap2)); merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs_b, &schema_, snap2)); - unique_ptr<CompactionOrFlushInput> input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); - - string dummy_name = ""; + auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); // This would fail without KUDU-102 ASSERT_OK(ReupdateMissedDeltas(nullptr, input.get(), HistoryGcOpts::Disabled(), snap, snap2, @@ -1110,10 +1101,10 @@ TEST_F(TestCompaction, TestMergeMRS) { MvccSnapshot snap(mvcc_); vector<shared_ptr<CompactionOrFlushInput>> merge_inputs { - shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, &schema_, snap)), - shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, &schema_, snap)) + CompactionOrFlushInput::Create(*mrs_a, &schema_, snap), + CompactionOrFlushInput::Create(*mrs_b, &schema_, snap) }; - unique_ptr<CompactionOrFlushInput> input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); + auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); vector<shared_ptr<DiskRowSet>> result_rs; ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs)); ASSERT_EQ(20, CountRows(result_rs)); @@ -1129,10 +1120,10 @@ TEST_F(TestCompaction, TestMergeMRSWithInvisibleRows) { InsertRows(mrs_b.get(), 10, 0); MvccSnapshot snap(mvcc_); vector<shared_ptr<CompactionOrFlushInput>> merge_inputs { - shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, &schema_, snap)), - shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, &schema_, snap)) + CompactionOrFlushInput::Create(*mrs_a, &schema_, snap), + CompactionOrFlushInput::Create(*mrs_b, &schema_, snap) }; - unique_ptr<CompactionOrFlushInput> input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); + auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); vector<shared_ptr<DiskRowSet>> result_rs; ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs)); ASSERT_EQ(1, result_rs.size()); @@ -1209,10 +1200,10 @@ TEST_F(TestCompaction, TestRandomizeDuplicatedRowsAcrossTransactions) { MvccSnapshot snap(mvcc_); vector<shared_ptr<CompactionOrFlushInput>> merge_inputs; merge_inputs.emplace_back(CompactionOrFlushInput::Create(*main_mrs, &schema_, snap)); - for (auto& mrs : txn_mrss) { + for (const auto& mrs : txn_mrss) { merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs, &schema_, snap)); } - unique_ptr<CompactionOrFlushInput> input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); + auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); vector<shared_ptr<DiskRowSet>> result_rs; ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs)); ASSERT_EQ(1, result_rs.size()); @@ -1254,11 +1245,11 @@ TEST_F(TestCompaction, TestRowHistoryJumpsBetweenRowsets) { // should go off without a hitch. MvccSnapshot snap(mvcc_); vector<shared_ptr<CompactionOrFlushInput>> merge_inputs { - shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, &schema_, snap)), - shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, &schema_, snap)), - shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_c, &schema_, snap)), + CompactionOrFlushInput::Create(*mrs_a, &schema_, snap), + CompactionOrFlushInput::Create(*mrs_b, &schema_, snap), + CompactionOrFlushInput::Create(*mrs_c, &schema_, snap), }; - unique_ptr<CompactionOrFlushInput> input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); + auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); vector<shared_ptr<DiskRowSet>> result_rs; ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs)); ASSERT_EQ(1, result_rs.size()); @@ -1271,10 +1262,10 @@ TEST_F(TestCompaction, TestMergeMRSWithAllInvisibleRows) { shared_ptr<MemRowSet> mrs_b = CreateInvisibleMRS(); MvccSnapshot snap(mvcc_); vector<shared_ptr<CompactionOrFlushInput>> merge_inputs { - shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a, &schema_, snap)), - shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b, &schema_, snap)) + CompactionOrFlushInput::Create(*mrs_a, &schema_, snap), + CompactionOrFlushInput::Create(*mrs_b, &schema_, snap) }; - unique_ptr<CompactionOrFlushInput> input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); + auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_)); vector<shared_ptr<DiskRowSet>> result_rs; ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs)); ASSERT_TRUE(result_rs.empty()); diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc index 9ad3a1576..70e4f0aff 100644 --- a/src/kudu/tablet/compaction.cc +++ b/src/kudu/tablet/compaction.cc @@ -66,6 +66,7 @@ using kudu::fs::IOContext; using kudu::fs::FsErrorManager; using kudu::fs::KUDU_2233_CORRUPTION; using std::deque; +using std::make_shared; using std::shared_ptr; using std::string; using std::unique_ptr; @@ -1100,7 +1101,7 @@ Status CompactionOrFlushInput::Create(const DiskRowSet& rowset, const Schema* projection, const MvccSnapshot& snap, const IOContext* io_context, - unique_ptr<CompactionOrFlushInput>* out) { + shared_ptr<CompactionOrFlushInput>* out) { CHECK(projection->has_column_ids()); unique_ptr<ColumnwiseIterator> base_cwise(rowset.base_data_->NewIterator(projection, io_context)); @@ -1124,24 +1125,24 @@ Status CompactionOrFlushInput::Create(const DiskRowSet& rowset, RETURN_NOT_OK_PREPEND(rowset.delta_tracker_->NewDeltaIterator( undo_opts, DeltaTracker::UNDOS_ONLY, &undo_deltas), "Could not open UNDOs"); - out->reset(new DiskRowSetCompactionInput(std::move(base_iter), - std::move(redo_deltas), - std::move(undo_deltas))); + *out = make_shared<DiskRowSetCompactionInput>( + std::move(base_iter), std::move(redo_deltas), std::move(undo_deltas)); return Status::OK(); } -CompactionOrFlushInput* CompactionOrFlushInput::Create(const MemRowSet& memrowset, - const Schema* projection, - const MvccSnapshot& snap) { +shared_ptr<CompactionOrFlushInput> CompactionOrFlushInput::Create( + const MemRowSet& memrowset, + const Schema* projection, + const MvccSnapshot& snap) { CHECK(projection->has_column_ids()); - return new MemRowSetCompactionInput(memrowset, snap, projection); + return make_shared<MemRowSetCompactionInput>(memrowset, snap, projection); } -CompactionOrFlushInput* CompactionOrFlushInput::Merge( +shared_ptr<CompactionOrFlushInput> CompactionOrFlushInput::Merge( const vector<shared_ptr<CompactionOrFlushInput>>& inputs, const Schema* schema) { CHECK(schema->has_column_ids()); - return new MergeCompactionInput(inputs, schema); + return make_shared<MergeCompactionInput>(inputs, schema); } @@ -1154,17 +1155,17 @@ Status RowSetsInCompactionOrFlush::CreateCompactionOrFlushInput( vector<shared_ptr<CompactionOrFlushInput>> inputs; for (const auto& rs : rowsets_) { - unique_ptr<CompactionOrFlushInput> input; + shared_ptr<CompactionOrFlushInput> input; RETURN_NOT_OK_PREPEND(rs->NewCompactionInput(schema, snap, io_context, &input), Substitute("Could not create compaction input for rowset $0", rs->ToString())); - inputs.push_back(shared_ptr<CompactionOrFlushInput>(input.release())); + inputs.emplace_back(std::move(input)); } if (inputs.size() == 1) { *out = std::move(inputs[0]); } else { - out->reset(CompactionOrFlushInput::Merge(inputs, schema)); + *out = CompactionOrFlushInput::Merge(inputs, schema); } return Status::OK(); diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h index 67c42587c..b5f862873 100644 --- a/src/kudu/tablet/compaction.h +++ b/src/kudu/tablet/compaction.h @@ -110,17 +110,18 @@ class CompactionOrFlushInput { const Schema* projection, const MvccSnapshot& snap, const fs::IOContext* io_context, - std::unique_ptr<CompactionOrFlushInput>* out); + std::shared_ptr<CompactionOrFlushInput>* out); // Create an input which reads from the given memrowset, yielding base rows and updates // prior to the given snapshot. - static CompactionOrFlushInput* Create(const MemRowSet& memrowset, - const Schema* projection, - const MvccSnapshot& snap); + static std::shared_ptr<CompactionOrFlushInput> Create( + const MemRowSet& memrowset, + const Schema* projection, + const MvccSnapshot& snap); // Create a collection of merge states containing a state per input. The inputs are merged // in key-order according to the given schema. All inputs must have matching schemas. - static CompactionOrFlushInput* Merge( + static std::shared_ptr<CompactionOrFlushInput> Merge( const std::vector<std::shared_ptr<CompactionOrFlushInput>>& inputs, const Schema* schema); diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc index 974b24537..049c1f8e9 100644 --- a/src/kudu/tablet/diskrowset.cc +++ b/src/kudu/tablet/diskrowset.cc @@ -675,7 +675,7 @@ Status DiskRowSet::NewRowIterator(const RowIteratorOptions& opts, Status DiskRowSet::NewCompactionInput(const Schema* projection, const MvccSnapshot &snap, const IOContext* io_context, - unique_ptr<CompactionOrFlushInput>* out) const { + shared_ptr<CompactionOrFlushInput>* out) const { return CompactionOrFlushInput::Create(*this, projection, snap, io_context, out); } @@ -931,7 +931,7 @@ Status DiskRowSet::DeleteAncientUndoDeltas(Timestamp ancient_history_mark, Status DiskRowSet::DebugDumpImpl(int64_t* rows_left, vector<string>* lines) { // Using CompactionOrFlushInput to dump our data is an easy way of seeing all the // rows and deltas. - unique_ptr<CompactionOrFlushInput> input; + shared_ptr<CompactionOrFlushInput> input; RETURN_NOT_OK(NewCompactionInput(rowset_metadata_->tablet_schema().get(), MvccSnapshot::CreateSnapshotIncludingAllOps(), nullptr, &input)); diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h index a6b75ee6f..089364711 100644 --- a/src/kudu/tablet/diskrowset.h +++ b/src/kudu/tablet/diskrowset.h @@ -381,7 +381,7 @@ class DiskRowSet : Status NewCompactionInput(const Schema* projection, const MvccSnapshot &snap, const fs::IOContext* io_context, - std::unique_ptr<CompactionOrFlushInput>* out) const override; + std::shared_ptr<CompactionOrFlushInput>* out) const override; // Gets the number of rows in this rowset, checking 'num_rows_' first. If not // yet set, consults the base data and stores the result in 'num_rows_'. diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc index 404b1d236..2cc1d383d 100644 --- a/src/kudu/tablet/memrowset.cc +++ b/src/kudu/tablet/memrowset.cc @@ -343,8 +343,8 @@ Status MemRowSet::NewRowIterator(const RowIteratorOptions& opts, Status MemRowSet::NewCompactionInput(const Schema* projection, const MvccSnapshot& snap, const IOContext* /*io_context*/, - unique_ptr<CompactionOrFlushInput>* out) const { - out->reset(CompactionOrFlushInput::Create(*this, projection, snap)); + shared_ptr<CompactionOrFlushInput>* out) const { + *out = CompactionOrFlushInput::Create(*this, projection, snap); return Status::OK(); } diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h index 2f7254c76..509c826bb 100644 --- a/src/kudu/tablet/memrowset.h +++ b/src/kudu/tablet/memrowset.h @@ -345,7 +345,7 @@ class MemRowSet : public RowSet, Status NewCompactionInput(const Schema* projection, const MvccSnapshot& snap, const fs::IOContext* io_context, - std::unique_ptr<CompactionOrFlushInput>* out) const override; + std::shared_ptr<CompactionOrFlushInput>* out) const override; // Return the Schema for the rows in this memrowset. const Schema &schema() const { diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h index 1e099fb79..9391174d9 100644 --- a/src/kudu/tablet/mock-rowsets.h +++ b/src/kudu/tablet/mock-rowsets.h @@ -61,7 +61,7 @@ class MockRowSet : public RowSet { Status NewCompactionInput(const Schema* /*projection*/, const MvccSnapshot& /*snap*/, const fs::IOContext* /*io_context*/, - std::unique_ptr<CompactionOrFlushInput>* /*out*/) const override { + std::shared_ptr<CompactionOrFlushInput>* /*out*/) const override { LOG(FATAL) << "Unimplemented"; return Status::OK(); } diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc index bd8084347..f6e3a9c03 100644 --- a/src/kudu/tablet/rowset.cc +++ b/src/kudu/tablet/rowset.cc @@ -142,7 +142,7 @@ Status DuplicatingRowSet::NewRowIterator(const RowIteratorOptions& opts, Status DuplicatingRowSet::NewCompactionInput(const Schema* /*projection*/, const MvccSnapshot& /*snap*/, const IOContext* /*io_context*/, - unique_ptr<CompactionOrFlushInput>* /*out*/) const { + shared_ptr<CompactionOrFlushInput>* /*out*/) const { LOG(FATAL) << "duplicating rowsets do not act as compaction input"; return Status::OK(); } diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h index 7ecad1f28..229af3928 100644 --- a/src/kudu/tablet/rowset.h +++ b/src/kudu/tablet/rowset.h @@ -168,7 +168,7 @@ class RowSet { virtual Status NewCompactionInput(const Schema* projection, const MvccSnapshot &snap, const fs::IOContext* io_context, - std::unique_ptr<CompactionOrFlushInput>* out) const = 0; + std::shared_ptr<CompactionOrFlushInput>* out) const = 0; // Count the number of rows in this rowset. virtual Status CountRows(const fs::IOContext* io_context, rowid_t *count) const = 0; @@ -423,7 +423,7 @@ class DuplicatingRowSet : public RowSet { Status NewCompactionInput(const Schema* projection, const MvccSnapshot &snap, const fs::IOContext* io_context, - std::unique_ptr<CompactionOrFlushInput>* out) const override; + std::shared_ptr<CompactionOrFlushInput>* out) const override; Status CountRows(const fs::IOContext* io_context, rowid_t *count) const override; diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index bc6e9ad50..b29807833 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -2024,7 +2024,6 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompactionOrFlush &input, const auto& tid = tablet_id(); const IOContext io_context({ tid }); - shared_ptr<CompactionOrFlushInput> merge; const SchemaPtr schema_ptr = schema(); MvccSnapshot flush_snap(mvcc_); VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 1 (flushing snapshot). " @@ -2041,6 +2040,7 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompactionOrFlush &input, // - For compaction ops, create input that contains initialized base, // relevant REDO and UNDO delta iterators to be used read from persistent storage. // - For Flush ops, create iterator for in-memory tree holding data updates. + shared_ptr<CompactionOrFlushInput> merge; RETURN_NOT_OK(input.CreateCompactionOrFlushInput(flush_snap, schema_ptr.get(), &io_context,
