This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 802557652 [tablet] create CompactionOrFlushInput wrapped into
shared_ptr
802557652 is described below
commit 8025576523ecff7429ddf0778729b0afe738bbc6
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Nov 25 12:39:53 2024 -0800
[tablet] create CompactionOrFlushInput wrapped into shared_ptr
Prior to this patch, it was necessary to re-wrap pointers to newly
created CompactionOrFlushInput objects from std::unique_ptr or raw
pointers into std::shared_ptr since MergeCompactionInput::MergeState
stores std::shared_ptr<CompactionOrFlushInput> as one of its fields.
That re-wrapping resulted in one extra memory allocation compared with
using std::make_shared to create a CompactionOrFlushInput object wrapped
in std::shared_ptr from the very beginning [1].
This patch addresses the issue described above, so now all the related
factories of CompactionOrFlushInput objects call std::make_shared
to create CompactionOrFlushInput objects wrapped into std::shared_ptr
from the very beginning.
This patch doesn't contain any functional modifications.
[1] https://en.cppreference.com/w/cpp/memory/shared_ptr
Change-Id: I442b6ad3780c700bd3b1d357e24dbb2733aff8bf
Reviewed-on: http://gerrit.cloudera.org:8080/22121
Tested-by: Alexey Serbin <[email protected]>
Reviewed-by: Mahesh Reddy <[email protected]>
Reviewed-by: Abhishek Chennaka <[email protected]>
---
src/kudu/tablet/compaction-test.cc | 81 +++++++++++++++++---------------------
src/kudu/tablet/compaction.cc | 27 +++++++------
src/kudu/tablet/compaction.h | 11 +++---
src/kudu/tablet/diskrowset.cc | 4 +-
src/kudu/tablet/diskrowset.h | 2 +-
src/kudu/tablet/memrowset.cc | 4 +-
src/kudu/tablet/memrowset.h | 2 +-
src/kudu/tablet/mock-rowsets.h | 2 +-
src/kudu/tablet/rowset.cc | 2 +-
src/kudu/tablet/rowset.h | 4 +-
src/kudu/tablet/tablet.cc | 2 +-
11 files changed, 67 insertions(+), 74 deletions(-)
diff --git a/src/kudu/tablet/compaction-test.cc
b/src/kudu/tablet/compaction-test.cc
index 6b2151116..83565091d 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -106,8 +106,6 @@ using strings::Substitute;
namespace kudu {
namespace tablet {
-class RowSetMetadata;
-
constexpr const char* const kRowKeyFormat = "hello %010" PRId64;
constexpr const size_t kLargeRollThreshold = 8UL * 1024 * 1024 * 1024; // 8GB
constexpr const size_t kSmallRollThreshold = 1024; // 1KB
@@ -351,14 +349,14 @@ class TestCompaction : public KuduRowSetTest {
static Status BuildCompactionInput(const MvccSnapshot& merge_snap,
const vector<shared_ptr<DiskRowSet>>&
rowsets,
const Schema& projection,
- unique_ptr<CompactionOrFlushInput>* out) {
+ shared_ptr<CompactionOrFlushInput>* out) {
vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
for (const auto& rs : rowsets) {
- unique_ptr<CompactionOrFlushInput> input;
+ shared_ptr<CompactionOrFlushInput> input;
RETURN_NOT_OK(CompactionOrFlushInput::Create(*rs, &projection,
merge_snap, nullptr, &input));
- merge_inputs.emplace_back(input.release());
+ merge_inputs.emplace_back(std::move(input));
}
- out->reset(CompactionOrFlushInput::Merge(merge_inputs, &projection));
+ *out = CompactionOrFlushInput::Merge(merge_inputs, &projection);
return Status::OK();
}
@@ -370,7 +368,7 @@ class TestCompaction : public KuduRowSetTest {
size_t roll_threshold,
vector<shared_ptr<DiskRowSet>>* result_rowsets) {
MvccSnapshot merge_snap(mvcc_);
- unique_ptr<CompactionOrFlushInput> compact_input;
+ shared_ptr<CompactionOrFlushInput> compact_input;
RETURN_NOT_OK(BuildCompactionInput(merge_snap, rowsets, projection,
&compact_input));
return DoFlushAndReopen(
compact_input.get(), projection, merge_snap, roll_threshold,
result_rowsets);
@@ -394,10 +392,7 @@ class TestCompaction : public KuduRowSetTest {
size_t roll_threshold,
vector<shared_ptr<DiskRowSet>>* result_rowsets) {
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<RowSetMetadata>> rowset_metas;
- unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Create(mrs,
-
&projection,
-
snap));
+ auto input(CompactionOrFlushInput::Create(mrs, &projection, snap));
return DoFlushAndReopen(input.get(), projection, snap, roll_threshold,
result_rowsets);
}
@@ -532,7 +527,7 @@ class TestCompaction : public KuduRowSetTest {
LOG_TIMING(INFO, "compacting " +
std::string((OVERLAP_INPUTS ? "with overlap" : "without
overlap"))) {
MvccSnapshot merge_snap(mvcc_);
- unique_ptr<CompactionOrFlushInput> compact_input;
+ shared_ptr<CompactionOrFlushInput> compact_input;
ASSERT_OK(BuildCompactionInput(merge_snap, rowsets, schema_,
&compact_input));
// Use a low target row size to increase the number of resulting rowsets.
RollingDiskRowSetWriter rdrsw(tablet()->metadata(), schema_,
@@ -580,7 +575,7 @@ TEST_F(TestCompaction, TestMemRowSetInput) {
// and mutations.
vector<string> out;
MvccSnapshot snap(mvcc_);
- unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Create(*mrs, &schema_, snap));
+ auto input(CompactionOrFlushInput::Create(*mrs, &schema_, snap));
IterateInput(input.get(), &out);
ASSERT_EQ(10, out.size());
EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64
val=0, "
@@ -643,7 +638,7 @@ TEST_F(TestCompaction, TestRowSetInput) {
// Check compaction input
vector<string> out;
- unique_ptr<CompactionOrFlushInput> input;
+ shared_ptr<CompactionOrFlushInput> input;
ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, MvccSnapshot(mvcc_),
nullptr, &input));
IterateInput(input.get(), &out);
ASSERT_EQ(10, out.size());
@@ -706,7 +701,7 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
shared_ptr<DiskRowSet> result;
CompactAndReopenNoRoll(all_rss, schema_, &result);
- unique_ptr<CompactionOrFlushInput> input;
+ shared_ptr<CompactionOrFlushInput> input;
ASSERT_OK(CompactionOrFlushInput::Create(*result,
&schema_,
MvccSnapshot::CreateSnapshotIncludingAllOps(),
@@ -880,9 +875,9 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
vector<shared_ptr<CompactionOrFlushInput>> inputs;
for (const auto& row_set : row_sets) {
- unique_ptr<CompactionOrFlushInput> ci;
+ shared_ptr<CompactionOrFlushInput> ci;
ASSERT_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
- inputs.emplace_back(ci.release());
+ inputs.emplace_back(std::move(ci));
}
// Compact the row sets by picking a few at random until we're left with
just one.
@@ -903,7 +898,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
}
vector<string> out;
- unique_ptr<CompactionOrFlushInput> ci;
+ shared_ptr<CompactionOrFlushInput> ci;
ASSERT_OK(row_sets[0]->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
IterateInput(ci.get(), &out);
@@ -952,17 +947,17 @@ TEST_F(TestCompaction,
TestMRSCompactionDoesntOutputUnobservableRows) {
vector<shared_ptr<CompactionOrFlushInput>> to_merge;
{
- unique_ptr<CompactionOrFlushInput> rs1_input;
+ shared_ptr<CompactionOrFlushInput> rs1_input;
ASSERT_OK(CompactionOrFlushInput::Create(*rs1, &schema_, all_snap,
nullptr, &rs1_input));
- unique_ptr<CompactionOrFlushInput> rs2_input;
+ shared_ptr<CompactionOrFlushInput> rs2_input;
ASSERT_OK(CompactionOrFlushInput::Create(*rs2, &schema_, all_snap,
nullptr, &rs2_input));
- to_merge.emplace_back(rs1_input.release());
- to_merge.emplace_back(rs2_input.release());
+ to_merge.emplace_back(std::move(rs1_input));
+ to_merge.emplace_back(std::move(rs2_input));
}
- unique_ptr<CompactionOrFlushInput>
merged(CompactionOrFlushInput::Merge(to_merge, &schema_));
+ auto merged(CompactionOrFlushInput::Merge(to_merge, &schema_));
// Make sure the unobservable version of the row that was inserted and
deleted in the MRS
// in the same op doesn't show up in the compaction input.
@@ -997,13 +992,11 @@ TEST_F(TestCompaction, TestOneToOne) {
// Catch the updates that came in after the snapshot flush was made.
MvccSnapshot snap2(mvcc_);
- unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Create(*mrs, &schema_, snap2));
+ auto input(CompactionOrFlushInput::Create(*mrs, &schema_, snap2));
// Add some more updates which come into the new rowset while the "reupdate"
is happening.
UpdateRows(rs.get(), 1000, 0, 3);
- string dummy_name = "";
-
ASSERT_OK(ReupdateMissedDeltas(nullptr, input.get(),
HistoryGcOpts::Disabled(), snap, snap2,
{ rs }));
@@ -1019,7 +1012,7 @@ TEST_F(TestCompaction, TestOneToOne) {
// And compact (1 input to 1 output)
MvccSnapshot snap3(mvcc_);
- unique_ptr<CompactionOrFlushInput> compact_input;
+ shared_ptr<CompactionOrFlushInput> compact_input;
ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, snap3, nullptr,
&compact_input));
ASSERT_OK(DoFlushAndReopen(compact_input.get(), schema_, snap3,
kLargeRollThreshold, nullptr));
}
@@ -1053,9 +1046,7 @@ TEST_F(TestCompaction, TestKUDU102) {
vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs, &schema_,
snap2));
merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs_b, &schema_,
snap2));
- unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
-
- string dummy_name = "";
+ auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
// This would fail without KUDU-102
ASSERT_OK(ReupdateMissedDeltas(nullptr, input.get(),
HistoryGcOpts::Disabled(), snap, snap2,
@@ -1109,10 +1100,10 @@ TEST_F(TestCompaction, TestMergeMRS) {
MvccSnapshot snap(mvcc_);
vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
- shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a,
&schema_, snap)),
- shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b,
&schema_, snap))
+ CompactionOrFlushInput::Create(*mrs_a, &schema_, snap),
+ CompactionOrFlushInput::Create(*mrs_b, &schema_, snap)
};
- unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
+ auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
vector<shared_ptr<DiskRowSet>> result_rs;
ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs));
ASSERT_EQ(20, CountRows(result_rs));
@@ -1128,10 +1119,10 @@ TEST_F(TestCompaction, TestMergeMRSWithInvisibleRows) {
InsertRows(mrs_b.get(), 10, 0);
MvccSnapshot snap(mvcc_);
vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
- shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a,
&schema_, snap)),
- shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b,
&schema_, snap))
+ CompactionOrFlushInput::Create(*mrs_a, &schema_, snap),
+ CompactionOrFlushInput::Create(*mrs_b, &schema_, snap)
};
- unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
+ auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
vector<shared_ptr<DiskRowSet>> result_rs;
ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs));
ASSERT_EQ(1, result_rs.size());
@@ -1208,10 +1199,10 @@ TEST_F(TestCompaction,
TestRandomizeDuplicatedRowsAcrossTransactions) {
MvccSnapshot snap(mvcc_);
vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
merge_inputs.emplace_back(CompactionOrFlushInput::Create(*main_mrs,
&schema_, snap));
- for (auto& mrs : txn_mrss) {
+ for (const auto& mrs : txn_mrss) {
merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs, &schema_,
snap));
}
- unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
+ auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
vector<shared_ptr<DiskRowSet>> result_rs;
ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs));
ASSERT_EQ(1, result_rs.size());
@@ -1253,11 +1244,11 @@ TEST_F(TestCompaction,
TestRowHistoryJumpsBetweenRowsets) {
// should go off without a hitch.
MvccSnapshot snap(mvcc_);
vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
- shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a,
&schema_, snap)),
- shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b,
&schema_, snap)),
- shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_c,
&schema_, snap)),
+ CompactionOrFlushInput::Create(*mrs_a, &schema_, snap),
+ CompactionOrFlushInput::Create(*mrs_b, &schema_, snap),
+ CompactionOrFlushInput::Create(*mrs_c, &schema_, snap),
};
- unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
+ auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
vector<shared_ptr<DiskRowSet>> result_rs;
ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs));
ASSERT_EQ(1, result_rs.size());
@@ -1270,10 +1261,10 @@ TEST_F(TestCompaction,
TestMergeMRSWithAllInvisibleRows) {
shared_ptr<MemRowSet> mrs_b = CreateInvisibleMRS();
MvccSnapshot snap(mvcc_);
vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
- shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a,
&schema_, snap)),
- shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b,
&schema_, snap))
+ CompactionOrFlushInput::Create(*mrs_a, &schema_, snap),
+ CompactionOrFlushInput::Create(*mrs_b, &schema_, snap)
};
- unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
+ auto input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
vector<shared_ptr<DiskRowSet>> result_rs;
ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs));
ASSERT_TRUE(result_rs.empty());
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 9ad3a1576..70e4f0aff 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -66,6 +66,7 @@ using kudu::fs::IOContext;
using kudu::fs::FsErrorManager;
using kudu::fs::KUDU_2233_CORRUPTION;
using std::deque;
+using std::make_shared;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
@@ -1100,7 +1101,7 @@ Status CompactionOrFlushInput::Create(const DiskRowSet&
rowset,
const Schema* projection,
const MvccSnapshot& snap,
const IOContext* io_context,
- unique_ptr<CompactionOrFlushInput>* out)
{
+ shared_ptr<CompactionOrFlushInput>* out)
{
CHECK(projection->has_column_ids());
unique_ptr<ColumnwiseIterator>
base_cwise(rowset.base_data_->NewIterator(projection, io_context));
@@ -1124,24 +1125,24 @@ Status CompactionOrFlushInput::Create(const DiskRowSet&
rowset,
RETURN_NOT_OK_PREPEND(rowset.delta_tracker_->NewDeltaIterator(
undo_opts, DeltaTracker::UNDOS_ONLY, &undo_deltas), "Could not open
UNDOs");
- out->reset(new DiskRowSetCompactionInput(std::move(base_iter),
- std::move(redo_deltas),
- std::move(undo_deltas)));
+ *out = make_shared<DiskRowSetCompactionInput>(
+ std::move(base_iter), std::move(redo_deltas), std::move(undo_deltas));
return Status::OK();
}
-CompactionOrFlushInput* CompactionOrFlushInput::Create(const MemRowSet&
memrowset,
- const Schema*
projection,
- const MvccSnapshot&
snap) {
+shared_ptr<CompactionOrFlushInput> CompactionOrFlushInput::Create(
+ const MemRowSet& memrowset,
+ const Schema* projection,
+ const MvccSnapshot& snap) {
CHECK(projection->has_column_ids());
- return new MemRowSetCompactionInput(memrowset, snap, projection);
+ return make_shared<MemRowSetCompactionInput>(memrowset, snap, projection);
}
-CompactionOrFlushInput* CompactionOrFlushInput::Merge(
+shared_ptr<CompactionOrFlushInput> CompactionOrFlushInput::Merge(
const vector<shared_ptr<CompactionOrFlushInput>>& inputs,
const Schema* schema) {
CHECK(schema->has_column_ids());
- return new MergeCompactionInput(inputs, schema);
+ return make_shared<MergeCompactionInput>(inputs, schema);
}
@@ -1154,17 +1155,17 @@ Status
RowSetsInCompactionOrFlush::CreateCompactionOrFlushInput(
vector<shared_ptr<CompactionOrFlushInput>> inputs;
for (const auto& rs : rowsets_) {
- unique_ptr<CompactionOrFlushInput> input;
+ shared_ptr<CompactionOrFlushInput> input;
RETURN_NOT_OK_PREPEND(rs->NewCompactionInput(schema, snap, io_context,
&input),
Substitute("Could not create compaction input for
rowset $0",
rs->ToString()));
- inputs.push_back(shared_ptr<CompactionOrFlushInput>(input.release()));
+ inputs.emplace_back(std::move(input));
}
if (inputs.size() == 1) {
*out = std::move(inputs[0]);
} else {
- out->reset(CompactionOrFlushInput::Merge(inputs, schema));
+ *out = CompactionOrFlushInput::Merge(inputs, schema);
}
return Status::OK();
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 67c42587c..b5f862873 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -110,17 +110,18 @@ class CompactionOrFlushInput {
const Schema* projection,
const MvccSnapshot& snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionOrFlushInput>* out);
+ std::shared_ptr<CompactionOrFlushInput>* out);
// Create an input which reads from the given memrowset, yielding base rows
and updates
// prior to the given snapshot.
- static CompactionOrFlushInput* Create(const MemRowSet& memrowset,
- const Schema* projection,
- const MvccSnapshot& snap);
+ static std::shared_ptr<CompactionOrFlushInput> Create(
+ const MemRowSet& memrowset,
+ const Schema* projection,
+ const MvccSnapshot& snap);
// Create a collection of merge states containing a state per input. The
inputs are merged
// in key-order according to the given schema. All inputs must have matching
schemas.
- static CompactionOrFlushInput* Merge(
+ static std::shared_ptr<CompactionOrFlushInput> Merge(
const std::vector<std::shared_ptr<CompactionOrFlushInput>>& inputs,
const Schema* schema);
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 912cf0ffb..1c01ded1a 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -671,7 +671,7 @@ Status DiskRowSet::NewRowIterator(const RowIteratorOptions&
opts,
Status DiskRowSet::NewCompactionInput(const Schema* projection,
const MvccSnapshot& snap,
const IOContext* io_context,
- unique_ptr<CompactionOrFlushInput>* out)
const {
+ shared_ptr<CompactionOrFlushInput>* out)
const {
return CompactionOrFlushInput::Create(*this, projection, snap, io_context,
out);
}
@@ -927,7 +927,7 @@ Status DiskRowSet::DeleteAncientUndoDeltas(Timestamp
ancient_history_mark,
Status DiskRowSet::DebugDumpImpl(int64_t* rows_left, vector<string>* lines) {
// Using CompactionOrFlushInput to dump our data is an easy way of seeing
all the
// rows and deltas.
- unique_ptr<CompactionOrFlushInput> input;
+ shared_ptr<CompactionOrFlushInput> input;
RETURN_NOT_OK(NewCompactionInput(rowset_metadata_->tablet_schema().get(),
MvccSnapshot::CreateSnapshotIncludingAllOps(),
nullptr, &input));
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 070260cbc..0454d9e4c 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -383,7 +383,7 @@ class DiskRowSet :
Status NewCompactionInput(const Schema* projection,
const MvccSnapshot& snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionOrFlushInput>* out)
const override;
+ std::shared_ptr<CompactionOrFlushInput>* out)
const override;
// Gets the number of rows in this rowset, checking 'num_rows_' first. If not
// yet set, consults the base data and stores the result in 'num_rows_'.
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 182365b45..86b7cec3b 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -354,8 +354,8 @@ Status MemRowSet::NewRowIterator(const RowIteratorOptions&
opts,
Status MemRowSet::NewCompactionInput(const Schema* projection,
const MvccSnapshot& snap,
const IOContext* /*io_context*/,
- unique_ptr<CompactionOrFlushInput>* out)
const {
- out->reset(CompactionOrFlushInput::Create(*this, projection, snap));
+ shared_ptr<CompactionOrFlushInput>* out)
const {
+ *out = CompactionOrFlushInput::Create(*this, projection, snap);
return Status::OK();
}
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index ae194066c..c5b73c81e 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -331,7 +331,7 @@ class MemRowSet : public RowSet,
Status NewCompactionInput(const Schema* projection,
const MvccSnapshot& snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionOrFlushInput>* out)
const override;
+ std::shared_ptr<CompactionOrFlushInput>* out)
const override;
// Return the Schema for the rows in this memrowset.
const Schema &schema() const {
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index 1e099fb79..9391174d9 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -61,7 +61,7 @@ class MockRowSet : public RowSet {
Status NewCompactionInput(const Schema* /*projection*/,
const MvccSnapshot& /*snap*/,
const fs::IOContext* /*io_context*/,
- std::unique_ptr<CompactionOrFlushInput>* /*out*/)
const override {
+ std::shared_ptr<CompactionOrFlushInput>* /*out*/)
const override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index bd8084347..f6e3a9c03 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -142,7 +142,7 @@ Status DuplicatingRowSet::NewRowIterator(const
RowIteratorOptions& opts,
Status DuplicatingRowSet::NewCompactionInput(const Schema* /*projection*/,
const MvccSnapshot& /*snap*/,
const IOContext* /*io_context*/,
-
unique_ptr<CompactionOrFlushInput>* /*out*/) const {
+
shared_ptr<CompactionOrFlushInput>* /*out*/) const {
LOG(FATAL) << "duplicating rowsets do not act as compaction input";
return Status::OK();
}
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index 7ecad1f28..229af3928 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -168,7 +168,7 @@ class RowSet {
virtual Status NewCompactionInput(const Schema* projection,
const MvccSnapshot &snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionOrFlushInput>*
out) const = 0;
+ std::shared_ptr<CompactionOrFlushInput>*
out) const = 0;
// Count the number of rows in this rowset.
virtual Status CountRows(const fs::IOContext* io_context, rowid_t *count)
const = 0;
@@ -423,7 +423,7 @@ class DuplicatingRowSet : public RowSet {
Status NewCompactionInput(const Schema* projection,
const MvccSnapshot &snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionOrFlushInput>* out)
const override;
+ std::shared_ptr<CompactionOrFlushInput>* out)
const override;
Status CountRows(const fs::IOContext* io_context, rowid_t *count) const
override;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 7145b68f1..389114723 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -2024,7 +2024,6 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompactionOrFlush &input,
const auto& tid = tablet_id();
const IOContext io_context({ tid });
- shared_ptr<CompactionOrFlushInput> merge;
const SchemaPtr schema_ptr = schema();
MvccSnapshot flush_snap(mvcc_);
VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 1 (flushing snapshot).
"
@@ -2041,6 +2040,7 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompactionOrFlush &input,
// - For compaction ops, create input that contains initialized base,
// relevant REDO and UNDO delta iterators to be used read from
persistent storage.
// - For Flush ops, create iterator for in-memory tree holding data
updates.
+ shared_ptr<CompactionOrFlushInput> merge;
RETURN_NOT_OK(input.CreateCompactionOrFlushInput(flush_snap,
schema_ptr.get(),
&io_context,