This is an automated email from the ASF dual-hosted git repository.
abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ec6cf7b12 [compaction/flush] Cleanup of compaction and flush code paths
ec6cf7b12 is described below
commit ec6cf7b12047f8a30884a5c4aa6c1dbdb6aabad6
Author: Ashwani Raina <[email protected]>
AuthorDate: Tue Nov 21 20:16:20 2023 +0530
[compaction/flush] Cleanup of compaction and flush code paths
This change adds more comments and renames classes and methods to improve
code readability and understanding. It mainly covers two code paths:
MemRowSet flush and RowSet compaction.
Change-Id: Ic1c2f20230aa089baf34f418a59b6c97f7351217
Reviewed-on: http://gerrit.cloudera.org:8080/20720
Tested-by: Kudu Jenkins
Reviewed-by: Attila Bukor <[email protected]>
---
src/kudu/tablet/compaction-test.cc | 118 +++++++++++++++++++------------------
src/kudu/tablet/compaction.cc | 76 ++++++++++++++----------
src/kudu/tablet/compaction.h | 49 +++++++--------
src/kudu/tablet/diskrowset.cc | 8 +--
src/kudu/tablet/diskrowset.h | 6 +-
src/kudu/tablet/memrowset.cc | 4 +-
src/kudu/tablet/memrowset.h | 6 +-
src/kudu/tablet/mock-rowsets.h | 2 +-
src/kudu/tablet/rowset.cc | 2 +-
src/kudu/tablet/rowset.h | 6 +-
src/kudu/tablet/tablet.cc | 74 +++++++++++++++--------
src/kudu/tablet/tablet.h | 11 ++--
12 files changed, 207 insertions(+), 155 deletions(-)
diff --git a/src/kudu/tablet/compaction-test.cc
b/src/kudu/tablet/compaction-test.cc
index 83ea39e23..7abc0b4ca 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -302,15 +302,15 @@ class TestCompaction : public KuduRowSetTest {
// Iterate over the given compaction input, stringifying and dumping each
// yielded row to *out
- static void IterateInput(CompactionInput* input, vector<string>* out) {
+ static void IterateInput(CompactionOrFlushInput* input, vector<string>* out)
{
ASSERT_OK(DebugDumpCompactionInput(input, nullptr, out));
}
- // Flush the given CompactionInput 'input' to disk with the given snapshot.
+ // Flush the given CompactionOrFlushInput 'input' to disk with the given
snapshot.
// If 'result_rowsets' is not NULL, reopens the resulting rowset(s) and
appends
// them to the vector.
Status DoFlushAndReopen(
- CompactionInput* input,
+ CompactionOrFlushInput* input,
const Schema& projection,
const MvccSnapshot& snap,
size_t roll_threshold,
@@ -352,14 +352,14 @@ class TestCompaction : public KuduRowSetTest {
static Status BuildCompactionInput(const MvccSnapshot& merge_snap,
const vector<shared_ptr<DiskRowSet>>&
rowsets,
const Schema& projection,
- unique_ptr<CompactionInput>* out) {
- vector<shared_ptr<CompactionInput>> merge_inputs;
+ unique_ptr<CompactionOrFlushInput>* out) {
+ vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
for (const auto& rs : rowsets) {
- unique_ptr<CompactionInput> input;
- RETURN_NOT_OK(CompactionInput::Create(*rs, &projection, merge_snap,
nullptr, &input));
+ unique_ptr<CompactionOrFlushInput> input;
+ RETURN_NOT_OK(CompactionOrFlushInput::Create(*rs, &projection,
merge_snap, nullptr, &input));
merge_inputs.emplace_back(input.release());
}
- out->reset(CompactionInput::Merge(merge_inputs, &projection));
+ out->reset(CompactionOrFlushInput::Merge(merge_inputs, &projection));
return Status::OK();
}
@@ -371,7 +371,7 @@ class TestCompaction : public KuduRowSetTest {
size_t roll_threshold,
vector<shared_ptr<DiskRowSet>>* result_rowsets) {
MvccSnapshot merge_snap(mvcc_);
- unique_ptr<CompactionInput> compact_input;
+ unique_ptr<CompactionOrFlushInput> compact_input;
RETURN_NOT_OK(BuildCompactionInput(merge_snap, rowsets, projection,
&compact_input));
return DoFlushAndReopen(
compact_input.get(), projection, merge_snap, roll_threshold,
result_rowsets);
@@ -396,7 +396,9 @@ class TestCompaction : public KuduRowSetTest {
vector<shared_ptr<DiskRowSet>>* result_rowsets) {
MvccSnapshot snap(mvcc_);
vector<shared_ptr<RowSetMetadata>> rowset_metas;
- unique_ptr<CompactionInput> input(CompactionInput::Create(mrs,
&projection, snap));
+ unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Create(mrs,
+
&projection,
+
snap));
return DoFlushAndReopen(input.get(), projection, snap, roll_threshold,
result_rowsets);
}
@@ -531,7 +533,7 @@ class TestCompaction : public KuduRowSetTest {
LOG_TIMING(INFO, "compacting " +
std::string((OVERLAP_INPUTS ? "with overlap" : "without
overlap"))) {
MvccSnapshot merge_snap(mvcc_);
- unique_ptr<CompactionInput> compact_input;
+ unique_ptr<CompactionOrFlushInput> compact_input;
ASSERT_OK(BuildCompactionInput(merge_snap, rowsets, schema_,
&compact_input));
// Use a low target row size to increase the number of resulting rowsets.
RollingDiskRowSetWriter rdrsw(tablet()->metadata(), schema_,
@@ -579,7 +581,7 @@ TEST_F(TestCompaction, TestMemRowSetInput) {
// and mutations.
vector<string> out;
MvccSnapshot snap(mvcc_);
- unique_ptr<CompactionInput> input(CompactionInput::Create(*mrs, &schema_,
snap));
+ unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Create(*mrs, &schema_, snap));
IterateInput(input.get(), &out);
ASSERT_EQ(10, out.size());
EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64
val=0, "
@@ -642,8 +644,8 @@ TEST_F(TestCompaction, TestRowSetInput) {
// Check compaction input
vector<string> out;
- unique_ptr<CompactionInput> input;
- ASSERT_OK(CompactionInput::Create(*rs, &schema_, MvccSnapshot(mvcc_),
nullptr, &input));
+ unique_ptr<CompactionOrFlushInput> input;
+ ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, MvccSnapshot(mvcc_),
nullptr, &input));
IterateInput(input.get(), &out);
ASSERT_EQ(10, out.size());
EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64
val=0, "
@@ -705,12 +707,12 @@ TEST_F(TestCompaction, TestDuplicatedGhostRowsMerging) {
shared_ptr<DiskRowSet> result;
CompactAndReopenNoRoll(all_rss, schema_, &result);
- unique_ptr<CompactionInput> input;
- ASSERT_OK(CompactionInput::Create(*result,
- &schema_,
-
MvccSnapshot::CreateSnapshotIncludingAllOps(),
- nullptr,
- &input));
+ unique_ptr<CompactionOrFlushInput> input;
+ ASSERT_OK(CompactionOrFlushInput::Create(*result,
+ &schema_,
+
MvccSnapshot::CreateSnapshotIncludingAllOps(),
+ nullptr,
+ &input));
vector<string> out;
IterateInput(input.get(), &out);
ASSERT_EQ(out.size(), 10);
@@ -877,9 +879,9 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
AddExpectedDelete(&row->redo_head, reinsert->timestamp());
}
- vector<shared_ptr<CompactionInput>> inputs;
+ vector<shared_ptr<CompactionOrFlushInput>> inputs;
for (const auto& row_set : row_sets) {
- unique_ptr<CompactionInput> ci;
+ unique_ptr<CompactionOrFlushInput> ci;
ASSERT_OK(row_set->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
inputs.emplace_back(ci.release());
}
@@ -902,7 +904,7 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
}
vector<string> out;
- unique_ptr<CompactionInput> ci;
+ unique_ptr<CompactionOrFlushInput> ci;
ASSERT_OK(row_sets[0]->NewCompactionInput(&schema_, all_snap, nullptr, &ci));
IterateInput(ci.get(), &out);
@@ -949,19 +951,19 @@ TEST_F(TestCompaction,
TestMRSCompactionDoesntOutputUnobservableRows) {
MvccSnapshot all_snap = MvccSnapshot::CreateSnapshotIncludingAllOps();
- vector<shared_ptr<CompactionInput>> to_merge;
+ vector<shared_ptr<CompactionOrFlushInput>> to_merge;
{
- unique_ptr<CompactionInput> rs1_input;
- ASSERT_OK(CompactionInput::Create(*rs1, &schema_, all_snap, nullptr,
&rs1_input));
+ unique_ptr<CompactionOrFlushInput> rs1_input;
+ ASSERT_OK(CompactionOrFlushInput::Create(*rs1, &schema_, all_snap,
nullptr, &rs1_input));
- unique_ptr<CompactionInput> rs2_input;
- ASSERT_OK(CompactionInput::Create(*rs2, &schema_, all_snap, nullptr,
&rs2_input));
+ unique_ptr<CompactionOrFlushInput> rs2_input;
+ ASSERT_OK(CompactionOrFlushInput::Create(*rs2, &schema_, all_snap,
nullptr, &rs2_input));
to_merge.emplace_back(rs1_input.release());
to_merge.emplace_back(rs2_input.release());
}
- unique_ptr<CompactionInput> merged(CompactionInput::Merge(to_merge,
&schema_));
+ unique_ptr<CompactionOrFlushInput>
merged(CompactionOrFlushInput::Merge(to_merge, &schema_));
// Make sure the unobservable version of the row that was inserted and
deleted in the MRS
// in the same op doesn't show up in the compaction input.
@@ -996,7 +998,7 @@ TEST_F(TestCompaction, TestOneToOne) {
// Catch the updates that came in after the snapshot flush was made.
MvccSnapshot snap2(mvcc_);
- unique_ptr<CompactionInput> input(CompactionInput::Create(*mrs, &schema_,
snap2));
+ unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Create(*mrs, &schema_, snap2));
// Add some more updates which come into the new rowset while the "reupdate"
is happening.
UpdateRows(rs.get(), 1000, 0, 3);
@@ -1008,7 +1010,7 @@ TEST_F(TestCompaction, TestOneToOne) {
// If we look at the contents of the DiskRowSet now, we should see the
"re-updated" data.
vector<string> out;
- ASSERT_OK(CompactionInput::Create(*rs, &schema_, MvccSnapshot(mvcc_),
nullptr, &input));
+ ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, MvccSnapshot(mvcc_),
nullptr, &input));
IterateInput(input.get(), &out);
ASSERT_EQ(1000, out.size());
EXPECT_EQ("RowIdxInBlock: 0; Base: (string key=\"hello 0000000000\", int64
val=1, "
@@ -1018,8 +1020,8 @@ TEST_F(TestCompaction, TestOneToOne) {
// And compact (1 input to 1 output)
MvccSnapshot snap3(mvcc_);
- unique_ptr<CompactionInput> compact_input;
- ASSERT_OK(CompactionInput::Create(*rs, &schema_, snap3, nullptr,
&compact_input));
+ unique_ptr<CompactionOrFlushInput> compact_input;
+ ASSERT_OK(CompactionOrFlushInput::Create(*rs, &schema_, snap3, nullptr,
&compact_input));
ASSERT_OK(DoFlushAndReopen(compact_input.get(), schema_, snap3,
kLargeRollThreshold, nullptr));
}
@@ -1049,10 +1051,10 @@ TEST_F(TestCompaction, TestKUDU102) {
// Catch the updates that came in after the snapshot flush was made.
// Note that we are merging two MRS, it's a hack
MvccSnapshot snap2(mvcc_);
- vector<shared_ptr<CompactionInput>> merge_inputs;
- merge_inputs.emplace_back(CompactionInput::Create(*mrs, &schema_, snap2));
- merge_inputs.emplace_back(CompactionInput::Create(*mrs_b, &schema_, snap2));
- unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs,
&schema_));
+ vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
+ merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs, &schema_,
snap2));
+ merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs_b, &schema_,
snap2));
+ unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
string dummy_name = "";
@@ -1107,11 +1109,11 @@ TEST_F(TestCompaction, TestMergeMRS) {
InsertRows(mrs_a.get(), 5, 1);
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<CompactionInput>> merge_inputs {
- shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_,
snap)),
- shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_,
snap))
+ vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
+ shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a,
&schema_, snap)),
+ shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b,
&schema_, snap))
};
- unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs,
&schema_));
+ unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
vector<shared_ptr<DiskRowSet>> result_rs;
ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs));
ASSERT_EQ(20, CountRows(result_rs));
@@ -1126,11 +1128,11 @@ TEST_F(TestCompaction, TestMergeMRSWithInvisibleRows) {
mem_trackers_.tablet_tracker, &mrs_b));
InsertRows(mrs_b.get(), 10, 0);
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<CompactionInput>> merge_inputs {
- shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_,
snap)),
- shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_,
snap))
+ vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
+ shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a,
&schema_, snap)),
+ shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b,
&schema_, snap))
};
- unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs,
&schema_));
+ unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
vector<shared_ptr<DiskRowSet>> result_rs;
ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs));
ASSERT_EQ(1, result_rs.size());
@@ -1205,12 +1207,12 @@ TEST_F(TestCompaction,
TestRandomizeDuplicatedRowsAcrossTransactions) {
}
}
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<CompactionInput>> merge_inputs;
- merge_inputs.emplace_back(CompactionInput::Create(*main_mrs, &schema_,
snap));
+ vector<shared_ptr<CompactionOrFlushInput>> merge_inputs;
+ merge_inputs.emplace_back(CompactionOrFlushInput::Create(*main_mrs,
&schema_, snap));
for (auto& mrs : txn_mrss) {
- merge_inputs.emplace_back(CompactionInput::Create(*mrs, &schema_, snap));
+ merge_inputs.emplace_back(CompactionOrFlushInput::Create(*mrs, &schema_,
snap));
}
- unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs,
&schema_));
+ unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
vector<shared_ptr<DiskRowSet>> result_rs;
ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs));
ASSERT_EQ(1, result_rs.size());
@@ -1251,12 +1253,12 @@ TEST_F(TestCompaction,
TestRowHistoryJumpsBetweenRowsets) {
// Despite the overlapping time ranges across these inputs, the compaction
// should go off without a hitch.
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<CompactionInput>> merge_inputs {
- shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_,
snap)),
- shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_,
snap)),
- shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_c, &schema_,
snap)),
+ vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
+ shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a,
&schema_, snap)),
+ shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b,
&schema_, snap)),
+ shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_c,
&schema_, snap)),
};
- unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs,
&schema_));
+ unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
vector<shared_ptr<DiskRowSet>> result_rs;
ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs));
ASSERT_EQ(1, result_rs.size());
@@ -1268,11 +1270,11 @@ TEST_F(TestCompaction,
TestMergeMRSWithAllInvisibleRows) {
shared_ptr<MemRowSet> mrs_a = CreateInvisibleMRS();
shared_ptr<MemRowSet> mrs_b = CreateInvisibleMRS();
MvccSnapshot snap(mvcc_);
- vector<shared_ptr<CompactionInput>> merge_inputs {
- shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_,
snap)),
- shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_,
snap))
+ vector<shared_ptr<CompactionOrFlushInput>> merge_inputs {
+ shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_a,
&schema_, snap)),
+ shared_ptr<CompactionOrFlushInput>(CompactionOrFlushInput::Create(*mrs_b,
&schema_, snap))
};
- unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs,
&schema_));
+ unique_ptr<CompactionOrFlushInput>
input(CompactionOrFlushInput::Merge(merge_inputs, &schema_));
vector<shared_ptr<DiskRowSet>> result_rs;
ASSERT_OK(DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold,
&result_rs));
ASSERT_TRUE(result_rs.empty());
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index 2291ea0eb..d56b79b78 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -113,8 +113,8 @@ void AdvanceToLastInList(const Mutation** m) {
}
}
-// CompactionInput yielding rows and mutations from a MemRowSet.
-class MemRowSetCompactionInput : public CompactionInput {
+// CompactionOrFlushInput yielding rows and mutations from a MemRowSet.
+class MemRowSetCompactionInput : public CompactionOrFlushInput {
public:
MemRowSetCompactionInput(const MemRowSet& memrowset,
const MvccSnapshot& snap,
@@ -163,7 +163,7 @@ class MemRowSetCompactionInput : public CompactionInput {
// Handle the rare case where a row was inserted and deleted in the same
operation.
// This row can never be observed and should not be compacted/flushed.
This saves
- // us some trouble later on on compactions.
+ // us some trouble later during compaction.
//
// See CompareDuplicatedRows().
if (PREDICT_FALSE(input_row.redo_head != nullptr &&
@@ -228,8 +228,8 @@ class MemRowSetCompactionInput : public CompactionInput {
////////////////////////////////////////////////////////////
-// CompactionInput yielding rows and mutations from an on-disk DiskRowSet.
-class DiskRowSetCompactionInput : public CompactionInput {
+// CompactionOrFlushInput yielding rows and mutations from an on-disk
DiskRowSet.
+class DiskRowSetCompactionInput : public CompactionOrFlushInput {
public:
DiskRowSetCompactionInput(unique_ptr<RowwiseIterator> base_iter,
unique_ptr<DeltaIterator> redo_delta_iter,
@@ -539,7 +539,7 @@ void TransferRedoHistory(CompactionInputRow* newer,
CompactionInputRow* older) {
}
-class MergeCompactionInput : public CompactionInput {
+class MergeCompactionInput : public CompactionOrFlushInput {
private:
// State kept for each of the inputs.
struct MergeState {
@@ -584,7 +584,7 @@ class MergeCompactionInput : public CompactionInput {
return schema.Compare(pending.back().row, (*other.next()).row) < 0;
}
- shared_ptr<CompactionInput> input;
+ shared_ptr<CompactionOrFlushInput> input;
vector<CompactionInputRow> pending;
int pending_idx;
@@ -592,7 +592,7 @@ class MergeCompactionInput : public CompactionInput {
};
public:
- MergeCompactionInput(const vector<shared_ptr<CompactionInput>>& inputs,
+ MergeCompactionInput(const vector<shared_ptr<CompactionOrFlushInput>>&
inputs,
const Schema* schema)
: schema_(schema),
num_dup_rows_(0),
@@ -1096,11 +1096,11 @@ string CompactionInputRowToString(const
CompactionInputRow& input_row) {
////////////////////////////////////////////////////////////
-Status CompactionInput::Create(const DiskRowSet& rowset,
- const Schema* projection,
- const MvccSnapshot& snap,
- const IOContext* io_context,
- unique_ptr<CompactionInput>* out) {
+Status CompactionOrFlushInput::Create(const DiskRowSet& rowset,
+ const Schema* projection,
+ const MvccSnapshot& snap,
+ const IOContext* io_context,
+ unique_ptr<CompactionOrFlushInput>* out)
{
CHECK(projection->has_column_ids());
unique_ptr<ColumnwiseIterator>
base_cwise(rowset.base_data_->NewIterator(projection, io_context));
@@ -1130,45 +1130,47 @@ Status CompactionInput::Create(const DiskRowSet& rowset,
return Status::OK();
}
-CompactionInput* CompactionInput::Create(const MemRowSet& memrowset,
- const Schema* projection,
- const MvccSnapshot& snap) {
+CompactionOrFlushInput* CompactionOrFlushInput::Create(const MemRowSet&
memrowset,
+ const Schema*
projection,
+ const MvccSnapshot&
snap) {
CHECK(projection->has_column_ids());
return new MemRowSetCompactionInput(memrowset, snap, projection);
}
-CompactionInput* CompactionInput::Merge(const
vector<shared_ptr<CompactionInput>>& inputs,
- const Schema* schema) {
+CompactionOrFlushInput* CompactionOrFlushInput::Merge(
+ const vector<shared_ptr<CompactionOrFlushInput>>& inputs,
+ const Schema* schema) {
CHECK(schema->has_column_ids());
return new MergeCompactionInput(inputs, schema);
}
-Status RowSetsInCompaction::CreateCompactionInput(const MvccSnapshot& snap,
- const Schema* schema,
- const IOContext* io_context,
- shared_ptr<CompactionInput>*
out) const {
+Status RowSetsInCompactionOrFlush::CreateCompactionOrFlushInput(
+ const MvccSnapshot& snap,
+ const Schema* schema,
+ const IOContext* io_context,
+ shared_ptr<CompactionOrFlushInput>* out) const {
CHECK(schema->has_column_ids());
- vector<shared_ptr<CompactionInput>> inputs;
+ vector<shared_ptr<CompactionOrFlushInput>> inputs;
for (const auto& rs : rowsets_) {
- unique_ptr<CompactionInput> input;
+ unique_ptr<CompactionOrFlushInput> input;
RETURN_NOT_OK_PREPEND(rs->NewCompactionInput(schema, snap, io_context,
&input),
Substitute("Could not create compaction input for
rowset $0",
rs->ToString()));
- inputs.push_back(shared_ptr<CompactionInput>(input.release()));
+ inputs.push_back(shared_ptr<CompactionOrFlushInput>(input.release()));
}
if (inputs.size() == 1) {
*out = std::move(inputs[0]);
} else {
- out->reset(CompactionInput::Merge(inputs, schema));
+ out->reset(CompactionOrFlushInput::Merge(inputs, schema));
}
return Status::OK();
}
-void RowSetsInCompaction::DumpToLog() const {
+void RowSetsInCompactionOrFlush::DumpToLog() const {
VLOG(1) << "Selected " << rowsets_.size() << " rowsets to compact:";
// Dump the selected rowsets to the log, and collect corresponding iterators.
for (const auto& rs : rowsets_) {
@@ -1388,9 +1390,17 @@ Status ApplyMutationsAndGenerateUndos(const
MvccSnapshot& snap,
#undef ERROR_LOG_CONTEXT
}
+// Following method processes the compaction input by reading input rows in
+// blocks and for each row inside the block:
+// - Apply all REDO mutations collected for the row at hand.
+// - Generate corresponding UNDO deltas for applied mutations.
+// - For a row with 'ghost' entries, merge their histories of mutations.
+// - Remove any ancient UNDO mutations, as those are not applicable anymore.
+// - Append UNDO and REDO deltas to DiskRowSetWriter output.
+// - Append fully or partially (resized) processed rowblock to DRS writer
output.
Status FlushCompactionInput(const string& tablet_id,
const scoped_refptr<FsErrorManager>& error_manager,
- CompactionInput* input,
+ CompactionOrFlushInput* input,
const MvccSnapshot& snap,
const HistoryGcOpts& history_gc_opts,
RollingDiskRowSetWriter* out) {
@@ -1460,10 +1470,12 @@ Status FlushCompactionInput(const string& tablet_id,
rowid_t index_in_current_drs;
if (new_undos_head != nullptr) {
+ // Append UNDO deltas to DiskRowSetWriter output.
out->AppendUndoDeltas(dst_row.row_index(), new_undos_head,
&index_in_current_drs);
}
if (new_redos_head != nullptr) {
+ // Append REDO deltas to DiskRowSetWriter output.
out->AppendRedoDeltas(dst_row.row_index(), new_redos_head,
&index_in_current_drs);
}
@@ -1497,6 +1509,7 @@ Status FlushCompactionInput(const string& tablet_id,
n++;
if (n == block.nrows()) {
+ // Append fully processed rowblock to DRS writer output.
RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
live_row_count = 0;
n = 0;
@@ -1505,6 +1518,7 @@ Status FlushCompactionInput(const string& tablet_id,
if (n > 0) {
block.Resize(n);
+ // Append partially (resized) processed rowblock to DRS writer output.
RETURN_NOT_OK(out->AppendBlock(block, live_row_count));
block.Resize(block.row_capacity());
}
@@ -1515,7 +1529,7 @@ Status FlushCompactionInput(const string& tablet_id,
}
Status ReupdateMissedDeltas(const IOContext* io_context,
- CompactionInput* input,
+ CompactionOrFlushInput* input,
const HistoryGcOpts& history_gc_opts,
const MvccSnapshot& snap_to_exclude,
const MvccSnapshot& snap_to_include,
@@ -1687,7 +1701,9 @@ Status ReupdateMissedDeltas(const IOContext* io_context,
}
-Status DebugDumpCompactionInput(CompactionInput* input, int64_t* rows_left,
vector<string>* lines) {
+Status DebugDumpCompactionInput(CompactionOrFlushInput* input,
+ int64_t* rows_left,
+ vector<string>* lines) {
RETURN_NOT_OK(input->Init());
vector<CompactionInputRow> rows;
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index 8b085958c..0404e06ca 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -95,7 +95,7 @@ class HistoryGcOpts {
};
// Interface for an input feeding into a compaction or flush.
-class CompactionInput {
+class CompactionOrFlushInput {
public:
// Create an input which reads from the given rowset, yielding base rows
// prior to the given snapshot.
@@ -110,20 +110,21 @@ class CompactionInput {
const Schema* projection,
const MvccSnapshot& snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionInput>* out);
+ std::unique_ptr<CompactionOrFlushInput>* out);
// Create an input which reads from the given memrowset, yielding base rows
and updates
// prior to the given snapshot.
- static CompactionInput* Create(const MemRowSet& memrowset,
- const Schema* projection,
- const MvccSnapshot& snap);
+ static CompactionOrFlushInput* Create(const MemRowSet& memrowset,
+ const Schema* projection,
+ const MvccSnapshot& snap);
- // Create an input which merges several other compaction inputs. The inputs
are merged
+ // Create a collection of merge states containing a state per input. The
inputs are merged
// in key-order according to the given schema. All inputs must have matching
schemas.
- static CompactionInput* Merge(const
std::vector<std::shared_ptr<CompactionInput>>& inputs,
- const Schema* schema);
+ static CompactionOrFlushInput* Merge(
+ const std::vector<std::shared_ptr<CompactionOrFlushInput>>& inputs,
+ const Schema* schema);
- virtual ~CompactionInput() = default;
+ virtual ~CompactionOrFlushInput() = default;
virtual Status Init() = 0;
virtual Status PrepareBlock(std::vector<CompactionInputRow>* block) = 0;
@@ -143,8 +144,8 @@ class CompactionInput {
virtual size_t memory_footprint() const = 0;
};
-// The set of rowsets which are taking part in a given compaction.
-class RowSetsInCompaction {
+// The set of rowsets which are taking part in a given compaction or flush
operation.
+class RowSetsInCompactionOrFlush {
public:
void AddRowSet(const std::shared_ptr<RowSet>& rowset,
std::unique_lock<std::mutex> lock) {
@@ -154,15 +155,15 @@ class RowSetsInCompaction {
rowsets_.push_back(rowset);
}
- // Create the appropriate compaction input for this compaction -- either a
merge
+ // Create the appropriate input for this compaction/flush -- either a merge
// of all the inputs, or the single input if there was only one.
//
- // 'schema' is the schema for the output of the compaction, and must remain
valid
- // for the lifetime of the returned CompactionInput.
- Status CreateCompactionInput(const MvccSnapshot& snap,
- const Schema* schema,
- const fs::IOContext* io_context,
- std::shared_ptr<CompactionInput>* out) const;
+ // 'schema' is the schema for the output of the compaction/flush, and must
+ // remain valid for the lifetime of the returned CompactionOrFlushInput.
+ Status CreateCompactionOrFlushInput(const MvccSnapshot& snap,
+ const Schema* schema,
+ const fs::IOContext* io_context,
+ std::shared_ptr<CompactionOrFlushInput>*
out) const;
// Dump a log message indicating the chosen rowsets.
void DumpToLog() const;
@@ -178,7 +179,7 @@ class RowSetsInCompaction {
std::vector<std::unique_lock<std::mutex>> locks_;
};
-// One row yielded by CompactionInput::PrepareBlock.
+// One row yielded by CompactionOrFlushInput::PrepareBlock.
// Looks like this (assuming n UNDO records and m REDO records):
// UNDO_n <- ... <- UNDO_1 <- UNDO_head <- row -> REDO_head -> REDO_1 -> ...
-> REDO_m
struct CompactionInputRow {
@@ -232,11 +233,11 @@ Status ApplyMutationsAndGenerateUndos(const MvccSnapshot&
snap,
// Iterate through this compaction input, flushing all rows to the given
RollingDiskRowSetWriter.
// The 'snap' argument should match the MvccSnapshot used to create the
compaction input.
//
-// After return of this function, this CompactionInput object is "used up" and
will
+// After return of this function, this CompactionOrFlushInput object is "used
up" and will
// no longer be useful.
Status FlushCompactionInput(const std::string& tablet_id,
const scoped_refptr<fs::FsErrorManager>&
error_manager,
- CompactionInput* input,
+ CompactionOrFlushInput* input,
const MvccSnapshot& snap,
const HistoryGcOpts& history_gc_opts,
RollingDiskRowSetWriter* out);
@@ -249,10 +250,10 @@ Status FlushCompactionInput(const std::string& tablet_id,
// The output rowsets passed in must be non-overlapping and in ascending key
order:
// typically they are the resulting rowsets from a RollingDiskRowSetWriter.
//
-// After return of this function, this CompactionInput object is "used up" and
will
+// After return of this function, this CompactionOrFlushInput object is "used
up" and will
// yield no further rows.
Status ReupdateMissedDeltas(const fs::IOContext* io_context,
- CompactionInput* input,
+ CompactionOrFlushInput* input,
const HistoryGcOpts& history_gc_opts,
const MvccSnapshot& snap_to_exclude,
const MvccSnapshot& snap_to_include,
@@ -262,7 +263,7 @@ Status ReupdateMissedDeltas(const fs::IOContext* io_context,
// This consumes no more rows from the compaction input than specified by the
'rows_left' parameter.
// If 'rows_left' is nullptr, there is no limit on the number of rows to dump.
// If the content of 'rows_left' is equal to or less than 0, no rows will be
dumped.
-Status DebugDumpCompactionInput(CompactionInput* input, int64_t* rows_left,
+Status DebugDumpCompactionInput(CompactionOrFlushInput* input, int64_t*
rows_left,
std::vector<std::string>* lines);
// Helper methods to print a row with full history.
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 07e27ff00..b05b6fcd3 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -672,8 +672,8 @@ Status DiskRowSet::NewRowIterator(const RowIteratorOptions&
opts,
Status DiskRowSet::NewCompactionInput(const Schema* projection,
const MvccSnapshot &snap,
const IOContext* io_context,
- unique_ptr<CompactionInput>* out) const {
- return CompactionInput::Create(*this, projection, snap, io_context, out);
+ unique_ptr<CompactionOrFlushInput>* out)
const {
+ return CompactionOrFlushInput::Create(*this, projection, snap, io_context,
out);
}
Status DiskRowSet::MutateRow(Timestamp timestamp,
@@ -926,9 +926,9 @@ Status DiskRowSet::DeleteAncientUndoDeltas(Timestamp
ancient_history_mark,
}
Status DiskRowSet::DebugDumpImpl(int64_t* rows_left, vector<string>* lines) {
- // Using CompactionInput to dump our data is an easy way of seeing all the
+ // Using CompactionOrFlushInput to dump our data is an easy way of seeing
all the
// rows and deltas.
- unique_ptr<CompactionInput> input;
+ unique_ptr<CompactionOrFlushInput> input;
RETURN_NOT_OK(NewCompactionInput(rowset_metadata_->tablet_schema().get(),
MvccSnapshot::CreateSnapshotIncludingAllOps(),
nullptr, &input));
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 1719cbfe9..ea1250e3e 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -93,7 +93,7 @@ Status DumpRowSetInternal(const fs::IOContext& ctx,
namespace tablet {
class CFileSet;
-class CompactionInput;
+class CompactionOrFlushInput;
class DeltaFileWriter;
class DeltaStats;
class HistoryGcOpts;
@@ -381,7 +381,7 @@ class DiskRowSet :
Status NewCompactionInput(const Schema* projection,
const MvccSnapshot &snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionInput>* out) const
override;
+ std::unique_ptr<CompactionOrFlushInput>* out)
const override;
// Gets the number of rows in this rowset, checking 'num_rows_' first. If not
// yet set, consults the base data and stores the result in 'num_rows_'.
@@ -489,7 +489,7 @@ class DiskRowSet :
FRIEND_TEST(TestRowSet, TestDMSFlush);
FRIEND_TEST(tserver::TabletServerTest, SetEncodedKeysWhenStartingUp);
- friend class CompactionInput;
+ friend class CompactionOrFlushInput;
friend class Tablet;
friend Status kudu::tools::DumpRowSetInternal(
const fs::IOContext& ctx,
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 2f4a030fd..404b1d236 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -343,8 +343,8 @@ Status MemRowSet::NewRowIterator(const RowIteratorOptions&
opts,
Status MemRowSet::NewCompactionInput(const Schema* projection,
const MvccSnapshot& snap,
const IOContext* /*io_context*/,
- unique_ptr<CompactionInput>* out) const {
- out->reset(CompactionInput::Create(*this, projection, snap));
+ unique_ptr<CompactionOrFlushInput>* out)
const {
+ out->reset(CompactionOrFlushInput::Create(*this, projection, snap));
return Status::OK();
}
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index c58a09640..2f7254c76 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -88,7 +88,7 @@ namespace tablet {
// NOTE: all allocations done by the MemRowSet are done inside its associated
// thread-safe arena, and then freed in bulk when the MemRowSet is destructed.
-class CompactionInput;
+class CompactionOrFlushInput;
class MemRowSet;
class Mutation;
class OperationResultPB;
@@ -345,7 +345,7 @@ class MemRowSet : public RowSet,
Status NewCompactionInput(const Schema* projection,
const MvccSnapshot& snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionInput>* out) const
override;
+ std::unique_ptr<CompactionOrFlushInput>* out)
const override;
// Return the Schema for the rows in this memrowset.
const Schema &schema() const {
@@ -493,6 +493,8 @@ class MemRowSet : public RowSet,
volatile uint64_t debug_insert_count_;
volatile uint64_t debug_update_count_;
+ // Lock governing this rowset's inclusion in a compact/flush. If locked,
+ // no other compactor/flusher will attempt to include this rowset.
std::mutex compact_flush_lock_;
log::MinLogIndexAnchorer anchorer_;
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index eedeb95f9..1e099fb79 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -61,7 +61,7 @@ class MockRowSet : public RowSet {
Status NewCompactionInput(const Schema* /*projection*/,
const MvccSnapshot& /*snap*/,
const fs::IOContext* /*io_context*/,
- std::unique_ptr<CompactionInput>* /*out*/) const
override {
+ std::unique_ptr<CompactionOrFlushInput>* /*out*/)
const override {
LOG(FATAL) << "Unimplemented";
return Status::OK();
}
diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc
index 0fd72b78c..bd8084347 100644
--- a/src/kudu/tablet/rowset.cc
+++ b/src/kudu/tablet/rowset.cc
@@ -142,7 +142,7 @@ Status DuplicatingRowSet::NewRowIterator(const
RowIteratorOptions& opts,
Status DuplicatingRowSet::NewCompactionInput(const Schema* /*projection*/,
const MvccSnapshot& /*snap*/,
const IOContext* /*io_context*/,
- unique_ptr<CompactionInput>*
/*out*/) const {
+
unique_ptr<CompactionOrFlushInput>* /*out*/) const {
LOG(FATAL) << "duplicating rowsets do not act as compaction input";
return Status::OK();
}
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index c12b51304..7ecad1f28 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -59,7 +59,7 @@ struct IOContext;
namespace tablet {
-class CompactionInput;
+class CompactionOrFlushInput;
class OperationResultPB;
class RowSetKeyProbe;
class RowSetMetadata;
@@ -168,7 +168,7 @@ class RowSet {
virtual Status NewCompactionInput(const Schema* projection,
const MvccSnapshot &snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionInput>* out)
const = 0;
+ std::unique_ptr<CompactionOrFlushInput>*
out) const = 0;
// Count the number of rows in this rowset.
virtual Status CountRows(const fs::IOContext* io_context, rowid_t *count)
const = 0;
@@ -423,7 +423,7 @@ class DuplicatingRowSet : public RowSet {
Status NewCompactionInput(const Schema* projection,
const MvccSnapshot &snap,
const fs::IOContext* io_context,
- std::unique_ptr<CompactionInput>* out) const
override;
+ std::unique_ptr<CompactionOrFlushInput>* out)
const override;
Status CountRows(const fs::IOContext* io_context, rowid_t *count) const
override;
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index d7a1e88b5..2131590bc 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1566,7 +1566,7 @@ Status Tablet::Flush() {
Status Tablet::FlushUnlocked() {
TRACE_EVENT0("tablet", "Tablet::FlushUnlocked");
RETURN_NOT_OK(CheckHasNotBeenStopped());
- RowSetsInCompaction input;
+ RowSetsInCompactionOrFlush input;
vector<shared_ptr<MemRowSet>> old_mrss;
{
// Create a new MRS with the latest schema.
@@ -1654,7 +1654,7 @@ Status Tablet::FlushUnlocked() {
return Status::OK();
}
-Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
+Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompactionOrFlush* new_mrss,
vector<shared_ptr<MemRowSet>>*
old_mrss) {
DCHECK(old_mrss->empty());
old_mrss->emplace_back(components_->memrowset);
@@ -1666,7 +1666,7 @@ Status
Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
for (auto& mrs : *old_mrss) {
std::unique_lock<std::mutex> ms_lock(*mrs->compact_flush_lock(),
std::try_to_lock);
CHECK(ms_lock.owns_lock());
- compaction->AddRowSet(mrs, std::move(ms_lock));
+ new_mrss->AddRowSet(mrs, std::move(ms_lock));
}
shared_ptr<MemRowSet> new_mrs;
@@ -1801,7 +1801,7 @@ bool Tablet::ShouldThrottleAllow(int64_t bytes) {
return throttler_->Take(MonoTime::Now(), 1, bytes);
}
-Status Tablet::PickRowSetsToCompact(RowSetsInCompaction *picked,
+Status Tablet::PickRowSetsToCompact(RowSetsInCompactionOrFlush *picked,
CompactFlags flags) const {
RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
// Grab a local reference to the current RowSetTree. This is to avoid
@@ -1997,7 +1997,20 @@ Status Tablet::FlushMetadata(const RowSetVector&
to_remove,
txns_being_flushed);
}
-Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
+// Computes the total on-disk size of the REDO and UNDO deltas in the given DiskRowSets.
+size_t Tablet::GetAllDeltasSizeOnDisk(const RowSetsInCompactionOrFlush &input)
{
+ size_t deltas_on_disk_size = 0;
+ for (const auto& rs : input.rowsets()) {
+ DiskRowSetSpace drss;
+ DiskRowSet* drs = down_cast<DiskRowSet*>(rs.get());
+ drs->GetDiskRowSetSpaceUsage(&drss);
+ deltas_on_disk_size += drss.redo_deltas_size + drss.undo_deltas_size;
+ }
+
+ return deltas_on_disk_size;
+}
+
+Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompactionOrFlush
&input,
int64_t mrs_being_flushed,
const vector<TxnInfoBeingFlushed>&
txns_being_flushed) {
const char *op_name =
@@ -2006,39 +2019,46 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
"tablet_id", tablet_id(),
"op", op_name);
+ // For compactions (no MRS being flushed), save the total on-disk size of all
deltas in the selected rowsets.
+ size_t deltas_on_disk_size = 0;
+ if (mrs_being_flushed == TabletMetadata::kNoMrsFlushed) {
+ deltas_on_disk_size = GetAllDeltasSizeOnDisk(input);
+ }
+
const auto& tid = tablet_id();
const IOContext io_context({ tid });
+ shared_ptr<CompactionOrFlushInput> merge;
+ const SchemaPtr schema_ptr = schema();
MvccSnapshot flush_snap(mvcc_);
VLOG_WITH_PREFIX(1) << Substitute("$0: entering phase 1 (flushing snapshot).
"
"Phase 1 snapshot: $1",
op_name, flush_snap.ToString());
- // Save the stats on the total on-disk size of all deltas in selected
rowsets.
- size_t deltas_on_disk_size = 0;
- if (mrs_being_flushed == TabletMetadata::kNoMrsFlushed) {
- for (const auto& rs : input.rowsets()) {
- DiskRowSetSpace drss;
- DiskRowSet* drs = down_cast<DiskRowSet*>(rs.get());
- drs->GetDiskRowSetSpaceUsage(&drss);
- deltas_on_disk_size += drss.redo_deltas_size + drss.undo_deltas_size;
- }
- }
-
+ // Fault injection hook for testing and debugging purposes only.
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostTakeMvccSnapshot(),
"PostTakeMvccSnapshot hook failed");
}
- shared_ptr<CompactionInput> merge;
- const SchemaPtr schema_ptr = schema();
- RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema_ptr.get(),
&io_context, &merge));
+ // Build the merge input by iterating over all the input rowsets; for each
rowset:
+ // - For compaction ops, create an input containing initialized base-data and
+ //   relevant REDO and UNDO delta iterators, used to read from
persistent storage.
+ // - For flush ops, create an iterator over the in-memory tree holding the
data updates.
+ RETURN_NOT_OK(input.CreateCompactionOrFlushInput(flush_snap,
+ schema_ptr.get(),
+ &io_context,
+ &merge));
+ // Initialize a DRS writer, to be used later for writing the base data, REDO and UNDO
deltas, delta stats, etc.
RollingDiskRowSetWriter drsw(metadata_.get(), merge->schema(),
DefaultBloomSizing(),
compaction_policy_->target_rowset_size());
RETURN_NOT_OK_PREPEND(drsw.Open(), "Failed to open DiskRowSet for flush");
+ // Get the history GC options, to be used later for ancient history mark (AHM) checks.
HistoryGcOpts history_gc_opts = GetHistoryGcOpts();
+
+ // Apply REDO and UNDO deltas to the rows and merge the histories of rows with
'ghost' entries.
RETURN_NOT_OK_PREPEND(
FlushCompactionInput(
tid, metadata_->fs_manager()->block_manager()->error_manager(),
@@ -2046,6 +2066,7 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
"Flush to disk failed");
RETURN_NOT_OK_PREPEND(drsw.Finish(), "Failed to finish DRS writer");
+ // Fault injection hook for testing and debugging purposes only.
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostWriteSnapshot(),
"PostWriteSnapshot hook failed");
@@ -2071,6 +2092,8 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
metrics_->bytes_flushed->IncrementBy(drsw.written_size());
}
+ // Open each of the rowsets written in this stage from disk and store a
+ // pointer to each of them in 'new_disk_rowsets'.
vector<shared_ptr<RowSet>> new_disk_rowsets;
new_disk_rowsets.reserve(new_drs_metas.size());
{
@@ -2168,6 +2191,7 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
// swap as committed in 'non_duplicated_ops_snap'.
non_duplicated_ops_snap.AddAppliedTimestamps(applying_during_swap);
+ // Fault injection hook for testing and debugging purposes only.
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapInDuplicatingRowSet(),
"PostSwapInDuplicatingRowSet hook failed");
@@ -2182,9 +2206,11 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
"which arrived during Phase 1. Snapshot:
$1",
op_name,
non_duplicated_ops_snap.ToString());
const SchemaPtr schema_ptr2 = schema();
- RETURN_NOT_OK_PREPEND(
- input.CreateCompactionInput(non_duplicated_ops_snap, schema_ptr2.get(),
&io_context, &merge),
- Substitute("Failed to create $0 inputs", op_name).c_str());
+
RETURN_NOT_OK_PREPEND(input.CreateCompactionOrFlushInput(non_duplicated_ops_snap,
+ schema_ptr2.get(),
+ &io_context,
+ &merge),
+ Substitute("Failed to create $0 inputs",
op_name).c_str());
// Update the output rowsets with the deltas that came in in phase 1, before
we swapped
// in the DuplicatingRowSets. This will perform a flush of the updated
DeltaTrackers
@@ -2199,6 +2225,7 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
Substitute("Failed to re-update deltas missed during $0 phase 1",
op_name).c_str());
+ // Fault injection hook for testing and debugging purposes only.
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostReupdateMissedDeltas(),
"PostReupdateMissedDeltas hook failed");
@@ -2269,6 +2296,7 @@ Status Tablet::DoMergeCompactionOrFlush(const
RowSetsInCompaction &input,
drs_written,
bytes_written);
+ // Fault injection hook for testing and debugging purposes only.
if (common_hooks_) {
RETURN_NOT_OK_PREPEND(common_hooks_->PostSwapNewRowSet(),
"PostSwapNewRowSet hook failed");
@@ -2316,7 +2344,7 @@ void Tablet::UpdateAverageRowsetHeight() {
Status Tablet::Compact(CompactFlags flags) {
RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
- RowSetsInCompaction input;
+ RowSetsInCompactionOrFlush input;
// Step 1. Capture the rowsets to be merged
RETURN_NOT_OK_PREPEND(PickRowSetsToCompact(&input, flags),
"Failed to pick rowsets to compact");
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 16244deab..05ef62d6f 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -94,7 +94,7 @@ class HistoryGcOpts;
class MemRowSet;
class ParticipantOpState;
class RowSetTree;
-class RowSetsInCompaction;
+class RowSetsInCompactionOrFlush;
class TxnMetadata;
class WriteOpState;
struct RowOp;
@@ -678,11 +678,11 @@ class Tablet {
const ScanSpec* spec,
std::vector<IterWithBounds>* iters) const;
- Status PickRowSetsToCompact(RowSetsInCompaction *picked,
+ Status PickRowSetsToCompact(RowSetsInCompactionOrFlush *picked,
CompactFlags flags) const;
// Performs a merge compaction or a flush.
- Status DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
+ Status DoMergeCompactionOrFlush(const RowSetsInCompactionOrFlush &input,
int64_t mrs_being_flushed,
const std::vector<TxnInfoBeingFlushed>&
txns_being_flushed);
@@ -702,6 +702,9 @@ class Tablet {
int64_t mrs_being_flushed,
const std::vector<TxnInfoBeingFlushed>&
txns_being_flushed);
+ // Computes the total on-disk size of the REDO and UNDO deltas in the given DiskRowSets.
+ size_t GetAllDeltasSizeOnDisk(const RowSetsInCompactionOrFlush& input);
+
static void ModifyRowSetTree(const RowSetTree& old_tree,
const RowSetVector& rowsets_to_remove,
const RowSetVector& rowsets_to_add,
@@ -732,7 +735,7 @@ class Tablet {
// before the replacement. If any MemRowSet is not empty it will be added to
// the 'new_mrss' input and the MemRowSets' compaction locks will be taken
// to prevent the inclusion in any concurrent compactions.
- Status ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
+ Status ReplaceMemRowSetsUnlocked(RowSetsInCompactionOrFlush* new_mrss,
std::vector<std::shared_ptr<MemRowSet>>*
old_mrss);
// Convert the specified read client schema (without IDs) to a server schema
(with IDs)